#include "llite_internal.h"
+cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
/* should NOT be called with the dcache lock, see fs/dcache.c */
static void ll_release(struct dentry *de)
__d_drop(dentry);
unlock_dentry(dentry);
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
dput(dentry);
+ cfs_spin_lock(&ll_lookup_lock);
spin_lock(&dcache_lock);
return 1;
}
inode->i_ino, inode->i_generation, inode);
head = &inode->i_dentry;
+ cfs_spin_lock(&ll_lookup_lock);
spin_lock(&dcache_lock);
restart:
tmp = head;
goto restart;
}
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
EXIT;
}
RETURN(rc);
}
-void ll_finish_locks(struct lookup_intent *it, struct dentry *dentry)
+void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
{
LASSERT(it != NULL);
LASSERT(dentry != NULL);
/* unfortunately ll_intent_lock may cause a callback and revoke our
* dentry */
+ cfs_spin_lock(&ll_lookup_lock);
spin_lock(&dcache_lock);
lock_dentry(de);
__d_drop(de);
unlock_dentry(de);
d_rehash_cond(de, 0);
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
out:
/* We do not free request as it may be reused during following lookup
ptlrpc_req_finished(req);
if (rc == 0) {
ll_unhash_aliases(de->d_inode);
+ /* done in ll_unhash_aliases()
+ dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
} else {
CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
"inode %p refc %d\n", de->d_name.len,
de->d_name.name, de, de->d_parent, de->d_inode,
atomic_read(&de->d_count));
- ll_finish_locks(it, de);
+ ll_lookup_finish_locks(it, de);
lock_dentry(de);
de->d_flags &= ~DCACHE_LUSTRE_INVALID;
unlock_dentry(de);
/* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
if (it->it_op == IT_GETATTR)
lookup_it.it_op = IT_GETATTR;
- ll_finish_locks(it, de);
+ ll_lookup_finish_locks(it, de);
it = &lookup_it;
}
if (request)
ptlrpc_req_finished(request);
if (rc < 0) {
- CERROR("lock enqueue: "DFID" at "LPU64": rc %d\n",
- PFID(ll_inode2fid(dir)), hash, rc);
+ CERROR("lock enqueue: rc: %d\n", rc);
return ERR_PTR(rc);
}
+ } else {
+ /* for cross-ref object, l_ast_data of the lock may not be set,
+ * we reset it here */
+ md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, &lockh.cookie,
+ dir, NULL);
}
- md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, &lockh.cookie, dir, NULL);
ldlm_lock_dump_handle(D_OTHER, &lockh);
page = ll_dir_page_locate(dir, hash, &start, &end);
- if (IS_ERR(page)) {
- CERROR("dir page locate: "DFID" at "LPU64": rc %ld\n",
- PFID(ll_inode2fid(dir)), hash, PTR_ERR(page));
+ if (IS_ERR(page))
GOTO(out_unlock, page);
- }
if (page != NULL) {
/*
page = read_cache_page(mapping, hash_x_index((unsigned long)hash),
(filler_t*)mapping->a_ops->readpage, NULL);
- if (IS_ERR(page)) {
- CERROR("read cache page: "DFID" at "LPU64": rc %ld\n",
- PFID(ll_inode2fid(dir)), hash, PTR_ERR(page));
+ if (IS_ERR(page))
GOTO(out_unlock, page);
- }
wait_on_page(page);
(void)kmap(page);
- if (!PageUptodate(page)) {
- CERROR("page not updated: "DFID" at "LPU64": rc %d\n",
- PFID(ll_inode2fid(dir)), hash, -5);
+ if (!PageUptodate(page))
goto fail;
- }
if (!PageChecked(page))
ll_check_page(dir, page);
- if (PageError(page)) {
- CERROR("page error: "DFID" at "LPU64": rc %d\n",
- PFID(ll_inode2fid(dir)), hash, -5);
+ if (PageError(page))
goto fail;
- }
hash_collision:
dp = page_address(page);
here to preserve get_cwd functionality on 2.6.
Bug 10503 */
if (!dentry->d_inode->i_nlink) {
+ cfs_spin_lock(&ll_lookup_lock);
spin_lock(&dcache_lock);
ll_drop_dentry(dentry);
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
}
- ll_finish_locks(&oit, dentry);
+ ll_lookup_finish_locks(&oit, dentry);
} else if (!ll_have_md_lock(dentry->d_inode, ibits)) {
struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
/* metadata stat-ahead */
unsigned int ll_sa_max; /* max statahead RPCs */
- cfs_atomic_t ll_sa_total; /* statahead thread started
- * count */
- cfs_atomic_t ll_sa_wrong; /* statahead thread stopped for
+ unsigned int ll_sa_wrong; /* statahead thread stopped for
* low hit ratio */
+ unsigned int ll_sa_total; /* statahead thread started
+ * count */
+ unsigned long long ll_sa_blocked; /* ls count waiting for
+ * statahead */
+ unsigned long long ll_sa_cached; /* ls count got in cache */
+ unsigned long long ll_sa_hit; /* hit count */
+ unsigned long long ll_sa_miss; /* miss count */
dev_t ll_sdev_orig; /* save s_dev before assign for
* clustred nfs */
#endif
int ll_lookup_it_finish(struct ptlrpc_request *request,
struct lookup_intent *it, void *data);
+void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
/* llite/rw.c */
int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
/**
* protect race ll_find_aliases vs ll_revalidate_it vs ll_unhash_aliases
*/
+extern cfs_spinlock_t ll_lookup_lock;
extern struct dentry_operations ll_d_ops;
void ll_intent_drop_lock(struct lookup_intent *);
void ll_intent_release(struct lookup_intent *);
int ll_drop_dentry(struct dentry *dentry);
void ll_unhash_aliases(struct inode *);
void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft);
-void ll_finish_locks(struct lookup_intent *it, struct dentry *dentry);
+void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name);
int ll_revalidate_it_finish(struct ptlrpc_request *request,
struct lookup_intent *it, struct dentry *de);
/* metadata statahead is enabled by default */
sbi->ll_sa_max = LL_SA_RPC_DEF;
- atomic_set(&sbi->ll_sa_total, 0);
- atomic_set(&sbi->ll_sa_wrong, 0);
RETURN(sbi);
}
struct ll_sb_info *sbi = ll_s2sbi(sb);
return snprintf(page, count,
+ "statahead wrong: %u\n"
"statahead total: %u\n"
- "statahead wrong: %u\n",
- atomic_read(&sbi->ll_sa_total),
- atomic_read(&sbi->ll_sa_wrong));
+ "ls blocked: %llu\n"
+ "ls cached: %llu\n"
+ "hit count: %llu\n"
+ "miss count: %llu\n",
+ sbi->ll_sa_wrong,
+ sbi->ll_sa_total,
+ sbi->ll_sa_blocked,
+ sbi->ll_sa_cached,
+ sbi->ll_sa_hit,
+ sbi->ll_sa_miss);
}
static int ll_rd_lazystatfs(char *page, char **start, off_t off,
{
struct dentry *dentry, *tmp_alias, *tmp_subdir;
+ cfs_spin_lock(&ll_lookup_lock);
spin_lock(&dcache_lock);
restart:
list_for_each_entry_safe(dentry, tmp_alias,
}
}
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
}
struct dentry *dentry;
struct dentry *last_discon = NULL;
+ cfs_spin_lock(&ll_lookup_lock);
spin_lock(&dcache_lock);
list_for_each(tmp, &inode->i_dentry) {
dentry = list_entry(tmp, struct dentry, d_alias);
ll_dops_init(dentry, 0);
d_rehash_cond(dentry, 0); /* avoid taking dcache_lock inside */
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
iput(inode);
CDEBUG(D_DENTRY, "alias dentry %.*s (%p) parent %p inode %p "
"refc %d\n", de->d_name.len, de->d_name.name, de,
last_discon->d_flags |= DCACHE_LUSTRE_INVALID;
unlock_dentry(last_discon);
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
ll_dops_init(last_discon, 1);
d_rehash(de);
d_move(last_discon, de);
ll_d_add(de, inode);
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
return de;
}
/* we have lookup look - unhide dentry */
if (bits & MDS_INODELOCK_LOOKUP) {
lock_dentry(*de);
- (*de)->d_flags &= ~DCACHE_LUSTRE_INVALID;
+ (*de)->d_flags &= ~(DCACHE_LUSTRE_INVALID);
unlock_dentry(*de);
}
} else {
!S_ISDIR(dentry->d_inode->i_mode)) {
ll_release_openhandle(dentry, it);
}
- ll_finish_locks(it, dentry);
+ ll_lookup_finish_locks(it, dentry);
if (dentry == save)
GOTO(out, retval = NULL);
CDEBUG(D_READA, "alloc sai entry %p index %u\n",
entry, index);
entry->se_index = index;
- entry->se_stat = SA_ENTRY_UNSTATED;
+ entry->se_stat = SA_ENTRY_UNSTATED;
cfs_spin_lock(&lli->lli_lock);
cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent);
* delete it from sai_entries_stated head when fini, it need not
* to process entry's member.
*/
-static int ll_sai_entry_fini(struct ll_statahead_info *sai)
+static void ll_sai_entry_fini(struct ll_statahead_info *sai)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
struct ll_sai_entry *entry;
- int rc = 0;
ENTRY;
cfs_spin_lock(&lli->lli_lock);
struct ll_sai_entry, se_list);
if (entry->se_index < sai->sai_index_next) {
cfs_list_del(&entry->se_list);
- rc = entry->se_stat;
OBD_FREE_PTR(entry);
}
- } else {
+ } else
LASSERT(sa_is_stopped(sai));
- }
cfs_spin_unlock(&lli->lli_lock);
- RETURN(rc);
+ EXIT;
}
/**
entry->se_req = ptlrpc_request_addref(req);
entry->se_minfo = minfo;
RETURN(entry);
- } else if (entry->se_index > index) {
+ } else if (entry->se_index > index)
RETURN(NULL);
- }
}
}
RETURN(NULL);
if (body->valid & OBD_MD_MDS)
GOTO(out, rc = -EAGAIN);
- /* BUG 15962: if statahead insert dentry into dcache (for
- * lookup),it should hold parent dir's i_mutex to synchronize
- * with other operations from VFS layer.
- * E.g.: create/delete/rename/lookup, and so on. */
- mutex_lock(&minfo->mi_dir->i_mutex);
rc = ll_lookup_it_finish(req, it, &icbd);
- mutex_unlock(&minfo->mi_dir->i_mutex);
if (!rc)
/*
* Here dentry->d_inode might be NULL,
* because the entry may have been removed before
* we start doing stat ahead.
*/
- ll_finish_locks(it, dentry);
+ ll_lookup_finish_locks(it, dentry);
if (dentry != save) {
minfo->mi_dentry = dentry;
GOTO(out, rc);
}
+ cfs_spin_lock(&ll_lookup_lock);
spin_lock(&dcache_lock);
lock_dentry(dentry);
__d_drop(dentry);
unlock_dentry(dentry);
d_rehash_cond(dentry, 0);
spin_unlock(&dcache_lock);
+ cfs_spin_unlock(&ll_lookup_lock);
- ll_finish_locks(it, dentry);
+ ll_lookup_finish_locks(it, dentry);
}
EXIT;
sai = ll_sai_get(lli->lli_sai);
entry = ll_sai_entry_set(sai,
(unsigned int)(long)minfo->mi_cbdata,
- rc < 0 ? rc : SA_ENTRY_STATED, req,
- minfo);
+ rc ? SA_ENTRY_UNSTATED :
+ SA_ENTRY_STATED, req, minfo);
LASSERT(entry != NULL);
if (likely(sa_is_running(sai))) {
ll_sai_entry_to_received(sai, entry);
int rc;
ENTRY;
- if (unlikely(inode == NULL))
+ if (inode == NULL)
RETURN(1);
if (d_mountpoint(dentry))
RETURN(1);
- if (unlikely(dentry == dentry->d_sb->s_root))
+ if (dentry == dentry->d_sb->s_root)
RETURN(1);
rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
if (rc) {
CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
se, se->se_index, se->se_stat, rc);
- se->se_stat = rc < 0 ? rc : SA_ENTRY_STATED;
+ se->se_stat = rc;
if (ll_sai_entry_to_stated(sai, se))
cfs_waitq_signal(&sai->sai_waitq);
} else {
cfs_daemonize(pname);
}
- atomic_inc(&sbi->ll_sa_total);
+ sbi->ll_sa_total++;
cfs_spin_lock(&lli->lli_lock);
thread->t_flags = SVC_RUNNING;
cfs_spin_unlock(&lli->lli_lock);
if (IS_ERR(page)) {
rc = PTR_ERR(page);
- CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
- "/%u: [rc %d] [parent %u]\n",
- PFID(ll_inode2fid(dir)), pos, sai->sai_index,
- rc, lli->lli_opendir_pid);
+ CDEBUG(D_READA, "error reading dir "DFID" at "LPU64"/%u: rc %d\n",
+ PFID(ll_inode2fid(dir)), pos,
+ sai->sai_index, rc);
break;
}
struct lu_dirent *ent;
if (IS_ERR(page)) {
- struct ll_inode_info *lli = ll_i2info(dir);
-
rc = PTR_ERR(page);
- CERROR("error reading dir "DFID" at "LPU64": "
- "[rc %d] [parent %u]\n",
- PFID(ll_inode2fid(dir)), pos,
- rc, lli->lli_opendir_pid);
+ CERROR("error reading dir "DFID" at "LPU64": rc %d\n",
+ PFID(ll_inode2fid(dir)), pos, rc);
break;
}
sai = lli->lli_sai;
if (sai) {
+ struct ll_sb_info *sbi;
+
if (unlikely(sa_is_stopped(sai) &&
cfs_list_empty(&sai->sai_entries_stated)))
RETURN(-EBADFD);
}
}
- if (!ll_sai_entry_stated(sai)) {
- /* BUG 15962:
- *
- * If statahead insert dentry into dcache (for lookup),
- * it should hold parent dir's i_mutex to synchronize
- * with other operations from VFS layer.
- * E.g.: create/delete/rename/lookup, and so on.
- *
- * To prevent the dead lock between statahead and its
- * parent process, the parent process should release
- * such i_mutex before waiting for statahead to fetch
- * related dentry attribute from MDS.
- *
- * It is no matter for parent process to release such
- * i_mutex temporary, if someone else create dentry for
- * the same item in such interval, we can find it after
- * woke up by statahead. */
- if (lookup) {
- LASSERT(mutex_is_locked(&dir->i_mutex));
- mutex_unlock(&dir->i_mutex);
- }
+ sbi = ll_i2sbi(dir);
+ if (ll_sai_entry_stated(sai)) {
+ sbi->ll_sa_cached++;
+ } else {
+ sbi->ll_sa_blocked++;
+ /*
+ * thread started already, avoid double-stat.
+ */
rc = l_wait_event(sai->sai_waitq,
ll_sai_entry_stated(sai) ||
sa_is_stopped(sai),
&lwi);
- if (lookup)
- mutex_lock(&dir->i_mutex);
}
if (lookup) {
struct ll_statahead_info *sai;
struct ll_sb_info *sbi;
struct ll_dentry_data *ldd = ll_d2d(dentry);
- int rc;
ENTRY;
LASSERT(dir != NULL);
LASSERT(sai != NULL);
sbi = ll_i2sbi(dir);
- rc = ll_sai_entry_fini(sai);
- /* rc == -ENOENT means such dentry was removed just between statahead
- * readdir and pre-fetched, count it as hit.
- *
- * result == -ENOENT has two meanings:
- * 1. such dentry was removed just between statahead pre-fetched and
- * main process stat such dentry.
- * 2. main process stat non-exist dentry.
- * Since we can distinguish such two cases, just count it as miss. */
- if (result >= 1 || unlikely(rc == -ENOENT)) {
+ if (result >= 1) {
+ sbi->ll_sa_hit++;
sai->sai_hit++;
sai->sai_consecutive_miss = 0;
sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
} else {
+ sbi->ll_sa_miss++;
sai->sai_miss++;
sai->sai_consecutive_miss++;
if (sa_low_hit(sai) && sa_is_running(sai)) {
- atomic_inc(&sbi->ll_sa_wrong);
+ sbi->ll_sa_wrong++;
CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio "
"too low: hit/miss %u/%u, sent/replied %u/%u, "
"stopping statahead thread: pid %d\n",
if (!sa_is_stopped(sai))
cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
+ ll_sai_entry_fini(sai);
if (likely(ldd != NULL))
ldd->lld_sa_generation = sai->sai_generation;