-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011 Whamcloud, Inc.
- *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <linux/fs.h>
#include <linux/sched.h>
#include <linux/mm.h>
-#include <linux/smp_lock.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
#include <obd_support.h>
#include <lustre_lite.h>
#include <lustre_dlm.h>
-#include <linux/lustre_version.h>
#include "llite_internal.h"
#define SA_OMITTED_ENTRY_MAX 8ULL
};
static unsigned int sai_generation = 0;
-static cfs_spinlock_t sai_generation_lock = CFS_SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(sai_generation_lock);
static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry)
{
static inline void
ll_sa_entry_enhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
{
- int i = ll_sa_entry_hash(entry->se_qstr.hash);
+ int i = ll_sa_entry_hash(entry->se_qstr.hash); /* index into sai_cache[], derived from se_qstr.hash */
- cfs_spin_lock(&sai->sai_cache_lock[i]);
- cfs_list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
- cfs_spin_unlock(&sai->sai_cache_lock[i]);
+ spin_lock(&sai->sai_cache_lock[i]);
+ cfs_list_add_tail(&entry->se_hash, &sai->sai_cache[i]); /* appended while holding that bucket's own lock */
+ spin_unlock(&sai->sai_cache_lock[i]);
}
/*
static inline void
ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
{
- int i = ll_sa_entry_hash(entry->se_qstr.hash);
+ int i = ll_sa_entry_hash(entry->se_qstr.hash); /* same bucket index ll_sa_entry_enhash() computes */
- cfs_spin_lock(&sai->sai_cache_lock[i]);
- cfs_list_del_init(&entry->se_hash);
- cfs_spin_unlock(&sai->sai_cache_lock[i]);
+ spin_lock(&sai->sai_cache_lock[i]);
+ cfs_list_del_init(&entry->se_hash); /* unlink and re-initialise se_hash under the bucket lock */
+ spin_unlock(&sai->sai_cache_lock[i]);
}
-static inline int sa_received_empty(struct ll_statahead_info *sai)
+static inline int agl_should_run(struct ll_statahead_info *sai,
+ struct inode *inode)
{
- return cfs_list_empty(&sai->sai_entries_received);
+ return (inode != NULL && S_ISREG(inode->i_mode) && sai->sai_agl_valid); /* true only for a non-NULL regular-file inode while sai_agl_valid is set */
}
-static inline int sa_not_full(struct ll_statahead_info *sai)
+static inline struct ll_sa_entry *
+sa_first_received_entry(struct ll_statahead_info *sai)
{
- return (cfs_atomic_read(&sai->sai_cache_count) < sai->sai_max);
+ return cfs_list_entry(sai->sai_entries_received.next,
+ struct ll_sa_entry, se_list); /* entry at the head of sai_entries_received; no emptiness check is made here */
}
-static inline int sa_is_running(struct ll_statahead_info *sai)
+static inline struct ll_inode_info *
+agl_first_entry(struct ll_statahead_info *sai)
{
- return !!(sai->sai_thread.t_flags & SVC_RUNNING);
+ return cfs_list_entry(sai->sai_entries_agl.next,
+ struct ll_inode_info, lli_agl_list); /* inode info at the head of sai_entries_agl; no emptiness check is made here */
}
-static inline int sa_is_stopping(struct ll_statahead_info *sai)
+static inline int sa_sent_full(struct ll_statahead_info *sai)
{
- return !!(sai->sai_thread.t_flags & SVC_STOPPING);
+ return cfs_atomic_read(&sai->sai_cache_count) >= sai->sai_max; /* non-zero once sai_cache_count has reached sai_max */
}
-static inline int sa_is_stopped(struct ll_statahead_info *sai)
+static inline int sa_received_empty(struct ll_statahead_info *sai)
{
- return !!(sai->sai_thread.t_flags & SVC_STOPPED);
+ return cfs_list_empty(&sai->sai_entries_received); /* non-zero when sai_entries_received holds no entry */
+}
+
+static inline int agl_list_empty(struct ll_statahead_info *sai)
+{
+ return cfs_list_empty(&sai->sai_entries_agl); /* non-zero when no inode is queued on sai_entries_agl */
}
/**
entry->se_qstr.name = dname;
lli = ll_i2info(sai->sai_inode);
- cfs_spin_lock(&lli->lli_sa_lock);
- cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent);
- cfs_spin_unlock(&lli->lli_sa_lock);
+ spin_lock(&lli->lli_sa_lock);
+ cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent);
+ spin_unlock(&lli->lli_sa_lock);
- cfs_atomic_inc(&sai->sai_cache_count);
- ll_sa_entry_enhash(sai, entry);
+ cfs_atomic_inc(&sai->sai_cache_count);
+ ll_sa_entry_enhash(sai, entry);
- RETURN(entry);
+ RETURN(entry);
}
/*
static inline void
do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
{
- struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+ struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
- ll_sa_entry_unhash(sai, entry);
+ ll_sa_entry_unhash(sai, entry); /* take the entry out of its sai_cache[] bucket first */
- cfs_spin_lock(&lli->lli_sa_lock);
- entry->se_stat = SA_ENTRY_DEST;
- if (likely(!ll_sa_entry_unlinked(entry)))
- cfs_list_del_init(&entry->se_list);
- cfs_spin_unlock(&lli->lli_sa_lock);
+ spin_lock(&lli->lli_sa_lock);
+ entry->se_stat = SA_ENTRY_DEST; /* ll_sa_entry_to_stated() tests for this state under the same lock */
+ if (likely(!ll_sa_entry_unlinked(entry)))
+ cfs_list_del_init(&entry->se_list); /* only unlink se_list when the entry is still linked */
+ spin_unlock(&lli->lli_sa_lock);
- ll_sa_entry_put(sai, entry);
+ ll_sa_entry_put(sai, entry); /* NOTE(review): presumably drops the reference held for list membership - confirm */
}
/*
*/
static int
ll_sa_entry_to_stated(struct ll_statahead_info *sai,
- struct ll_sa_entry *entry, int rc)
+ struct ll_sa_entry *entry, int rc)
{
- struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
- int ret = 1;
+ struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+ int ret = 1; /* stays 1 when the entry is already SA_ENTRY_DEST; set to 0 once do_sai_entry_to_stated() has run */
- ll_sa_entry_cleanup(sai, entry);
+ ll_sa_entry_cleanup(sai, entry);
- cfs_spin_lock(&lli->lli_sa_lock);
- if (likely(entry->se_stat != SA_ENTRY_DEST)) {
- do_sai_entry_to_stated(sai, entry, rc);
- ret = 0;
- }
- cfs_spin_unlock(&lli->lli_sa_lock);
+ spin_lock(&lli->lli_sa_lock);
+ if (likely(entry->se_stat != SA_ENTRY_DEST)) { /* skip entries that do_sai_entry_fini() has already marked as being destroyed */
+ do_sai_entry_to_stated(sai, entry, rc);
+ ret = 0;
+ }
+ spin_unlock(&lli->lli_sa_lock);
+
+ return ret;
+}
- return ret;
+/*
+ * Insert inode into the list of sai_entries_agl.
+ *
+ * The inode is queued only when its lli_agl_index is still 0; in that case
+ * lli_agl_index is set to the given index and a reference is taken with
+ * igrab() before the inode is linked. ll_agl_trigger() and the drain loop
+ * in ll_agl_thread() call iput() on the inodes they take off the list.
+ * The AGL thread's wait queue is signalled only when the list was empty
+ * before this insertion.
+ */
+static void ll_agl_add(struct ll_statahead_info *sai,
+ struct inode *inode, int index)
+{
+ struct ll_inode_info *child = ll_i2info(inode);
+ struct ll_inode_info *parent = ll_i2info(sai->sai_inode);
+ int added = 0; /* set to 1 when the list was empty before the insertion */
+
+ spin_lock(&child->lli_agl_lock);
+ if (child->lli_agl_index == 0) { /* a non-zero index means the inode is skipped */
+ child->lli_agl_index = index;
+ spin_unlock(&child->lli_agl_lock);
+
+ LASSERT(cfs_list_empty(&child->lli_agl_list));
+
+ igrab(inode);
+ spin_lock(&parent->lli_agl_lock); /* the list is protected by the directory's lli_agl_lock, not the child's */
+ if (agl_list_empty(sai))
+ added = 1;
+ cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl);
+ spin_unlock(&parent->lli_agl_lock);
+ } else {
+ spin_unlock(&child->lli_agl_lock);
+ }
+
+ if (added > 0)
+ cfs_waitq_signal(&sai->sai_agl_thread.t_ctl_waitq); /* signalled after both locks have been released */
}
static struct ll_statahead_info *ll_sai_alloc(void)
RETURN(NULL);
cfs_atomic_set(&sai->sai_refcount, 1);
- cfs_spin_lock(&sai_generation_lock);
- sai->sai_generation = ++sai_generation;
- if (unlikely(sai_generation == 0))
- sai->sai_generation = ++sai_generation;
- cfs_spin_unlock(&sai_generation_lock);
+
+ spin_lock(&sai_generation_lock);
+ sai->sai_generation = ++sai_generation;
+ if (unlikely(sai_generation == 0))
+ sai->sai_generation = ++sai_generation;
+ spin_unlock(&sai_generation_lock);
+
sai->sai_max = LL_SA_RPC_MIN;
+ sai->sai_index = 1;
cfs_waitq_init(&sai->sai_waitq);
cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
+ cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq);
+
CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
+ CFS_INIT_LIST_HEAD(&sai->sai_entries_agl);
+
for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
CFS_INIT_LIST_HEAD(&sai->sai_cache[i]);
- cfs_spin_lock_init(&sai->sai_cache_lock[i]);
+ spin_lock_init(&sai->sai_cache_lock[i]);
}
cfs_atomic_set(&sai->sai_cache_count, 0);
if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) {
/* It is race case, the interpret callback just hold
* a reference count */
- cfs_spin_unlock(&lli->lli_sa_lock);
- RETURN_EXIT;
- }
+ spin_unlock(&lli->lli_sa_lock);
+ RETURN_EXIT;
+ }
- LASSERT(lli->lli_opendir_key == NULL);
- lli->lli_sai = NULL;
- lli->lli_opendir_pid = 0;
- cfs_spin_unlock(&lli->lli_sa_lock);
+ LASSERT(lli->lli_opendir_key == NULL);
+ LASSERT(thread_is_stopped(&sai->sai_thread));
+ LASSERT(thread_is_stopped(&sai->sai_agl_thread));
- LASSERT(sa_is_stopped(sai));
+ lli->lli_sai = NULL;
+ lli->lli_opendir_pid = 0;
+ spin_unlock(&lli->lli_sa_lock);
if (sai->sai_sent > sai->sai_replied)
CDEBUG(D_READA,"statahead for dir "DFID" does not "
do_sai_entry_fini(sai, entry);
LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0);
+ LASSERT(agl_list_empty(sai));
iput(inode);
OBD_FREE_PTR(sai);
EXIT;
}
+/*
+ * Run an asynchronous glimpse (AGL) for \a inode after it has been taken
+ * off sai_entries_agl.
+ *
+ * Do NOT forget to drop the inode refcount taken when the inode was put
+ * into sai_entries_agl: every path below ends with iput() and resets
+ * lli_agl_index to 0.
+ *
+ * The glimpse is skipped when:
+ *  - is_omitted_entry() reports the entry following this one as omitted;
+ *  - lli_glimpse_sem cannot be write-locked without blocking;
+ *  - lli_glimpse_time shows that a glimpse was triggered less than one
+ *    second ago.
+ *
+ * \param[in] inode	inode to glimpse; must not be linked on an AGL list
+ * \param[in] sai	statahead info the inode was queued on
+ */
+static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
+{
+        struct ll_inode_info *lli   = ll_i2info(inode);
+        __u64                 index = lli->lli_agl_index;
+        int                   rc;
+        ENTRY;
+
+        LASSERT(cfs_list_empty(&lli->lli_agl_list));
+
+        /* AGL may fall behind statahead by one entry. */
+        if (is_omitted_entry(sai, index + 1)) {
+                lli->lli_agl_index = 0;
+                iput(inode);
+                RETURN_EXIT;
+        }
+
+        /* Someone is in glimpse (sync or async), do nothing. */
+        rc = down_write_trylock(&lli->lli_glimpse_sem);
+        if (rc == 0) {
+                lli->lli_agl_index = 0;
+                iput(inode);
+                RETURN_EXIT;
+        }
+
+        /*
+         * Someone triggered glimpse within 1 sec before.
+         * 1) The former glimpse succeeded with glimpse lock granted by OST,
+         *    and if the lock is still cached on client, AGL needs to do
+         *    nothing. If it was cancelled by another client, AGL may be
+         *    unable to obtain a new lock, because AGL does not trigger a
+         *    glimpse callback.
+         * 2) The former glimpse succeeded, but OST did not grant glimpse
+         *    lock. In such a case, it is quite possible that the OST will
+         *    not grant a glimpse lock for AGL either.
+         * 3) The former glimpse failed. Compared with the other two cases
+         *    this is relatively rare; AGL can ignore it without affecting
+         *    performance much.
+         */
+        if (lli->lli_glimpse_time != 0 &&
+            cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) {
+                up_write(&lli->lli_glimpse_sem);
+                lli->lli_agl_index = 0;
+                iput(inode);
+                RETURN_EXIT;
+        }
+
+        CDEBUG(D_READA, "Handling (init) async glimpse: inode = "
+               DFID", idx = "LPU64"\n", PFID(&lli->lli_fid), index);
+
+        /*
+         * Keep the glimpse result so that the trace below reports it rather
+         * than the return value of down_write_trylock(), which is always
+         * non-zero at this point.
+         */
+        rc = cl_agl(inode);
+        lli->lli_agl_index = 0;
+        lli->lli_glimpse_time = cfs_time_current();
+        up_write(&lli->lli_glimpse_sem);
+
+        CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
+               DFID", idx = "LPU64", rc = %d\n",
+               PFID(&lli->lli_fid), index, rc);
+
+        iput(inode);
+
+        EXIT;
+}
+
+
static void do_statahead_interpret(struct ll_statahead_info *sai,
struct ll_sa_entry *target)
{
int rc = 0;
ENTRY;
- cfs_spin_lock(&lli->lli_sa_lock);
- if (target != NULL && target->se_req != NULL &&
- !cfs_list_empty(&target->se_list)) {
- entry = target;
- } else if (unlikely(sa_received_empty(sai))) {
- cfs_spin_unlock(&lli->lli_sa_lock);
- RETURN_EXIT;
- } else {
- entry = cfs_list_entry(sai->sai_entries_received.next,
- struct ll_sa_entry, se_list);
- }
-
- cfs_atomic_inc(&entry->se_refcount);
- cfs_list_del_init(&entry->se_list);
- cfs_spin_unlock(&lli->lli_sa_lock);
+ spin_lock(&lli->lli_sa_lock);
+ if (target != NULL && target->se_req != NULL &&
+ !cfs_list_empty(&target->se_list)) {
+ entry = target;
+ } else if (unlikely(sa_received_empty(sai))) {
+ spin_unlock(&lli->lli_sa_lock);
+ RETURN_EXIT;
+ } else {
+ entry = sa_first_received_entry(sai);
+ }
+
+ cfs_atomic_inc(&entry->se_refcount);
+ cfs_list_del_init(&entry->se_list);
+ spin_unlock(&lli->lli_sa_lock);
LASSERT(entry->se_handle != 0);
entry->se_inode = child;
+ if (agl_should_run(sai, child))
+ ll_agl_add(sai, child, entry->se_index);
+
EXIT;
out:
if (it_disposition(it, DISP_LOOKUP_NEG))
rc = -ENOENT;
- cfs_spin_lock(&lli->lli_sa_lock);
- /* stale entry */
- if (unlikely(lli->lli_sai == NULL ||
- lli->lli_sai->sai_generation != minfo->mi_generation)) {
- cfs_spin_unlock(&lli->lli_sa_lock);
- GOTO(out, rc = -ESTALE);
- } else {
- sai = ll_sai_get(lli->lli_sai);
- if (unlikely(!sa_is_running(sai))) {
- sai->sai_replied++;
- cfs_spin_unlock(&lli->lli_sa_lock);
- GOTO(out, rc = -EBADFD);
- }
-
- entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata);
- if (entry == NULL) {
- sai->sai_replied++;
- cfs_spin_unlock(&lli->lli_sa_lock);
- GOTO(out, rc = -EIDRM);
- }
-
- cfs_list_del_init(&entry->se_list);
- if (rc != 0) {
- sai->sai_replied++;
- do_sai_entry_to_stated(sai, entry, rc);
- cfs_spin_unlock(&lli->lli_sa_lock);
+ spin_lock(&lli->lli_sa_lock);
+ /* stale entry */
+ if (unlikely(lli->lli_sai == NULL ||
+ lli->lli_sai->sai_generation != minfo->mi_generation)) {
+ spin_unlock(&lli->lli_sa_lock);
+ GOTO(out, rc = -ESTALE);
+ } else {
+ sai = ll_sai_get(lli->lli_sai);
+ if (unlikely(!thread_is_running(&sai->sai_thread))) {
+ sai->sai_replied++;
+ spin_unlock(&lli->lli_sa_lock);
+ GOTO(out, rc = -EBADFD);
+ }
+
+ entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata);
+ if (entry == NULL) {
+ sai->sai_replied++;
+ spin_unlock(&lli->lli_sa_lock);
+ GOTO(out, rc = -EIDRM);
+ }
+
+ cfs_list_del_init(&entry->se_list);
+ if (rc != 0) {
+ sai->sai_replied++;
+ do_sai_entry_to_stated(sai, entry, rc);
+ spin_unlock(&lli->lli_sa_lock);
if (entry->se_index == sai->sai_index_wait)
cfs_waitq_signal(&sai->sai_waitq);
} else {
cfs_list_add_tail(&entry->se_list,
&sai->sai_entries_received);
sai->sai_replied++;
- cfs_spin_unlock(&lli->lli_sa_lock);
+ spin_unlock(&lli->lli_sa_lock);
if (wakeup)
cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
}
* "md_intent_getattr_async".
*/
static int sa_args_init(struct inode *dir, struct inode *child,
- struct qstr *qstr, struct md_enqueue_info **pmi,
+ struct ll_sa_entry *entry, struct md_enqueue_info **pmi,
struct ldlm_enqueue_info **pei,
struct obd_capa **pcapa)
{
- struct ll_inode_info *lli = ll_i2info(dir);
+ struct qstr *qstr = &entry->se_qstr;
+ struct ll_inode_info *lli = ll_i2info(dir);
struct md_enqueue_info *minfo;
struct ldlm_enqueue_info *einfo;
struct md_op_data *op_data;
minfo->mi_dir = igrab(dir);
minfo->mi_cb = ll_statahead_interpret;
minfo->mi_generation = lli->lli_sai->sai_generation;
- minfo->mi_cbdata = lli->lli_sai->sai_index;
+ minfo->mi_cbdata = entry->se_index;
einfo->ei_type = LDLM_IBITS;
einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
int rc;
ENTRY;
- rc = sa_args_init(dir, NULL, &entry->se_qstr, &minfo, &einfo, capas);
+ rc = sa_args_init(dir, NULL, entry, &minfo, &einfo, capas);
if (rc)
RETURN(rc);
RETURN(1);
}
- rc = sa_args_init(dir, inode, &entry->se_qstr, &minfo, &einfo, capas);
+ rc = sa_args_init(dir, inode, entry, &minfo, &einfo, capas);
if (rc) {
entry->se_inode = NULL;
iput(inode);
static void ll_statahead_one(struct dentry *parent, const char* entry_name,
int entry_name_len)
{
- struct inode *dir = parent->d_inode;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai = lli->lli_sai;
struct dentry *dentry = NULL;
struct ll_sa_entry *entry;
int rc;
RETURN_EXIT;
dentry = d_lookup(parent, &entry->se_qstr);
- if (!dentry)
+ if (!dentry) {
rc = do_sa_lookup(dir, entry);
- else
+ } else {
rc = do_sa_revalidate(dir, entry, dentry);
+ if (rc == 1 && agl_should_run(sai, dentry->d_inode))
+ ll_agl_add(sai, dentry->d_inode, entry->se_index);
+ }
if (dentry != NULL)
dput(dentry);
EXIT;
}
+/*
+ * Body of the async glimpse (AGL) thread started by ll_start_agl().
+ *
+ * The thread marks itself SVC_RUNNING and sets sai_agl_valid, then keeps
+ * taking inodes off sai_entries_agl and passing them to ll_agl_trigger()
+ * until it is no longer flagged as running. On the way out it clears
+ * sai_agl_valid, drops the inodes still queued, publishes SVC_STOPPED and
+ * releases its own reference on the statahead info.
+ *
+ * \param[in] arg	dentry of the directory being stat-ahead'ed; this
+ *			thread takes no reference of its own on it
+ *
+ * \retval 0 always
+ */
+static int ll_agl_thread(void *arg)
+{
+        struct dentry            *parent = (struct dentry *)arg;
+        struct inode             *dir    = parent->d_inode;
+        struct ll_inode_info     *plli   = ll_i2info(dir);
+        struct ll_inode_info     *clli;
+        struct ll_sb_info        *sbi    = ll_i2sbi(dir);
+        struct ll_statahead_info *sai    = ll_sai_get(plli->lli_sai);
+        struct ptlrpc_thread     *thread = &sai->sai_agl_thread;
+        struct l_wait_info        lwi    = { 0 };
+        ENTRY;
+
+        {
+                char pname[16];
+                snprintf(pname, 15, "ll_agl_%u", plli->lli_opendir_pid);
+                cfs_daemonize(pname);
+        }
+
+        CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n",
+               cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
+        atomic_inc(&sbi->ll_agl_total);
+        spin_lock(&plli->lli_agl_lock);
+        sai->sai_agl_valid = 1;
+        thread_set_flags(thread, SVC_RUNNING);
+        spin_unlock(&plli->lli_agl_lock);
+        /* Wake ll_start_agl(), which waits for RUNNING or STOPPED. */
+        cfs_waitq_signal(&thread->t_ctl_waitq);
+
+        while (1) {
+                l_wait_event(thread->t_ctl_waitq,
+                             !agl_list_empty(sai) ||
+                             !thread_is_running(thread),
+                             &lwi);
+
+                if (!thread_is_running(thread))
+                        break;
+
+                spin_lock(&plli->lli_agl_lock);
+                /* The statahead thread may help to process AGL entries,
+                 * so check again whether the list is empty. */
+                if (!agl_list_empty(sai)) {
+                        clli = agl_first_entry(sai);
+                        cfs_list_del_init(&clli->lli_agl_list);
+                        spin_unlock(&plli->lli_agl_lock);
+                        ll_agl_trigger(&clli->lli_vfs_inode, sai);
+                } else {
+                        spin_unlock(&plli->lli_agl_lock);
+                }
+        }
+
+        /*
+         * Emit the final trace before SVC_STOPPED is published. This thread
+         * holds no reference of its own on the parent dentry, and the
+         * statahead thread, which waits for SVC_STOPPED, goes on to
+         * dput(parent); reading parent->d_name after that point could touch
+         * a freed dentry.
+         */
+        CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n",
+               cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
+        spin_lock(&plli->lli_agl_lock);
+        sai->sai_agl_valid = 0;
+        /* Drop every inode still queued; the lock is released around iput(). */
+        while (!agl_list_empty(sai)) {
+                clli = agl_first_entry(sai);
+                cfs_list_del_init(&clli->lli_agl_list);
+                spin_unlock(&plli->lli_agl_lock);
+                clli->lli_agl_index = 0;
+                iput(&clli->lli_vfs_inode);
+                spin_lock(&plli->lli_agl_lock);
+        }
+        thread_set_flags(thread, SVC_STOPPED);
+        spin_unlock(&plli->lli_agl_lock);
+        cfs_waitq_signal(&thread->t_ctl_waitq);
+        ll_sai_put(sai);
+        RETURN(0);
+}
+
+
+/*
+ * Spawn the AGL thread for \a parent and wait until it has reported
+ * itself as either running or stopped.
+ *
+ * When the thread cannot be created, the AGL thread state of \a sai is
+ * set to SVC_STOPPED and the function returns without waiting.
+ */
+static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
+{
+        struct l_wait_info    wait_info = { 0 };
+        struct ptlrpc_thread *agl       = &sai->sai_agl_thread;
+        int                   err;
+        ENTRY;
+
+        CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n",
+               cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
+        err = cfs_create_thread(ll_agl_thread, parent, 0);
+        if (err >= 0) {
+                /* Block until ll_agl_thread() has published its state. */
+                l_wait_event(agl->t_ctl_waitq,
+                             thread_is_running(agl) || thread_is_stopped(agl),
+                             &wait_info);
+                EXIT;
+                return;
+        }
+
+        /* No thread was created: record the AGL thread as stopped. */
+        CERROR("can't start ll_agl thread, rc: %d\n", err);
+        thread_set_flags(agl, SVC_STOPPED);
+        RETURN_EXIT;
+}
+
+
static int ll_statahead_thread(void *arg)
{
struct dentry *parent = (struct dentry *)arg;
- struct inode *dir = parent->d_inode;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_sb_info *sbi = ll_i2sbi(dir);
- struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *plli = ll_i2info(dir);
+ struct ll_inode_info *clli;
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
+ struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai);
struct ptlrpc_thread *thread = &sai->sai_thread;
+ struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread;
struct page *page;
- __u64 pos = 0;
- int first = 0;
- int rc = 0;
+ __u64 pos = 0;
+ int first = 0;
+ int rc = 0;
struct ll_dir_chain chain;
+ struct l_wait_info lwi = { 0 };
ENTRY;
{
char pname[16];
- snprintf(pname, 15, "ll_sa_%u", lli->lli_opendir_pid);
+ snprintf(pname, 15, "ll_sa_%u", plli->lli_opendir_pid);
cfs_daemonize(pname);
}
+ CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
+ if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
+ ll_start_agl(parent, sai);
+
atomic_inc(&sbi->ll_sa_total);
- cfs_spin_lock(&lli->lli_sa_lock);
- thread->t_flags = SVC_RUNNING;
- cfs_spin_unlock(&lli->lli_sa_lock);
- cfs_waitq_signal(&thread->t_ctl_waitq);
- CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
+ spin_lock(&plli->lli_sa_lock);
+ thread_set_flags(thread, SVC_RUNNING);
+ spin_unlock(&plli->lli_sa_lock);
+ cfs_waitq_signal(&thread->t_ctl_waitq);
- lli->lli_sa_pos = 0;
- ll_dir_chain_init(&chain);
- page = ll_get_dir_page(NULL, dir, pos, &chain);
+ ll_dir_chain_init(&chain);
+ page = ll_get_dir_page(dir, pos, &chain);
while (1) {
- struct l_wait_info lwi = { 0 };
struct lu_dirpage *dp;
struct lu_dirent *ent;
CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
"/"LPU64": [rc %d] [parent %u]\n",
PFID(ll_inode2fid(dir)), pos, sai->sai_index,
- rc, lli->lli_opendir_pid);
+ rc, plli->lli_opendir_pid);
GOTO(out, rc);
}
if (unlikely(++first == 1))
continue;
-keep_de:
+keep_it:
l_wait_event(thread->t_ctl_waitq,
- sa_not_full(sai) ||
+ !sa_sent_full(sai) ||
!sa_received_empty(sai) ||
- !sa_is_running(sai),
+ !agl_list_empty(sai) ||
+ !thread_is_running(thread),
&lwi);
+interpret_it:
while (!sa_received_empty(sai))
do_statahead_interpret(sai, NULL);
- if (unlikely(!sa_is_running(sai))) {
+ if (unlikely(!thread_is_running(thread))) {
ll_release_page(page, 0);
- GOTO(out, rc);
+ GOTO(out, rc = 0);
}
- if (!sa_not_full(sai))
- /*
- * do not skip the current de.
- */
- goto keep_de;
+ /* If no window for metadata statahead, but there are
+ * some AGL entries to be triggered, then try to help
+ * to process the AGL entries. */
+ if (sa_sent_full(sai)) {
+ spin_lock(&plli->lli_agl_lock);
+ while (!agl_list_empty(sai)) {
+ clli = agl_first_entry(sai);
+ cfs_list_del_init(&clli->lli_agl_list);
+ spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode,
+ sai);
+
+ if (!sa_received_empty(sai))
+ goto interpret_it;
+
+ if (unlikely(
+ !thread_is_running(thread))) {
+ ll_release_page(page, 0);
+ GOTO(out, rc = 0);
+ }
+
+ if (!sa_sent_full(sai))
+ goto do_it;
+
+ spin_lock(&plli->lli_agl_lock);
+ }
+ spin_unlock(&plli->lli_agl_lock);
+
+ goto keep_it;
+ }
+do_it:
ll_statahead_one(parent, name, namelen);
}
pos = le64_to_cpu(dp->ldp_hash_end);
l_wait_event(thread->t_ctl_waitq,
!sa_received_empty(sai) ||
sai->sai_sent == sai->sai_replied||
- !sa_is_running(sai),
+ !thread_is_running(thread),
&lwi);
while (!sa_received_empty(sai))
do_statahead_interpret(sai, NULL);
- if ((sai->sai_sent == sai->sai_replied &&
- sa_received_empty(sai)) ||
- !sa_is_running(sai))
+ if (unlikely(!thread_is_running(thread)))
GOTO(out, rc = 0);
+
+ if (sai->sai_sent == sai->sai_replied &&
+ sa_received_empty(sai))
+ break;
}
+
+ spin_lock(&plli->lli_agl_lock);
+ while (!agl_list_empty(sai) &&
+ thread_is_running(thread)) {
+ clli = agl_first_entry(sai);
+ cfs_list_del_init(&clli->lli_agl_list);
+ spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode, sai);
+ spin_lock(&plli->lli_agl_lock);
+ }
+ spin_unlock(&plli->lli_agl_lock);
+
+ GOTO(out, rc = 0);
} else if (1) {
/*
* chain is exhausted.
*/
ll_release_page(page, le32_to_cpu(dp->ldp_flags) &
LDF_COLLIDE);
- lli->lli_sa_pos = pos;
sai->sai_in_readpage = 1;
- page = ll_get_dir_page(NULL, dir, pos, &chain);
+ page = ll_get_dir_page(dir, pos, &chain);
sai->sai_in_readpage = 0;
} else {
LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
EXIT;
out:
- ll_dir_chain_fini(&chain);
- cfs_spin_lock(&lli->lli_sa_lock);
- if (!sa_received_empty(sai)) {
- thread->t_flags = SVC_STOPPING;
- cfs_spin_unlock(&lli->lli_sa_lock);
-
- /* To release the resources held by received entries. */
- while (!sa_received_empty(sai))
- do_statahead_interpret(sai, NULL);
-
- cfs_spin_lock(&lli->lli_sa_lock);
+ if (sai->sai_agl_valid) {
+ spin_lock(&plli->lli_agl_lock);
+ thread_set_flags(agl_thread, SVC_STOPPING);
+ spin_unlock(&plli->lli_agl_lock);
+ cfs_waitq_signal(&agl_thread->t_ctl_waitq);
+
+ CDEBUG(D_READA, "stop agl thread: [pid %d]\n",
+ cfs_curproc_pid());
+ l_wait_event(agl_thread->t_ctl_waitq,
+ thread_is_stopped(agl_thread),
+ &lwi);
+ } else {
+ /* Set agl_thread flags anyway. */
+ thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);
}
- thread->t_flags = SVC_STOPPED;
- cfs_spin_unlock(&lli->lli_sa_lock);
+ ll_dir_chain_fini(&chain);
+ spin_lock(&plli->lli_sa_lock);
+ if (!sa_received_empty(sai)) {
+ thread_set_flags(thread, SVC_STOPPING);
+ spin_unlock(&plli->lli_sa_lock);
+
+ /* To release the resources held by received entries. */
+ while (!sa_received_empty(sai))
+ do_statahead_interpret(sai, NULL);
+
+ spin_lock(&plli->lli_sa_lock);
+ }
+ thread_set_flags(thread, SVC_STOPPED);
+ spin_unlock(&plli->lli_sa_lock);
cfs_waitq_signal(&sai->sai_waitq);
cfs_waitq_signal(&thread->t_ctl_waitq);
ll_sai_put(sai);
dput(parent);
- CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
- cfs_curproc_pid());
+ CDEBUG(D_READA, "statahead thread stopped: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
return rc;
}
*/
void ll_stop_statahead(struct inode *dir, void *key)
{
- struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_inode_info *lli = ll_i2info(dir);
- if (unlikely(key == NULL))
- return;
+ if (unlikely(key == NULL))
+ return;
- cfs_spin_lock(&lli->lli_sa_lock);
- if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
- cfs_spin_unlock(&lli->lli_sa_lock);
+ spin_lock(&lli->lli_sa_lock);
+ if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
+ spin_unlock(&lli->lli_sa_lock);
return;
}
struct l_wait_info lwi = { 0 };
struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread;
- if (!sa_is_stopped(lli->lli_sai)) {
- thread->t_flags = SVC_STOPPING;
- cfs_spin_unlock(&lli->lli_sa_lock);
- cfs_waitq_signal(&thread->t_ctl_waitq);
-
- CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
- cfs_curproc_pid());
- l_wait_event(thread->t_ctl_waitq,
- sa_is_stopped(lli->lli_sai),
- &lwi);
- } else {
- cfs_spin_unlock(&lli->lli_sa_lock);
- }
-
- /*
- * Put the ref which was held when first statahead_enter.
- * It maybe not the last ref for some statahead requests
- * maybe inflight.
- */
- ll_sai_put(lli->lli_sai);
- } else {
- lli->lli_opendir_pid = 0;
- cfs_spin_unlock(&lli->lli_sa_lock);
- }
+ if (!thread_is_stopped(thread)) {
+ thread_set_flags(thread, SVC_STOPPING);
+ spin_unlock(&lli->lli_sa_lock);
+ cfs_waitq_signal(&thread->t_ctl_waitq);
+
+ CDEBUG(D_READA, "stop statahead thread: [pid %d]\n",
+ cfs_curproc_pid());
+ l_wait_event(thread->t_ctl_waitq,
+ thread_is_stopped(thread),
+ &lwi);
+ } else {
+ spin_unlock(&lli->lli_sa_lock);
+ }
+
+ /*
+ * Put the ref which was held when first statahead_enter.
+ * It may not be the last ref, because some statahead requests
+ * may still be in flight.
+ */
+ ll_sai_put(lli->lli_sai);
+ } else {
+ lli->lli_opendir_pid = 0;
+ spin_unlock(&lli->lli_sa_lock);
+ }
}
enum {
static int is_first_dirent(struct inode *dir, struct dentry *dentry)
{
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_dir_chain chain;
- struct qstr *target = &dentry->d_name;
- struct page *page;
- __u64 pos = 0;
- int dot_de;
- int rc = LS_NONE_FIRST_DE;
+ struct ll_dir_chain chain;
+ struct qstr *target = &dentry->d_name;
+ struct page *page;
+ __u64 pos = 0;
+ int dot_de;
+ int rc = LS_NONE_FIRST_DE;
ENTRY;
- lli->lli_sa_pos = 0;
ll_dir_chain_init(&chain);
- page = ll_get_dir_page(NULL, dir, pos, &chain);
+ page = ll_get_dir_page(dir, pos, &chain);
while (1) {
struct lu_dirpage *dp;
*/
ll_release_page(page, le32_to_cpu(dp->ldp_flags) &
LDF_COLLIDE);
- lli->lli_sa_pos = pos;
- page = ll_get_dir_page(NULL, dir, pos, &chain);
+ page = ll_get_dir_page(dir, pos, &chain);
} else {
/*
* go into overflow page.
static void
ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
{
- struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
- int hit;
+ struct ptlrpc_thread *thread = &sai->sai_thread;
+ struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
+ int hit;
ENTRY;
if (entry != NULL && entry->se_stat == SA_ENTRY_SUCC)
sai->sai_miss++;
sai->sai_consecutive_miss++;
- if (sa_low_hit(sai) && sa_is_running(sai)) {
+ if (sa_low_hit(sai) && thread_is_running(thread)) {
atomic_inc(&sbi->ll_sa_wrong);
CDEBUG(D_READA, "Statahead for dir "DFID" hit "
"ratio too low: hit/miss "LPU64"/"LPU64
PFID(&lli->lli_fid), sai->sai_hit,
sai->sai_miss, sai->sai_sent,
sai->sai_replied, cfs_curproc_pid());
- cfs_spin_lock(&lli->lli_sa_lock);
- if (!sa_is_stopped(sai))
- sai->sai_thread.t_flags = SVC_STOPPING;
- cfs_spin_unlock(&lli->lli_sa_lock);
- }
- }
+ spin_lock(&lli->lli_sa_lock);
+ if (!thread_is_stopped(thread))
+ thread_set_flags(thread, SVC_STOPPING);
+ spin_unlock(&lli->lli_sa_lock);
+ }
+ }
- if (!sa_is_stopped(sai))
- cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
+ if (!thread_is_stopped(thread))
+ cfs_waitq_signal(&thread->t_ctl_waitq);
- EXIT;
+ EXIT;
}
/**
struct ll_statahead_info *sai = lli->lli_sai;
struct dentry *parent;
struct ll_sa_entry *entry;
+ struct ptlrpc_thread *thread;
struct l_wait_info lwi = { 0 };
int rc = 0;
ENTRY;
LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
if (sai) {
- if (unlikely(sa_is_stopped(sai) &&
+ thread = &sai->sai_thread;
+ if (unlikely(thread_is_stopped(thread) &&
cfs_list_empty(&sai->sai_entries_stated))) {
/* to release resource */
ll_stop_statahead(dir, lli->lli_opendir_key);
LWI_ON_SIGNAL_NOOP, NULL);
rc = l_wait_event(sai->sai_waitq,
ll_sa_entry_stated(entry) ||
- sa_is_stopped(sai),
+ thread_is_stopped(thread),
&lwi);
if (rc < 0) {
ll_sai_unplug(sai, entry);
struct lookup_intent it = { .it_op = IT_GETATTR,
.d.lustre.it_lock_handle =
entry->se_handle };
- struct ll_dentry_data *lld;
- __u64 bits;
-
- rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
- ll_inode2fid(inode), &bits);
- if (rc == 1) {
- if ((*dentryp)->d_inode == NULL) {
- *dentryp = ll_find_alias(inode,
- *dentryp);
- lld = ll_d2d(*dentryp);
- if (unlikely(lld == NULL))
- ll_dops_init(*dentryp, 1, 1);
+ __u64 bits;
+
+ rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
+ ll_inode2fid(inode), &bits);
+ if (rc == 1) {
+ if ((*dentryp)->d_inode == NULL) {
+ *dentryp = ll_splice_alias(inode,
+ *dentryp);
+ } else if ((*dentryp)->d_inode != inode) {
+ /* revalidate, but inode is recreated */
+ CDEBUG(D_READA,
+ "stale dentry %.*s inode %lu/%u, "
+ "statahead inode %lu/%u\n",
+ (*dentryp)->d_name.len,
+ (*dentryp)->d_name.name,
+ (*dentryp)->d_inode->i_ino,
+ (*dentryp)->d_inode->i_generation,
+ inode->i_ino,
+ inode->i_generation);
+ ll_sai_unplug(sai, entry);
+ RETURN(-ESTALE);
} else {
- LASSERT((*dentryp)->d_inode == inode);
-
- ll_dentry_rehash(*dentryp, 0);
- iput(inode);
- }
- entry->se_inode = NULL;
+ iput(inode);
+ }
+ entry->se_inode = NULL;
- ll_dentry_reset_flags(*dentryp, bits);
+ if ((bits & MDS_INODELOCK_LOOKUP) &&
+ d_lustre_invalid(*dentryp))
+ d_lustre_revalidate(*dentryp);
ll_intent_release(&it);
}
}
GOTO(out, rc = -EAGAIN);
}
+ CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
lli->lli_sai = sai;
rc = cfs_create_thread(ll_statahead_thread, parent, 0);
+ thread = &sai->sai_thread;
if (rc < 0) {
CERROR("can't start ll_sa thread, rc: %d\n", rc);
dput(parent);
lli->lli_opendir_key = NULL;
- sai->sai_thread.t_flags = SVC_STOPPED;
+ thread_set_flags(thread, SVC_STOPPED);
+ thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);
ll_sai_put(sai);
LASSERT(lli->lli_sai == NULL);
RETURN(-EAGAIN);
}
- l_wait_event(sai->sai_thread.t_ctl_waitq,
- sa_is_running(sai) || sa_is_stopped(sai),
+ l_wait_event(thread->t_ctl_waitq,
+ thread_is_running(thread) || thread_is_stopped(thread),
&lwi);
/*
* We don't stat-ahead for the first dirent since we are already in
- * lookup, and -EEXIST also indicates that this is the first dirent.
+ * lookup.
*/
- RETURN(-EEXIST);
+ RETURN(-EAGAIN);
out:
if (sai != NULL)
OBD_FREE_PTR(sai);
- cfs_spin_lock(&lli->lli_sa_lock);
- lli->lli_opendir_key = NULL;
- lli->lli_opendir_pid = 0;
- cfs_spin_unlock(&lli->lli_sa_lock);
- return rc;
+ spin_lock(&lli->lli_sa_lock);
+ lli->lli_opendir_key = NULL;
+ lli->lli_opendir_pid = 0;
+ spin_unlock(&lli->lli_sa_lock);
+ return rc;
}