return sai;
}
-static inline
+static inline
struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
{
LASSERT(sai);
LASSERT(sa_is_stopped(sai));
if (sai->sai_sent > sai->sai_replied)
- CDEBUG(D_READA,"statahead for dir "DFID" does not "
+ CDEBUG(D_READA,"statahead for dir %lu/%u does not "
"finish: [sent:%u] [replied:%u]\n",
- PFID(&lli->lli_fid),
+ inode->i_ino, inode->i_generation,
sai->sai_sent, sai->sai_replied);
list_for_each_entry_safe(entry, next, &sai->sai_entries_sent,
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
struct ll_sai_entry *entry;
ENTRY;
-
+
spin_lock(&lli->lli_lock);
sai->sai_index_next++;
if (likely(!list_empty(&sai->sai_entries_stated))) {
struct lookup_intent *it;
struct dentry *dentry;
int rc = 0;
- struct mdt_body *body;
ENTRY;
spin_lock(&lli->lli_lock);
it = &minfo->mi_it;
dentry = minfo->mi_dentry;
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- if (body == NULL)
- GOTO(out, rc = -EFAULT);
-
if (dentry->d_inode == NULL) {
/*
* lookup.
.icbd_childp = &dentry
};
- LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
-
- /*
- * XXX: No fid in reply, this is probaly cross-ref case.
- * SA can't handle it yet.
- */
- if (body->valid & OBD_MD_MDS)
- GOTO(out, rc = -EAGAIN);
-
- rc = ll_lookup_it_finish(req, it, &icbd);
+ rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
if (!rc)
/*
* Here dentry->d_inode might be NULL,
/*
* revalidate.
*/
- if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
+ struct mds_body *body;
+
+ body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+ sizeof(*body));
+ if (memcmp(&minfo->mi_data.fid2, &body->fid1,
+ sizeof(body->fid1))) {
ll_unhash_aliases(dentry->d_inode);
GOTO(out, rc = -EAGAIN);
}
- rc = ll_revalidate_it_finish(req, it, dentry);
+ rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
if (rc) {
ll_unhash_aliases(dentry->d_inode);
GOTO(out, rc);
spin_lock(&dcache_lock);
lock_dentry(dentry);
__d_drop(dentry);
-#ifdef DCACHE_LUSTRE_INVALID
dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
-#endif
unlock_dentry(dentry);
d_rehash_cond(dentry, 0);
spin_unlock(&dcache_lock);
return rc;
}
-static int ll_statahead_interpret(struct ptlrpc_request *req,
+static int ll_statahead_interpret(struct obd_export *exp,
+ struct ptlrpc_request *req,
struct md_enqueue_info *minfo,
int rc)
{
spin_lock(&lli->lli_lock);
if (unlikely(lli->lli_sai == NULL ||
- lli->lli_sai->sai_generation != minfo->mi_generation)) {
+ lli->lli_sai->sai_generation != minfo->mi_generation)) {
spin_unlock(&lli->lli_lock);
ll_intent_release(it);
dput(dentry);
{
LASSERT(minfo && einfo);
iput(minfo->mi_dir);
- capa_put(minfo->mi_data.op_capa1);
- capa_put(minfo->mi_data.op_capa2);
OBD_FREE_PTR(minfo);
OBD_FREE_PTR(einfo);
}
-/**
- * There is race condition between "capa_put" and "ll_statahead_interpret" for
- * accessing "op_data.op_capa[1,2]" as following:
- * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
- * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
- * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
- * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
- * "md_intent_getattr_async".
- */
-static int sa_args_init(struct inode *dir, struct dentry *dentry,
+static int sa_args_prep(struct inode *dir, struct dentry *dentry,
struct md_enqueue_info **pmi,
- struct ldlm_enqueue_info **pei,
- struct obd_capa **pcapa)
+ struct ldlm_enqueue_info **pei)
{
struct ll_inode_info *lli = ll_i2info(dir);
struct md_enqueue_info *minfo;
struct ldlm_enqueue_info *einfo;
- struct md_op_data *op_data;
OBD_ALLOC_PTR(einfo);
if (einfo == NULL)
return -ENOMEM;
}
- op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
- dentry->d_name.name, dentry->d_name.len,
- 0, LUSTRE_OPC_ANY, NULL);
- if (IS_ERR(op_data)) {
- OBD_FREE_PTR(einfo);
- OBD_FREE_PTR(minfo);
- return PTR_ERR(op_data);
- }
-
minfo->mi_it.it_op = IT_GETATTR;
minfo->mi_dentry = dentry;
minfo->mi_dir = igrab(dir);
einfo->ei_type = LDLM_IBITS;
einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
- einfo->ei_cb_bl = ll_md_blocking_ast;
+ einfo->ei_cb_bl = ll_mdc_blocking_ast;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_cb_gl = NULL;
einfo->ei_cbdata = NULL;
*pmi = minfo;
*pei = einfo;
- pcapa[0] = op_data->op_capa1;
- pcapa[1] = op_data->op_capa2;
return 0;
}
{
struct md_enqueue_info *minfo;
struct ldlm_enqueue_info *einfo;
- struct obd_capa *capas[2];
int rc;
ENTRY;
- rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
+ rc = sa_args_prep(dir, dentry, &minfo, &einfo);
if (rc)
RETURN(rc);
- rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
- if (!rc) {
- capa_put(capas[0]);
- capa_put(capas[1]);
- } else {
+ rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL,
+ dentry->d_name.name, dentry->d_name.len, 0,
+ NULL);
+ if (rc == 0)
+ rc = mdc_intent_getattr_async(ll_i2mdcexp(dir), minfo, einfo);
+
+ if (rc)
sa_args_fini(minfo, einfo);
- }
RETURN(rc);
}
static int do_sa_revalidate(struct inode *dir, struct dentry *dentry)
{
struct inode *inode = dentry->d_inode;
+ struct ll_fid fid;
struct lookup_intent it = { .it_op = IT_GETATTR };
struct md_enqueue_info *minfo;
struct ldlm_enqueue_info *einfo;
- struct obd_capa *capas[2];
int rc;
ENTRY;
if (dentry == dentry->d_sb->s_root)
RETURN(1);
- rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
+ ll_inode2fid(&fid, inode);
+
+ rc = mdc_revalidate_lock(ll_i2mdcexp(dir), &it, &fid);
if (rc == 1) {
ll_intent_release(&it);
RETURN(1);
}
- rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
+ rc = sa_args_prep(dir, dentry, &minfo, &einfo);
if (rc)
RETURN(rc);
- rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
- if (!rc) {
- capa_put(capas[0]);
- capa_put(capas[1]);
- } else {
+ rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir,
+ inode, dentry->d_name.name,
+ dentry->d_name.len, 0, NULL);
+ if (rc == 0)
+ rc = mdc_intent_getattr_async(ll_i2mdcexp(dir), minfo, einfo);
+
+ if (rc)
sa_args_fini(minfo, einfo);
- }
RETURN(rc);
}
q->hash = full_name_hash(name, namelen);
}
-static int ll_statahead_one(struct dentry *parent, const char* entry_name,
- int entry_name_len)
+static int ll_statahead_one(struct dentry *parent, struct ll_dir_entry *de)
{
struct inode *dir = parent->d_inode;
struct ll_inode_info *lli = ll_i2info(dir);
int rc;
ENTRY;
-#ifdef DCACHE_LUSTRE_INVALID
if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
-#else
- if (d_unhashed(parent)) {
-#endif
CDEBUG(D_READA, "parent dentry@%p %.*s is "
"invalid, skip statahead\n",
parent, parent->d_name.len, parent->d_name.name);
if (IS_ERR(se))
RETURN(PTR_ERR(se));
- ll_name2qstr(&name, entry_name, entry_name_len);
+ ll_name2qstr(&name, de->lde_name, de->lde_name_len);
dentry = d_lookup(parent, &name);
if (!dentry) {
dentry = d_alloc(parent, &name);
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
struct ptlrpc_thread *thread = &sai->sai_thread;
- struct page *page;
- __u64 pos = 0;
+ unsigned long index = 0;
int first = 0;
int rc = 0;
- struct ll_dir_chain chain;
ENTRY;
{
cfs_waitq_signal(&thread->t_ctl_waitq);
CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
- ll_dir_chain_init(&chain);
- page = ll_get_dir_page(dir, pos, 0, &chain);
-
while (1) {
struct l_wait_info lwi = { 0 };
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
+ unsigned long npages;
+ char *kaddr, *limit;
+ struct ll_dir_entry *de;
+ struct page *page;
+
+ npages = dir_pages(dir);
+ /*
+ * reach the end of dir.
+ */
+ if (index >= npages) {
+ CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
+ index, npages);
+ while (1) {
+ l_wait_event(thread->t_ctl_waitq,
+ !sa_is_running(sai) ||
+ !sa_received_empty(sai) ||
+ sai->sai_sent == sai->sai_replied,
+ &lwi);
+ if (!sa_received_empty(sai) &&
+ sa_is_running(sai))
+ do_statahead_interpret(sai);
+ else
+ GOTO(out, rc);
+ }
+ }
+
+ page = ll_get_dir_page(dir, index);
if (IS_ERR(page)) {
rc = PTR_ERR(page);
- CERROR("error reading dir "DFID" at "LPU64"/%u: rc %d\n",
- PFID(ll_inode2fid(dir)), pos,
+ CERROR("error reading dir %lu/%u page %lu/%u: rc %d\n",
+ dir->i_ino, dir->i_generation, index,
sai->sai_index, rc);
break;
}
- dp = page_address(page);
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent)) {
- char *name = ent->lde_name;
- int namelen = le16_to_cpu(ent->lde_namelen);
+ kaddr = page_address(page);
+ limit = kaddr + CFS_PAGE_SIZE - ll_dir_rec_len(1);
+ de = (struct ll_dir_entry *)kaddr;
+ if (!index) {
+ /*
+ * skip "."
+ */
+ de = ll_dir_next_entry(de);
+ /*
+ * skip ".."
+ */
+ de = ll_dir_next_entry(de);
+ }
+
+ for (; (char*)de <= limit; de = ll_dir_next_entry(de)) {
+ if (de->lde_inode == 0)
+ continue;
- if (namelen == 0)
+ if (de->lde_name[0] == '.' && !sai->sai_ls_all) {
/*
- * Skip dummy record.
+ * skip hidden files..
*/
+ sai->sai_skip_hidden++;
continue;
-
- if (name[0] == '.') {
- if (namelen == 1) {
- /*
- * skip "."
- */
- continue;
- } else if (name[1] == '.' && namelen == 2) {
- /*
- * skip ".."
- */
- continue;
- } else if (!sai->sai_ls_all) {
- /*
- * skip hidden files.
- */
- sai->sai_skip_hidden++;
- continue;
- }
}
/*
*/
goto keep_de;
- rc = ll_statahead_one(parent, name, namelen);
+ rc = ll_statahead_one(parent, de);
if (rc < 0) {
ll_put_page(page);
GOTO(out, rc);
}
}
- pos = le64_to_cpu(dp->ldp_hash_end);
ll_put_page(page);
- if (pos == DIR_END_OFF) {
- /*
- * End of directory reached.
- */
- while (1) {
- l_wait_event(thread->t_ctl_waitq,
- !sa_is_running(sai) ||
- !sa_received_empty(sai) ||
- sai->sai_sent == sai->sai_replied,
- &lwi);
- if (!sa_received_empty(sai) &&
- sa_is_running(sai))
- do_statahead_interpret(sai);
- else
- GOTO(out, rc);
- }
- } else if (1) {
- /*
- * chain is exhausted.
- * Normal case: continue to the next page.
- */
- page = ll_get_dir_page(dir, pos, 1, &chain);
- } else {
- /*
- * go into overflow page.
- */
- }
+ index++;
}
EXIT;
out:
- ll_dir_chain_fini(&chain);
spin_lock(&lli->lli_lock);
thread->t_flags = SVC_STOPPED;
spin_unlock(&lli->lli_lock);
}
enum {
- /**
+ /*
* not first dirent, or is "."
*/
LS_NONE_FIRST_DE = 0,
- /**
+ /*
* the first non-hidden dirent
*/
LS_FIRST_DE,
- /**
- * the first hidden dirent, that is "."
+ /*
+ * the first hidden dirent, that is ".xxx
*/
LS_FIRST_DOT_DE
};
static int is_first_dirent(struct inode *dir, struct dentry *dentry)
{
- struct ll_dir_chain chain;
- struct qstr *target = &dentry->d_name;
- struct page *page;
- __u64 pos = 0;
- int dot_de;
- int rc = LS_NONE_FIRST_DE;
+ struct qstr *d_name = &dentry->d_name;
+ unsigned long npages, index = 0;
+ struct page *page;
+ struct ll_dir_entry *de;
+ char *kaddr, *limit;
+ int rc = LS_NONE_FIRST_DE, dot_de;
ENTRY;
- ll_dir_chain_init(&chain);
- page = ll_get_dir_page(dir, pos, 0, &chain);
-
while (1) {
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
+ npages = dir_pages(dir);
+ /*
+ * reach the end of dir.
+ */
+ if (index >= npages) {
+ CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
+ index, npages);
+ break;
+ }
+ page = ll_get_dir_page(dir, index);
if (IS_ERR(page)) {
rc = PTR_ERR(page);
- CERROR("error reading dir "DFID" at "LPU64": rc %d\n",
- PFID(ll_inode2fid(dir)), pos, rc);
+ CERROR("error reading dir %lu/%u page %lu: rc %d\n",
+ dir->i_ino, dir->i_generation, index, rc);
break;
}
- dp = page_address(page);
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent)) {
- char *name = ent->lde_name;
- int namelen = le16_to_cpu(ent->lde_namelen);
+ kaddr = page_address(page);
+ limit = kaddr + CFS_PAGE_SIZE - ll_dir_rec_len(1);
+ de = (struct ll_dir_entry *)kaddr;
+ if (!index) {
+ if (unlikely(!(de->lde_name_len == 1 &&
+ strncmp(de->lde_name, ".", 1) == 0)))
+ CWARN("Maybe got bad on-disk dir: %lu/%u\n",
+ dir->i_ino, dir->i_generation);
+ /*
+ * skip "." or ingore bad entry.
+ */
+ de = ll_dir_next_entry(de);
- if (namelen == 0)
- /*
- * skip dummy record.
- */
+ if (unlikely(!(de->lde_name_len == 2 &&
+ strncmp(de->lde_name, "..", 2) == 0)))
+ CWARN("Maybe got bad on-disk dir: %lu/%u\n",
+ dir->i_ino, dir->i_generation);
+ /*
+ * skip ".." or ingore bad entry.
+ */
+ de = ll_dir_next_entry(de);
+ }
+
+ for (; (char*)de <= limit; de = ll_dir_next_entry(de)) {
+ if (!de->lde_inode)
continue;
- if (name[0] == '.') {
- if (namelen == 1)
- /*
- * skip "."
- */
- continue;
- else if (name[1] == '.' && namelen == 2)
- /*
- * skip ".."
- */
- continue;
- else
- dot_de = 1;
- } else {
+ if (de->lde_name[0] == '.')
+ dot_de = 1;
+ else
dot_de = 0;
- }
- if (dot_de && target->name[0] != '.') {
+ if (dot_de && d_name->name[0] != '.') {
CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
- target->len, target->name,
- namelen, name);
+ d_name->len, d_name->name,
+ de->lde_name_len, de->lde_name);
continue;
}
- if (target->len == namelen &&
- memcmp(target->name, name, namelen) == 0)
- rc = LS_FIRST_DE + dot_de;
- else
+ if (d_name->len != de->lde_name_len ||
+ strncmp(d_name->name, de->lde_name, d_name->len) != 0)
rc = LS_NONE_FIRST_DE;
+ else if (!dot_de)
+ rc = LS_FIRST_DE;
+ else
+ rc = LS_FIRST_DOT_DE;
+
ll_put_page(page);
- GOTO(out, rc);
+ RETURN(rc);
}
- pos = le64_to_cpu(dp->ldp_hash_end);
ll_put_page(page);
- if (pos == DIR_END_OFF) {
- /*
- * End of directory reached.
- */
- break;
- } else if (1) {
- /*
- * chain is exhausted
- * Normal case: continue to the next page.
- */
- page = ll_get_dir_page(dir, pos, 1, &chain);
- } else {
- /*
- * go into overflow page.
- */
- }
+ index++;
}
- EXIT;
-
-out:
- ll_dir_chain_fini(&chain);
- return rc;
+ RETURN(rc);
}
/**
struct ll_inode_info *lli;
struct ll_statahead_info *sai;
struct dentry *parent;
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ struct l_wait_info lwi = { 0 };
int rc = 0;
ENTRY;
/*
* thread started already, avoid double-stat.
*/
+ lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
rc = l_wait_event(sai->sai_waitq,
ll_sai_entry_stated(sai) ||
sa_is_stopped(sai),
RETURN(rc);
}
- /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
+ /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
rc = is_first_dirent(dir, *dentryp);
if (rc == LS_NONE_FIRST_DE)
/* It is not "ls -{a}l" operation, no need statahead for it. */
sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
sai->sai_inode = igrab(dir);
if (unlikely(sai->sai_inode == NULL)) {
- CWARN("Do not start stat ahead on dying inode "DFID" .\n",
- PFID(&lli->lli_fid));
+ CWARN("Do not start stat ahead on dying inode %lu/%u.\n",
+ dir->i_ino, dir->i_generation);
OBD_FREE_PTR(sai);
GOTO(out, rc = -ESTALE);
}
/* get parent reference count here, and put it in ll_statahead_thread */
parent = dget((*dentryp)->d_parent);
if (unlikely(sai->sai_inode != parent->d_inode)) {
- struct ll_inode_info *nlli = ll_i2info(parent->d_inode);
-
CWARN("Race condition, someone changed %.*s just now: "
"old parent "DFID", new parent "DFID" .\n",
(*dentryp)->d_name.len, (*dentryp)->d_name.name,
- PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
+ PFID(ll_inode_lu_fid(dir)),
+ PFID(ll_inode_lu_fid(parent->d_inode)));
dput(parent);
iput(sai->sai_inode);
OBD_FREE_PTR(sai);
RETURN(-EAGAIN);
}
- l_wait_event(sai->sai_thread.t_ctl_waitq,
+ l_wait_event(sai->sai_thread.t_ctl_waitq,
sa_is_running(sai) || sa_is_stopped(sai),
&lwi);
CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio "
"too low: hit/miss %u/%u, sent/replied %u/%u, "
"stopping statahead thread: pid %d\n",
- PFID(&lli->lli_fid), sai->sai_hit,
+ PFID(ll_inode_lu_fid(dir)), sai->sai_hit,
sai->sai_miss, sai->sai_sent,
sai->sai_replied, cfs_curproc_pid());
spin_lock(&lli->lli_lock);