* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2015, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <linux/fs.h>
#include <linux/sched.h>
+#include <linux/kthread.h>
#include <linux/mm.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
struct inode *se_inode;
/* entry name */
struct qstr se_qstr;
+ /* entry fid */
+ struct lu_fid se_fid;
};
static unsigned int sai_generation = 0;
/* allocate sa_entry and hash it to allow scanner process to find it */
static struct sa_entry *
-sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
+sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len,
+ const struct lu_fid *fid)
{
struct ll_inode_info *lli;
struct sa_entry *entry;
entry->se_qstr.hash = full_name_hash(name, len);
entry->se_qstr.len = len;
entry->se_qstr.name = dname;
+ entry->se_fid = *fid;
lli = ll_i2info(sai->sai_dentry->d_inode);
{
struct sa_entry *se;
struct list_head *pos = &sai->sai_entries;
+ __u64 index = entry->se_index;
LASSERT(!sa_ready(entry));
LASSERT(list_empty(&entry->se_list));
- entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
-
list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
if (se->se_index < entry->se_index) {
pos = &se->se_list;
}
}
list_add(&entry->se_list, pos);
+ entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
- return (entry->se_index == sai->sai_index_wait);
+ return (index == sai->sai_index_wait);
}
/*
if (body == NULL)
GOTO(out, rc = -EFAULT);
- child = entry->se_inode;
- if (child == NULL) {
- /*
- * lookup.
- */
- LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
-
- /* XXX: No fid in reply, this is probaly cross-ref case.
- * SA can't handle it yet. */
- if (body->mbo_valid & OBD_MD_MDS)
- GOTO(out, rc = -EAGAIN);
- } else {
- /*
- * revalidate.
- */
- /* unlinked and re-created with the same name */
+ child = entry->se_inode;
+ if (child != NULL) {
+ /* revalidate; unlinked and re-created with the same name */
if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2,
&body->mbo_fid1))) {
- entry->se_inode = NULL;
- iput(child);
- child = NULL;
- }
- }
+ entry->se_inode = NULL;
+ iput(child);
+ child = NULL;
+ }
+ }
- it->d.lustre.it_lock_handle = entry->se_handle;
+ it->it_lock_handle = entry->se_handle;
rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
if (rc != 1)
GOTO(out, rc = -EAGAIN);
struct ll_statahead_info *sai = lli->lli_sai;
struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
__u64 handle = 0;
- bool wakeup;
+ wait_queue_head_t *waitq = NULL;
ENTRY;
if (it_disposition(it, DISP_LOOKUP_NEG))
* thread enqueues lock on parent in readdir and another
* process enqueues lock on child with parent lock held, eg.
* unlink. */
- handle = it->d.lustre.it_lock_handle;
+ handle = it->it_lock_handle;
ll_intent_drop_lock(it);
}
spin_lock(&lli->lli_sa_lock);
if (rc != 0) {
- wakeup = __sa_make_ready(sai, entry, rc);
+ if (__sa_make_ready(sai, entry, rc))
+ waitq = &sai->sai_waitq;
} else {
entry->se_minfo = minfo;
entry->se_req = ptlrpc_request_addref(req);
* for readpage and other tries to enqueue lock on child
* with parent's lock held, for example: unlink. */
entry->se_handle = handle;
- wakeup = !sa_has_callback(sai);
+ if (!sa_has_callback(sai))
+ waitq = &sai->sai_thread.t_ctl_waitq;
+
list_add_tail(&entry->se_list, &sai->sai_interim_entries);
}
sai->sai_replied++;
- if (wakeup)
- wake_up(&sai->sai_thread.t_ctl_waitq);
+ if (waitq != NULL)
+ wake_up(waitq);
spin_unlock(&lli->lli_sa_lock);
RETURN(rc);
}
/* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo,
- struct ldlm_enqueue_info *einfo)
+static void sa_fini_data(struct md_enqueue_info *minfo)
{
- LASSERT(minfo && einfo);
iput(minfo->mi_dir);
- capa_put(minfo->mi_data.op_capa1);
- capa_put(minfo->mi_data.op_capa2);
OBD_FREE_PTR(minfo);
- OBD_FREE_PTR(einfo);
}
/*
* prepare arguments for async stat RPC.
- *
- * There is race condition between "capa_put" and "ll_statahead_interpret" for
- * accessing "op_data.op_capa[1,2]" as following:
- * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
- * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
- * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
- * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
- * "md_intent_getattr_async".
+ *
+ * Returns the newly allocated md_enqueue_info, or an ERR_PTR() value when
+ * the allocation or ll_prep_md_op_data() fails. Callers in this file release
+ * it with sa_fini_data() when md_intent_getattr_async() returns an error.
*/
-static int sa_prep_data(struct inode *dir, struct inode *child,
- struct sa_entry *entry, struct md_enqueue_info **pmi,
- struct ldlm_enqueue_info **pei,
- struct obd_capa **pcapa)
+static struct md_enqueue_info *
+sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
{
- struct qstr *qstr = &entry->se_qstr;
- struct md_enqueue_info *minfo;
- struct ldlm_enqueue_info *einfo;
- struct md_op_data *op_data;
-
- OBD_ALLOC_PTR(einfo);
- if (einfo == NULL)
- return -ENOMEM;
-
- OBD_ALLOC_PTR(minfo);
- if (minfo == NULL) {
- OBD_FREE_PTR(einfo);
- return -ENOMEM;
- }
+ struct md_enqueue_info *minfo;
+ struct ldlm_enqueue_info *einfo;
+ struct md_op_data *op_data;
- op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, qstr->name,
- qstr->len, 0, LUSTRE_OPC_ANY, NULL);
- if (IS_ERR(op_data)) {
- OBD_FREE_PTR(einfo);
- OBD_FREE_PTR(minfo);
- return PTR_ERR(op_data);
- }
+ OBD_ALLOC_PTR(minfo);
+ if (minfo == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, NULL, 0, 0,
+ LUSTRE_OPC_ANY, NULL);
+ if (IS_ERR(op_data)) {
+ OBD_FREE_PTR(minfo);
+ return ERR_CAST(op_data);
+ }
+
+ /* lookup case: no inode yet, so target the FID saved in the entry */
+ if (child == NULL)
+ op_data->op_fid2 = entry->se_fid;
minfo->mi_it.it_op = IT_GETATTR;
minfo->mi_dir = igrab(dir);
minfo->mi_cb = ll_statahead_interpret;
minfo->mi_cbdata = entry;
- einfo->ei_type = LDLM_IBITS;
- einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
- einfo->ei_cb_bl = ll_md_blocking_ast;
- einfo->ei_cb_cp = ldlm_completion_ast;
- einfo->ei_cb_gl = NULL;
- einfo->ei_cbdata = NULL;
+ /* einfo is embedded in minfo, so it needs no separate allocation */
+ einfo = &minfo->mi_einfo;
+ einfo->ei_type = LDLM_IBITS;
+ einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
+ einfo->ei_cb_bl = ll_md_blocking_ast;
+ einfo->ei_cb_cp = ldlm_completion_ast;
+ einfo->ei_cb_gl = NULL;
+ einfo->ei_cbdata = NULL;
- *pmi = minfo;
- *pei = einfo;
- pcapa[0] = op_data->op_capa1;
- pcapa[1] = op_data->op_capa2;
-
- return 0;
+ return minfo;
}
/* async stat for file not found in dcache */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
{
struct md_enqueue_info *minfo;
- struct ldlm_enqueue_info *einfo;
- struct obd_capa *capas[2];
int rc;
ENTRY;
- rc = sa_prep_data(dir, NULL, entry, &minfo, &einfo, capas);
- if (rc)
- RETURN(rc);
+ minfo = sa_prep_data(dir, NULL, entry);
+ if (IS_ERR(minfo))
+ RETURN(PTR_ERR(minfo));
- rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
- if (!rc) {
- capa_put(capas[0]);
- capa_put(capas[1]);
- } else {
- sa_fini_data(minfo, einfo);
- }
+ rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+ if (rc < 0)
+ sa_fini_data(minfo);
RETURN(rc);
}
{
struct inode *inode = dentry->d_inode;
struct lookup_intent it = { .it_op = IT_GETATTR,
- .d.lustre.it_lock_handle = 0 };
+ .it_lock_handle = 0 };
struct md_enqueue_info *minfo;
- struct ldlm_enqueue_info *einfo;
- struct obd_capa *capas[2];
int rc;
ENTRY;
rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
NULL);
if (rc == 1) {
- entry->se_handle = it.d.lustre.it_lock_handle;
+ entry->se_handle = it.it_lock_handle;
ll_intent_release(&it);
RETURN(1);
}
- rc = sa_prep_data(dir, inode, entry, &minfo, &einfo, capas);
- if (rc) {
+ minfo = sa_prep_data(dir, inode, entry);
+ if (IS_ERR(minfo)) {
entry->se_inode = NULL;
iput(inode);
- RETURN(rc);
+ RETURN(PTR_ERR(minfo));
}
- rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
- if (!rc) {
- capa_put(capas[0]);
- capa_put(capas[1]);
- } else {
+ rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+ if (rc < 0) {
entry->se_inode = NULL;
iput(inode);
- sa_fini_data(minfo, einfo);
+ sa_fini_data(minfo);
}
RETURN(rc);
}
/* async stat for file with @name */
-static void sa_statahead(struct dentry *parent, const char *name, int len)
+static void sa_statahead(struct dentry *parent, const char *name, int len,
+ const struct lu_fid *fid)
{
struct inode *dir = parent->d_inode;
struct ll_inode_info *lli = ll_i2info(dir);
int rc;
ENTRY;
- entry = sa_alloc(sai, sai->sai_index, name, len);
+ entry = sa_alloc(sai, sai->sai_index, name, len, fid);
if (IS_ERR(entry))
RETURN_EXIT;
__u64 hash;
int namelen;
char *name;
+ struct lu_fid fid;
hash = le64_to_cpu(ent->lde_hash);
if (unlikely(hash < pos))
if (unlikely(++first == 1))
continue;
+ fid_le_to_cpu(&fid, &ent->lde_fid);
+
/* wait for spare statahead window */
do {
l_wait_event(sa_thread->t_ctl_waitq,
} while (sa_sent_full(sai) &&
thread_is_running(sa_thread));
- sa_statahead(parent, name, namelen);
+ sa_statahead(parent, name, namelen, &fid);
}
pos = le64_to_cpu(dp->ldp_hash_end);
struct sa_entry *entry = NULL;
struct l_wait_info lwi = { 0 };
struct ll_dentry_data *ldd;
- struct ll_inode_info *lli;
+ struct ll_inode_info *lli = ll_i2info(dir);
int rc = 0;
ENTRY;
sa_handle_callback(sai);
if (!sa_ready(entry)) {
+ spin_lock(&lli->lli_sa_lock);
sai->sai_index_wait = entry->se_index;
+ spin_unlock(&lli->lli_sa_lock);
lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
- LWI_ON_SIGNAL_NOOP, NULL);
- rc = l_wait_event(sai->sai_waitq,
- sa_ready(entry) ||
- thread_is_stopped(&sai->sai_thread),
- &lwi);
+ LWI_ON_SIGNAL_NOOP, NULL);
+ rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi);
if (rc < 0) {
/*
* entry may not be ready, so it may be used by inflight
if (entry->se_state == SA_ENTRY_SUCC && entry->se_inode != NULL) {
struct inode *inode = entry->se_inode;
struct lookup_intent it = { .it_op = IT_GETATTR,
- .d.lustre.it_lock_handle =
+ .it_lock_handle =
entry->se_handle };
__u64 bits;
struct dentry *alias;
alias = ll_splice_alias(inode, *dentryp);
- if (IS_ERR(alias))
+ if (IS_ERR(alias)) {
+ ll_intent_release(&it);
GOTO(out, rc = PTR_ERR(alias));
+ }
*dentryp = alias;
/* statahead prepared this inode, transfer inode
* refcount from sa_entry to dentry */
(*dentryp)->d_name.name,
PFID(ll_inode2fid((*dentryp)->d_inode)),
PFID(ll_inode2fid(inode)));
+ ll_intent_release(&it);
GOTO(out, rc = -ESTALE);
}
* dentry_may_statahead().
*/
ldd = ll_d2d(*dentryp);
- lli = ll_i2info(dir);
/* ldd can be NULL if llite lookup failed. */
if (ldd != NULL)
ldd->lld_sa_generation = lli->lli_sa_generation;