Details : file contention detection and lockless i/o implementation
for contended files.
+Severity : enhancement
+Bugzilla : 11401
+Description: client-side metadata stat-ahead during readdir (directory readahead)
+Details : perform client-side metadata stat-ahead when the client detects
+ a readdir followed by sequential stat calls on the entries of that directory
--------------------------------------------------------------------------------
2007-08-27 Cluster File Systems, Inc. <info@clusterfs.com>
/* mds/mds_lov.c */
/* mdc/mdc_locks.c */
+struct md_enqueue_info;
+
int it_disposition(struct lookup_intent *it, int flag);
void it_set_disposition(struct lookup_intent *it, int flag);
void it_clear_disposition(struct lookup_intent *it, int flag);
void mdc_set_lock_data(__u64 *lockh, void *data);
int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
ldlm_iterator_t it, void *data);
+int mdc_revalidate_lock(struct obd_export *exp,
+ struct lookup_intent *it,
+ struct ll_fid *fid);
int mdc_intent_lock(struct obd_export *exp,
struct mdc_op_data *,
void *lmm, int lmmsize,
struct lookup_intent *it, struct mdc_op_data *data,
struct lustre_handle *lockh, void *lmm, int lmmlen,
int extra_lock_flags);
+int mdc_intent_getattr_async(struct obd_export *exp,
+ struct md_enqueue_info *minfo,
+ struct ldlm_enqueue_info *einfo);
/* mdc/mdc_request.c */
int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp);
fid->f_type = type;
}
+/* Map a lookup intent to a lock mode: LCK_CW when IT_CREAT is set, LCK_CR
+ * for readdir/getattr/open/lookup intents.  Any other intent hits LBUG(). */
+static inline int it_to_lock_mode(struct lookup_intent *it)
+{
+ /* CREAT needs to be tested before open (both could be set) */
+ if (it->it_op & IT_CREAT)
+ return LCK_CW;
+ else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
+ return LCK_CR;
+
+ LBUG();
+ return -EINVAL;
+}
+
/* ioctls for trying requests */
#define IOC_REQUEST_TYPE 'f'
#define IOC_REQUEST_MIN_NR 30
#define IOC_REQUEST_CLOSE _IOWR('f', 35, long)
#define IOC_REQUEST_MAX_NR 35
+/* metadata stat-ahead */
+/* Callback type stored in md_enqueue_info::mi_cb.  It receives the export,
+ * the request, the md_enqueue_info itself and a result code.  Presumably it
+ * is invoked when the asynchronous enqueue completes -- TODO confirm against
+ * mdc_intent_getattr_async(). */
+typedef int (* md_enqueue_cb_t)(struct obd_export *exp,
+ struct ptlrpc_request *req,
+ struct md_enqueue_info *minfo,
+ int rc);
+
+/* Everything needed to issue one asynchronous metadata enqueue and to finish
+ * it in the callback. */
+struct md_enqueue_info {
+ struct obd_export *mi_exp; /* export the request is issued on */
+ struct mdc_op_data mi_data; /* filled by ll_prepare_mdc_op_data() */
+ struct lookup_intent mi_it; /* intent (IT_GETATTR for statahead) */
+ struct lustre_handle mi_lockh; /* lock handle; presumably set by the
+ * enqueue -- TODO confirm */
+ struct dentry *mi_dentry; /* dentry being looked up/revalidated */
+ md_enqueue_cb_t mi_cb; /* callback, see md_enqueue_cb_t */
+ void *mi_cbdata; /* opaque cookie for the callback;
+ * statahead stores an entry index */
+};
+
+/* Pairs the md_enqueue_info with the ldlm_enqueue_info of one enqueue.
+ * NOTE(review): presumably carried as the async args of the request --
+ * confirm against mdc_intent_getattr_async(). */
+struct mdc_enqueue_args {
+ struct md_enqueue_info *ma_mi;
+ struct ldlm_enqueue_info *ma_ei;
+};
+
#endif
MODULES := lustre
-lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o
+lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o statahead.o
ifeq ($(PATCHLEVEL),4)
lustre-objs += rw24.o super.o
int ll_revalidate_it(struct dentry *de, int lookup_flags,
struct lookup_intent *it)
{
- int rc;
struct mdc_op_data op_data;
struct ptlrpc_request *req = NULL;
struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
struct obd_export *exp;
+ int first = 0, rc;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
}
}
+ if (it->it_op == IT_GETATTR)
+ first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
+
do_lock:
it->it_create_mode &= ~current->fs->umask;
rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
&req, ll_mdc_blocking_ast, 0);
+ if (it->it_op == IT_GETATTR && !first)
+ ll_statahead_exit(de, rc);
/* If req is NULL, then mdc_intent_lock only tried to do a lock match;
* if all was well, it will return 1 if it found locks, 0 otherwise. */
if (req == NULL && rc >= 0) {
*/
#include <linux/fs.h>
-#include <linux/ext2_fs.h>
#include <linux/pagemap.h>
#include <linux/mm.h>
#include <linux/version.h>
#include <lustre_dlm.h>
#include "llite_internal.h"
-typedef struct ext2_dir_entry_2 ext2_dirent;
-
#ifdef HAVE_PG_FS_MISC
#define PageChecked(page) test_bit(PG_fs_misc, &(page)->flags)
#define SetPageChecked(page) set_bit(PG_fs_misc, &(page)->flags)
return inode->i_sb->s_blocksize;
}
-static inline void ext2_put_page(struct page *page)
-{
- kunmap(page);
- page_cache_release(page);
-}
-
-static inline unsigned long dir_pages(struct inode *inode)
-{
- return (inode->i_size+CFS_PAGE_SIZE-1) >> CFS_PAGE_SHIFT;
-}
-
-
static void ext2_check_page(struct inode *dir, struct page *page)
{
unsigned chunk_size = ext2_chunk_size(dir);
SetPageError(page);
}
-static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
+struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
{
struct ldlm_res_id res_id =
{ .name = { dir->i_ino, (__u64)dir->i_generation} };
goto out_unlock;
}
-/*
- * p is at least 6 bytes before the end of page
- */
-static inline ext2_dirent *ext2_next_entry(ext2_dirent *p)
-{
- return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len));
-}
-
-static inline unsigned
-ext2_validate_entry(char *base, unsigned offset, unsigned mask)
-{
- ext2_dirent *de = (ext2_dirent*)(base + offset);
- ext2_dirent *p = (ext2_dirent*)(base + (offset&mask));
- while ((char*)p < (char*)de)
- p = ext2_next_entry(p);
- return (char *)p - base;
-}
-
static unsigned char ext2_filetype_table[EXT2_FT_MAX] = {
[EXT2_FT_UNKNOWN] DT_UNKNOWN,
[EXT2_FT_REG_FILE] DT_REG,
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
inode->i_generation, inode);
+ if (S_ISDIR(inode->i_mode))
+ ll_stop_statahead(inode);
+
/* don't do anything for / */
if (inode->i_sb->s_root == file->f_dentry)
RETURN(0);
struct inode *inode = file->f_dentry->d_inode;
struct ptlrpc_request *req;
int rc;
+ ENTRY;
if (!parent)
RETURN(-ENOENT);
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
inode->i_generation, inode, file->f_flags);
+ if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0)
+ lli->lli_opendir_pid = current->pid;
+
/* don't do anything for / */
if (inode->i_sb->s_root == file->f_dentry)
RETURN(0);
#endif
fd = ll_file_data_get();
- if (fd == NULL)
+ if (fd == NULL) {
+ lli->lli_opendir_pid = 0;
RETURN(-ENOMEM);
-
+ }
if (!it || !it->d.lustre.it_disposition) {
/* Convert f_flags into access mode. We cannot use file->f_mode,
* because everything but O_ACCMODE mask was stripped from it */
(*och_usecount)--;
}
up(&lli->lli_och_sem);
+ lli->lli_opendir_pid = 0;
}
return rc;
}
#ifndef LLITE_INTERNAL_H
#define LLITE_INTERNAL_H
+#include <linux/ext2_fs.h>
#ifdef CONFIG_FS_POSIX_ACL
# include <linux/fs.h>
#ifdef HAVE_XATTR_ACL
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
struct inode lli_vfs_inode;
#endif
+
+ /* metadata stat-ahead */
+ pid_t lli_opendir_pid;
+ struct ll_statahead_info *lli_sai;
};
/*
enum stats_track_type ll_stats_track_type;
int ll_stats_track_id;
int ll_rw_stats_on;
-
dev_t ll_sdev_orig; /* save s_dev before assign for
* clustred nfs */
+
+ /* metadata stat-ahead */
+ unsigned int ll_sa_count; /* current statahead RPCs */
+ unsigned int ll_sa_max; /* max statahead RPCs */
+ unsigned int ll_sa_wrong; /* statahead thread stopped for
+ * low hit ratio */
+ unsigned int ll_sa_total; /* statahead thread started
+ * count */
+ unsigned long long ll_sa_blocked; /* ls count waiting for
+ * statahead */
+ unsigned long long ll_sa_cached; /* ls count got in cache */
};
#define LL_DEFAULT_MAX_RW_CHUNK (32 * 1024 * 1024)
extern struct file_operations ll_dir_operations;
extern struct inode_operations ll_dir_inode_operations;
+struct page *ll_get_dir_page(struct inode *dir, unsigned long n);
+/*
+ * p is at least 6 bytes before the end of page
+ */
+typedef struct ext2_dir_entry_2 ext2_dirent;
+
+/* Return the directory entry following p, found by advancing rec_len bytes
+ * (rec_len is converted from little-endian). */
+static inline ext2_dirent *ext2_next_entry(ext2_dirent *p)
+{
+ return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len));
+}
+
+/* Starting at (offset & mask) from base, walk entries with
+ * ext2_next_entry() until reaching or passing base + offset, and return the
+ * offset (relative to base) of the entry where the walk stopped. */
+static inline unsigned
+ext2_validate_entry(char *base, unsigned offset, unsigned mask)
+{
+ ext2_dirent *de = (ext2_dirent*)(base + offset);
+ ext2_dirent *p = (ext2_dirent*)(base + (offset&mask));
+ while ((char*)p < (char*)de)
+ p = ext2_next_entry(p);
+ return (char *)p - base;
+}
+
+/* Release a directory page: unmap it and drop the page cache reference. */
+static inline void ext2_put_page(struct page *page)
+{
+ kunmap(page);
+ page_cache_release(page);
+}
+
+/* Number of pages needed to hold i_size bytes, rounded up. */
+static inline unsigned long dir_pages(struct inode *inode)
+{
+ return (inode->i_size + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+}
+
/* llite/namei.c */
int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir);
struct inode *ll_iget(struct super_block *sb, ino_t hash,
struct lookup_intent *ll_convert_intent(struct open_intent *oit,
int lookup_flags);
#endif
+int lookup_it_finish(struct ptlrpc_request *request, int offset,
+ struct lookup_intent *it, void *data);
+void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
/* llite/rw.c */
int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size);
int ll_removexattr(struct dentry *dentry, const char *name);
+/* statahead.c */
+
+#define LL_STATAHEAD_MIN 1
+#define LL_STATAHEAD_DEF 32
+#define LL_STATAHEAD_MAX 10000
+
+/* per inode struct, for dir only.
+ * Reachable through ll_inode_info::lli_sai; statahead.c changes that pointer
+ * under lli_lock and frees the struct when sai_refc drops to zero. */
+struct ll_statahead_info {
+ struct inode *sai_inode;
+ atomic_t sai_refc; /* when access this struct, hold
+ * refcount */
+ unsigned int sai_max; /* max ahead of lookup */
+ unsigned int sai_sent; /* stat requests sent count */
+ unsigned int sai_replied; /* stat requests which received
+ * reply */
+ unsigned int sai_cached; /* UPDATE lock cached locally
+ * already */
+ unsigned int sai_hit; /* hit count */
+ unsigned int sai_miss; /* miss count */
+ unsigned int sai_consecutive_miss; /* consecutive miss */
+ unsigned sai_ls_all:1; /* ls -al, do stat-ahead for
+ * hidden entries */
+ struct ptlrpc_thread sai_thread; /* stat-ahead thread */
+ struct list_head sai_entries; /* stat-ahead entries */
+ unsigned int sai_entries_nr; /* stat-ahead entries count */
+};
+
+int ll_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup);
+void ll_statahead_exit(struct dentry *dentry, int result);
+void ll_stop_statahead(struct inode *inode);
+
#endif /* LLITE_INTERNAL_H */
spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock);
}
+ /* metadata statahead is enabled by default */
+ sbi->ll_sa_max = LL_STATAHEAD_DEF;
+
RETURN(sbi);
}
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
inode->i_generation, inode);
+ if (S_ISDIR(inode->i_mode)) {
+ /* these should have been cleared in ll_file_release */
+ LASSERT(lli->lli_sai == NULL);
+ LASSERT(lli->lli_opendir_pid == 0);
+ }
+
ll_inode2fid(&fid, inode);
clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags);
mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode);
count;
}
+/* Read handler for "statahead_count": prints ll_sa_count of the superblock
+ * passed in data.  off, start and eof are not used. */
+static int ll_rd_statahead_count(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct super_block *sb = data;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+ return snprintf(page, count, "%u\n", sbi->ll_sa_count);
+}
+
+/* Read handler for "statahead_max": prints ll_sa_max of the superblock
+ * passed in data.  off, start and eof are not used. */
+static int ll_rd_statahead_max(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct super_block *sb = data;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+ return snprintf(page, count, "%u\n", sbi->ll_sa_max);
+}
+
+/* Write handler for "statahead_max".  The value is parsed with
+ * lprocfs_write_helper() and stored in ll_sa_max when it lies in
+ * [0, LL_STATAHEAD_MAX]; 0 makes ll_statahead_enter() return -ENOTSUPP.
+ * An out-of-range value is logged and ignored, yet the write still returns
+ * count.  A parse failure returns the helper's error code. */
+static int ll_wr_statahead_max(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct super_block *sb = data;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+ int val, rc;
+
+ rc = lprocfs_write_helper(buffer, count, &val);
+ if (rc)
+ return rc;
+ if (val >= 0 && val <= LL_STATAHEAD_MAX)
+ sbi->ll_sa_max = val;
+ else
+ CERROR("Bad statahead_max value %d. Valid values are in the "
+ "range [0, %d]\n", val, LL_STATAHEAD_MAX);
+
+ return count;
+}
+
+/* Read handler for "statahead_stats": prints the ll_sa_wrong, ll_sa_total
+ * and ll_sa_blocked counters of the superblock passed in data.  The
+ * "ls total" line is the sum ll_sa_blocked + ll_sa_cached. */
+static int ll_rd_statahead_stats(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct super_block *sb = data;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+ return snprintf(page, count,
+ "statahead wrong: %u\n"
+ "statahead total: %u\n"
+ "ls blocked: %llu\n"
+ "ls total: %llu\n",
+ sbi->ll_sa_wrong, sbi->ll_sa_total,
+ sbi->ll_sa_blocked,
+ sbi->ll_sa_blocked + sbi->ll_sa_cached);
+}
+
static struct lprocfs_vars lprocfs_obd_vars[] = {
{ "uuid", ll_rd_sb_uuid, 0, 0 },
//{ "mntpt_path", ll_rd_path, 0, 0 },
{ "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
{ "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 },
{ "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
+ { "statahead_count", ll_rd_statahead_count, 0, 0 },
+ { "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 },
+ { "statahead_stats", ll_rd_statahead_stats, 0, 0 },
{ 0 }
};
* in ll_revalidate_it. After revaliadate inode will be have hashed aliases
* and it triggers BUG_ON in d_instantiate_unique (bug #10954).
*/
-struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
+static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
{
struct list_head *tmp;
struct dentry *dentry;
return de;
}
-static int lookup_it_finish(struct ptlrpc_request *request, int offset,
+int lookup_it_finish(struct ptlrpc_request *request, int offset,
struct lookup_intent *it, void *data)
{
struct it_cb_data *icbd = data;
RETURN(ERR_PTR(rc));
}
- icbd.icbd_childp = &dentry;
+ if (it->it_op == IT_GETATTR) {
+ rc = ll_statahead_enter(parent, &dentry, 1);
+ if (rc >= 0) {
+ ll_statahead_exit(dentry, rc);
+ if (rc == 1)
+ RETURN(retval = dentry);
+ }
+ }
+
icbd.icbd_parent = parent;
+ icbd.icbd_childp = &dentry;
rc = ll_prepare_mdc_op_data(&op_data, parent, NULL, dentry->d_name.name,
dentry->d_name.len, lookup_flags, NULL);
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (c) 2007 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <linux/fs.h>
+#include <linux/sched.h>
+#include <linux/mm.h>
+#include <linux/smp_lock.h>
+#include <linux/highmem.h>
+#include <linux/pagemap.h>
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <obd_support.h>
+#include <lustre_lite.h>
+#include <lustre_dlm.h>
+#include <linux/lustre_version.h>
+#include "llite_internal.h"
+
+/* One stat-ahead request tracked on ll_statahead_info::sai_entries. */
+struct ll_sai_entry {
+ struct list_head se_list; /* link in sai_entries */
+ int se_index; /* value of sai_sent when the entry was created */
+ int se_stat; /* SA_ENTRY_UNSTATED, SA_ENTRY_STATED, or the
+ * non-zero rc stored by ll_statahead_one() */
+};
+
+/* Values of ll_sai_entry::se_stat.  Any value other than SA_ENTRY_UNSTATED
+ * is treated as "stated" by ll_sai_entry_stated(). */
+enum {
+ SA_ENTRY_UNSTATED = 0,
+ SA_ENTRY_STATED
+};
+
+/* Allocate a ll_statahead_info with one reference held, sai_max set to
+ * LL_STATAHEAD_MIN, and the wait queue and entry list initialized.
+ * sai_inode is left for the caller to set.  Returns NULL on allocation
+ * failure. */
+static struct ll_statahead_info *ll_sai_alloc(void)
+{
+ struct ll_statahead_info *sai;
+
+ OBD_ALLOC_PTR(sai);
+ if (!sai)
+ return NULL;
+
+ sai->sai_max = LL_STATAHEAD_MIN;
+ init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
+ INIT_LIST_HEAD(&sai->sai_entries);
+ atomic_set(&sai->sai_refc, 1);
+ return sai;
+}
+
+/* Take an additional reference on sai (must be non-NULL) and return it. */
+static inline
+struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
+{
+ LASSERT(sai);
+ atomic_inc(&sai->sai_refc);
+ return sai;
+}
+
+/* Drop one reference on sai.  When the last reference goes away (detected
+ * with lli_lock taken by atomic_dec_and_lock()), free all remaining entries
+ * and the sai itself, clear lli_sai, and release the inode reference held
+ * in sai_inode.  The thread must already be flagged SVC_STOPPED by then. */
+static void ll_sai_put(struct ll_statahead_info *sai)
+{
+ struct inode *inode = sai->sai_inode;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ ENTRY;
+
+ if (atomic_dec_and_lock(&sai->sai_refc, &lli->lli_lock)) {
+ struct ll_sai_entry *entry, *next;
+
+ LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
+ list_for_each_entry_safe(entry, next, &sai->sai_entries,
+ se_list) {
+ list_del(&entry->se_list);
+ OBD_FREE_PTR(entry);
+ }
+ OBD_FREE_PTR(sai);
+ lli->lli_sai = NULL;
+ spin_unlock(&lli->lli_lock);
+ /* inode was saved above, before sai was freed */
+ iput(inode);
+ }
+ EXIT;
+}
+
+/* Allocate an entry with the given index and state and append it to
+ * sai_entries under lli_lock, updating sai_entries_nr and the superblock's
+ * ll_sa_count.  Returns the new entry, or NULL on allocation failure.
+ * NOTE(review): the returned pointer is not reference counted;
+ * ll_sai_entry_put() may free the entry at any time. */
+static struct ll_sai_entry *ll_sai_entry_get(struct ll_statahead_info *sai,
+ int index, int stat)
+{
+ struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+ struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
+ struct ll_sai_entry *entry;
+ ENTRY;
+
+ OBD_ALLOC_PTR(entry);
+ if (entry == NULL)
+ RETURN(NULL);
+
+ CDEBUG(D_READA, "alloc sai entry %p index %d, stat %d\n",
+ entry, index, stat);
+ entry->se_index = index;
+ entry->se_stat = stat;
+
+ spin_lock(&lli->lli_lock);
+ list_add_tail(&entry->se_list, &sai->sai_entries);
+ sai->sai_entries_nr++;
+ sbi->ll_sa_count = sai->sai_entries_nr;
+ spin_unlock(&lli->lli_lock);
+
+ LASSERT(sai->sai_entries_nr <= sbi->ll_sa_max);
+ RETURN(entry);
+}
+
+/* Find the entry with the given index and set its state to stat.  The entry
+ * must still be SA_ENTRY_UNSTATED.  A missing entry is only logged.
+ * This function takes no lock itself; its caller in this file,
+ * ll_statahead_interpret(), holds lli_lock around the call. */
+static void ll_sai_entry_set(struct ll_statahead_info *sai, int index,
+ int stat)
+{
+ struct ll_sai_entry *entry;
+ ENTRY;
+
+ list_for_each_entry(entry, &sai->sai_entries, se_list) {
+ if (entry->se_index == index) {
+ LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
+ entry->se_stat = stat;
+ CDEBUG(D_READA, "set sai entry %p index %d stat %d\n",
+ entry, index, stat);
+ EXIT;
+ return;
+ }
+ }
+ /* Sometimes, this happens when entry has been put and freed */
+ CDEBUG(D_READA, "can't find sai entry index %d\n", index);
+ EXIT;
+}
+
+/* Check whether the first entry on sai_entries has been stated already.
+ * Returns 1 when its state is anything other than SA_ENTRY_UNSTATED, and 0
+ * when it is still unstated or the list is empty.  Takes lli_lock. */
+static int ll_sai_entry_stated(struct ll_statahead_info *sai)
+{
+ struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+ struct ll_sai_entry *entry;
+ int rc = 0;
+ ENTRY;
+
+ spin_lock(&lli->lli_lock);
+ if (!list_empty(&sai->sai_entries)) {
+ entry = list_entry(sai->sai_entries.next, struct ll_sai_entry,
+ se_list);
+ CDEBUG(D_READA, "check sai entry %p index %d stat %d\n",
+ entry, entry->se_index, entry->se_stat);
+ rc = (entry->se_stat != SA_ENTRY_UNSTATED);
+ }
+ spin_unlock(&lli->lli_lock);
+
+ RETURN(rc);
+}
+
+/* Remove and free the first entry on sai_entries, if there is one, and
+ * decrement sai_entries_nr.  Must be called inside lli_lock. */
+static void ll_sai_entry_put(struct ll_statahead_info *sai)
+{
+ struct ll_sai_entry *entry;
+ ENTRY;
+
+ if (list_empty(&sai->sai_entries)) {
+ EXIT;
+ return;
+ }
+ LASSERT(sai->sai_entries_nr > 0);
+
+ entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
+ list_del(&entry->se_list);
+ sai->sai_entries_nr--;
+
+ CDEBUG(D_READA, "free sa entry %p index %d stat %d\n",
+ entry, entry->se_index, entry->se_stat);
+ OBD_FREE_PTR(entry);
+ EXIT;
+}
+
+/* finish lookup/revalidate.
+ * Installed as md_enqueue_info::mi_cb by sa_args_prep().  On rc == 0 it
+ * completes either a lookup (dentry has no inode yet) or a revalidate
+ * (dentry already has an inode).  In every case it then marks the matching
+ * sai entry as stated, wakes waiters, releases the intent, frees minfo and
+ * drops the dentry reference that was handed over with the request.
+ * Returns rc, possibly replaced by the result of the finish step. */
+static int ll_statahead_interpret(struct obd_export *exp,
+ struct ptlrpc_request *req,
+ struct md_enqueue_info *minfo,
+ int rc)
+{
+ struct lookup_intent *it = &minfo->mi_it;
+ struct dentry *dentry = minfo->mi_dentry;
+ struct inode *dir = dentry->d_parent->d_inode;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai;
+ ENTRY;
+
+ CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
+ dentry->d_name.len, dentry->d_name.name, rc);
+ if (rc)
+ GOTO(out, rc);
+
+ if (dentry->d_inode == NULL) {
+ /* lookup */
+ struct dentry *save = dentry;
+ struct it_cb_data icbd = {
+ .icbd_parent = dir,
+ .icbd_childp = &dentry
+ };
+
+ rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
+ if (!rc) {
+ LASSERT(dentry->d_inode);
+ /* lookup_it_finish() may have replaced dentry through
+ * icbd_childp; drop the one we came in with */
+ if (dentry != save)
+ dput(save);
+ ll_lookup_finish_locks(it, dentry);
+ }
+ } else {
+ /* revalidate */
+ struct mds_body *body;
+
+ body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+ sizeof(*body));
+ /* NOTE(review): body is not checked for NULL before use */
+ if (memcmp(&minfo->mi_data.fid2, &body->fid1,
+ sizeof(body->fid1))) {
+ /* the reply is for a different fid than the one asked */
+ ll_unhash_aliases(dentry->d_inode);
+ GOTO(out, rc = -EAGAIN);
+ }
+
+ rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
+ if (rc) {
+ ll_unhash_aliases(dentry->d_inode);
+ GOTO(out, rc);
+ }
+
+ /* drop and rehash the dentry, clearing the invalid flag */
+ spin_lock(&dcache_lock);
+ lock_dentry(dentry);
+ __d_drop(dentry);
+#ifdef DCACHE_LUSTRE_INVALID
+ dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
+#endif
+ unlock_dentry(dentry);
+ __d_rehash(dentry, 0);
+ spin_unlock(&dcache_lock);
+
+ ll_lookup_finish_locks(it, dentry);
+
+ }
+ EXIT;
+out:
+ spin_lock(&lli->lli_lock);
+ sai = lli->lli_sai;
+ if (sai) {
+ lli->lli_sai->sai_replied++;
+ /* mi_cbdata carries the entry index stored by sa_args_prep() */
+ ll_sai_entry_set(lli->lli_sai, (int)minfo->mi_cbdata,
+ SA_ENTRY_STATED);
+ wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
+ }
+ spin_unlock(&lli->lli_lock);
+ ll_intent_release(it);
+ OBD_FREE_PTR(minfo);
+
+ dput(dentry);
+ return rc;
+}
+
+/* Free the pair allocated by sa_args_prep(); both must be non-NULL. */
+static void sa_args_fini(struct md_enqueue_info *minfo,
+ struct ldlm_enqueue_info *einfo)
+{
+ LASSERT(minfo && einfo);
+ OBD_FREE_PTR(minfo);
+ OBD_FREE_PTR(einfo);
+}
+
+/* Allocate and fill the md_enqueue_info / ldlm_enqueue_info pair for one
+ * stat-ahead IT_GETATTR request on dentry under dir.  The current sai_sent
+ * value is stored in mi_cbdata so that ll_statahead_interpret() can find the
+ * matching sai entry.  Returns 0 and sets *pmi and *pei, or -ENOMEM, in
+ * which case nothing is left allocated. */
+static int sa_args_prep(struct inode *dir, struct dentry *dentry,
+ struct md_enqueue_info **pmi,
+ struct ldlm_enqueue_info **pei)
+{
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct md_enqueue_info *minfo;
+ struct ldlm_enqueue_info *einfo;
+
+ OBD_ALLOC_PTR(einfo);
+ if (einfo == NULL)
+ return -ENOMEM;
+
+ OBD_ALLOC_PTR(minfo);
+ if (minfo == NULL) {
+ OBD_FREE_PTR(einfo);
+ return -ENOMEM;
+ }
+
+ minfo->mi_exp = ll_i2mdcexp(dir);
+ minfo->mi_it.it_op = IT_GETATTR;
+ minfo->mi_dentry = dentry;
+ minfo->mi_cb = ll_statahead_interpret;
+ /* integer index smuggled through the pointer-sized cookie */
+ minfo->mi_cbdata = (void *)lli->lli_sai->sai_sent;
+
+ einfo->ei_type = LDLM_IBITS;
+ einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
+ einfo->ei_cb_bl = ll_mdc_blocking_ast;
+ einfo->ei_cb_cp = ldlm_completion_ast;
+ einfo->ei_cb_gl = NULL;
+ einfo->ei_cbdata = NULL;
+
+ *pmi = minfo;
+ *pei = einfo;
+
+ return 0;
+}
+
+/* similar to ll_lookup_it().
+ * Prepare and issue an asynchronous getattr for a dentry that has no inode
+ * yet.  Returns 0 when the request was handed to
+ * mdc_intent_getattr_async(); on any failure the prepared arguments are
+ * freed and the error code is returned. */
+static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
+{
+ struct md_enqueue_info *minfo;
+ struct ldlm_enqueue_info *einfo;
+ int rc;
+ ENTRY;
+
+ rc = sa_args_prep(dir, dentry, &minfo, &einfo);
+ if (rc)
+ RETURN(rc);
+
+ rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL,
+ dentry->d_name.name, dentry->d_name.len, 0,
+ NULL);
+ if (rc == 0)
+ rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
+
+ if (rc)
+ sa_args_fini(minfo, einfo);
+
+ RETURN(rc);
+}
+
+/* similar to ll_revalidate_it().
+ * return 1: dentry valid.
+ * 0: will send stat-ahead request.
+ * -errno: prepare stat-ahead request failed.
+ * A dentry without an inode and a mountpoint dentry are both reported as
+ * valid without sending anything. */
+static int do_sa_revalidate(struct dentry *dentry)
+{
+ struct inode *inode = dentry->d_inode;
+ struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode);
+ struct ll_fid fid;
+ struct lookup_intent it = { .it_op = IT_GETATTR };
+ struct md_enqueue_info *minfo;
+ struct ldlm_enqueue_info *einfo;
+ int rc;
+ ENTRY;
+
+ if (inode == NULL)
+ RETURN(1);
+
+ if (d_mountpoint(dentry))
+ RETURN(1);
+
+ ll_inode2fid(&fid, inode);
+
+ rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid);
+ if (rc == 1) {
+ /* counted as "cached" in the sai; no request is sent */
+ ll_intent_release(&it);
+ lli->lli_sai->sai_cached++;
+ wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
+ RETURN(1);
+ }
+
+ rc = sa_args_prep(dentry->d_parent->d_inode, dentry, &minfo, &einfo);
+ if (rc)
+ RETURN(rc);
+
+ rc = ll_prepare_mdc_op_data(&minfo->mi_data, dentry->d_parent->d_inode,
+ inode, dentry->d_name.name,
+ dentry->d_name.len, 0, NULL);
+ if (rc == 0)
+ rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
+
+ if (rc)
+ sa_args_fini(minfo, einfo);
+
+ RETURN(rc);
+}
+
+/* copied from kernel.
+ * Fill a qstr for name/namelen: the hash is built one byte at a time with
+ * partial_name_hash() and finished with end_name_hash().  The name is
+ * referenced, not copied. */
+static inline void name2qstr(struct qstr *this, const char *name, int namelen)
+{
+ unsigned long hash;
+ const unsigned char *p = (const unsigned char *)name;
+ int len;
+ unsigned int c;
+
+ hash = init_name_hash();
+ for (len = 0; len < namelen; len++, p++) {
+ c = *p;
+ hash = partial_name_hash(c, hash);
+ }
+ this->name = name;
+ this->len = namelen;
+ this->hash = end_name_hash(hash);
+}
+
+/* Issue one stat-ahead for directory entry de of parent.
+ *
+ * A sai entry indexed by the current sai_sent is queued first.  Then either
+ * an async lookup (no dentry cached yet) or an async revalidate (dentry
+ * already cached) is started.
+ *
+ * Return value:
+ *  0      -- request sent, ll_statahead_interpret() will mark the entry,
+ *  1      -- dentry already valid, entry marked here,
+ *  -errno -- failure; the entry (if it was queued) is marked with the error.
+ *
+ * sai_sent is advanced in every case except when the sai entry itself could
+ * not be allocated.
+ */
+static int ll_statahead_one(struct dentry *parent, ext2_dirent *de)
+{
+        struct inode             *dir = parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai = lli->lli_sai;
+        int                       index = sai->sai_sent;
+        struct qstr               name;
+        struct dentry            *dentry;
+        struct ll_sai_entry      *se;
+        int                       rc;
+        ENTRY;
+
+        name2qstr(&name, de->name, de->name_len);
+
+        /* the entry may be freed by ll_statahead_exit() as soon as it is
+         * queued, so only its index is remembered, not the pointer */
+        if (ll_sai_entry_get(sai, index, SA_ENTRY_UNSTATED) == NULL)
+                RETURN(-ENOMEM);
+
+#ifdef DCACHE_LUSTRE_INVALID
+        if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
+#else
+        if (d_unhashed(parent)) {
+#endif
+                CDEBUG(D_READA, "parent dentry@%p %.*s is "
+                       "invalid, skip statahead\n",
+                       parent, parent->d_name.len, parent->d_name.name);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        dentry = d_lookup(parent, &name);
+        if (dentry == NULL) {
+                dentry = d_alloc(parent, &name);
+                if (dentry == NULL)
+                        GOTO(out, rc = -ENOMEM);
+                rc = do_sa_lookup(dir, dentry);
+        } else {
+                rc = do_sa_revalidate(dentry);
+        }
+        /* on rc == 0 the dentry reference now belongs to the request */
+        if (rc)
+                dput(dentry);
+        EXIT;
+out:
+        if (rc) {
+                /* no reply will come for this entry: record the result
+                 * ourselves, looking the entry up by index under lli_lock
+                 * because it may already have been put and freed */
+                spin_lock(&lli->lli_lock);
+                list_for_each_entry(se, &sai->sai_entries, se_list) {
+                        if (se->se_index != index)
+                                continue;
+                        CDEBUG(D_READA, "set sai entry %p index %d stat %d, "
+                               "rc %d\n", se, se->se_index, se->se_stat, rc);
+                        se->se_stat = rc;
+                        break;
+                }
+                wake_up(&sai->sai_thread.t_ctl_waitq);
+                spin_unlock(&lli->lli_lock);
+        }
+        sai->sai_sent++;
+        return rc;
+}
+
+/* Returns 1 when the stat-ahead thread is flagged SVC_STOPPING, else 0. */
+static inline int sa_check_stop(struct ll_statahead_info *sai)
+{
+ return !!(sai->sai_thread.t_flags & SVC_STOPPING);
+}
+
+/* True while the number of requests sent but not yet accounted as hit or
+ * miss is below the current window sai_max. */
+static inline int sa_not_full(struct ll_statahead_info *sai)
+{
+ return sai->sai_sent - sai->sai_miss - sai->sai_hit < sai->sai_max;
+}
+
+/* Arguments handed to ll_statahead_thread().  ll_statahead_enter() keeps the
+ * instance on its stack and waits for the thread to flag SVC_RUNNING or
+ * SVC_STOPPED before returning. */
+struct ll_sa_thread_args {
+ struct dentry *sta_parent; /* directory dentry to stat-ahead in */
+ pid_t sta_pid; /* pid of the starting process, used in the
+ * thread name */
+};
+
+/* Stat-ahead worker thread, started by ll_statahead_enter().
+ *
+ * arg is a struct ll_sa_thread_args.  The thread takes its own references
+ * on the parent dentry and on the sai, then walks the directory pages and
+ * calls ll_statahead_one() for each entry while the window is not full
+ * (sa_not_full()).  It skips ".", "..", the first eligible entry (the
+ * caller is already stat-ing it) and, unless sai_ls_all is set, dot files.
+ *
+ * The thread ends when the end of the directory is reached, the hit ratio
+ * is too low, it is asked to stop (SVC_STOPPING), or an error occurs.
+ * Always returns 0.
+ */
+static int ll_statahead_thread(void *arg)
+{
+        struct ll_sa_thread_args *sta = arg;
+        struct dentry            *parent = dget(sta->sta_parent);
+        struct inode             *dir = parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
+        struct ptlrpc_thread     *thread = &sai->sai_thread;
+        struct l_wait_info        lwi = { 0 };
+        unsigned long             index = 0;
+        __u64                     offset = 0;
+        int                       skip = 0;
+        int                       rc = 0;
+        char                      name[16] = "";
+        ENTRY;
+
+        sbi->ll_sa_total++;
+
+        /* sta lives on the starter's stack: read it before flagging
+         * SVC_RUNNING, which lets the starter return */
+        snprintf(name, 15, "ll_sa_%u", sta->sta_pid);
+        cfs_daemonize(name);
+        thread->t_flags = SVC_RUNNING;
+        wake_up(&thread->t_ctl_waitq);
+        CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
+
+        if (sai->sai_ls_all)
+                CDEBUG(D_READA, "do statahead for hidden files\n");
+
+        while (1) {
+                unsigned long npages = dir_pages(dir);
+
+                /* hit ratio < 80% */
+                if ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
+                    (sai->sai_consecutive_miss > 8)) {
+                        sbi->ll_sa_wrong++;
+                        CDEBUG(D_READA, "statahead for dir %.*s hit ratio too "
+                               "low: hit/miss %u/%u, sent/replied %u/%u, "
+                               "cached %u\n",
+                               parent->d_name.len, parent->d_name.name,
+                               sai->sai_hit, sai->sai_miss, sai->sai_sent,
+                               sai->sai_replied, sai->sai_cached);
+                        break;
+                }
+
+                /* reach the end of dir */
+                if (index == npages) {
+                        CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
+                               index, npages);
+                        break;
+                }
+
+                l_wait_event(thread->t_ctl_waitq,
+                             sa_check_stop(sai) || sa_not_full(sai),
+                             &lwi);
+
+                if (sa_check_stop(sai))
+                        break;
+
+                /* offset is only kept when we break out below with the
+                 * window full; moving on to the next page resets it */
+                for (; index < npages; index++, offset = 0) {
+                        char *kaddr, *limit;
+                        ext2_dirent *de;
+                        struct page *page;
+
+                        CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu"
+                               "/%lu size %llu\n",
+                               CFS_PAGE_SIZE, dir->i_ino, dir->i_generation,
+                               index, npages, dir->i_size);
+
+                        page = ll_get_dir_page(dir, index);
+                        npages = dir_pages(dir);
+
+                        if (IS_ERR(page)) {
+                                rc = PTR_ERR(page);
+                                CERROR("error reading dir %lu/%u page %lu: "
+                                       "rc %d\n",
+                                       dir->i_ino, dir->i_generation, index,
+                                       rc);
+                                GOTO(out, rc);
+                        }
+
+                        kaddr = page_address(page);
+                        de = (ext2_dirent *)(kaddr + offset);
+                        limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
+                        for (; (char*)de <= limit && sa_not_full(sai);
+                             de = ext2_next_entry(de)) {
+                                if (!de->inode)
+                                        continue;
+
+                                /* don't stat-ahead ".", ".." */
+                                if (skip < 2) {
+                                        skip++;
+                                        continue;
+                                }
+
+                                /* don't stat-ahead for hidden files */
+                                if (de->name[0] == '.' && !sai->sai_ls_all)
+                                        continue;
+
+                                /* don't stat-ahead for the first de */
+                                if (skip < 3) {
+                                        skip++;
+                                        continue;
+                                }
+
+                                rc = ll_statahead_one(parent, de);
+                                if (rc < 0) {
+                                        ext2_put_page(page);
+                                        GOTO(out, rc);
+                                }
+                        }
+                        offset = (char *)de - kaddr;
+                        ext2_put_page(page);
+
+                        if ((char *)de <= limit)
+                                /* !sa_not_full() */
+                                break;
+                }
+        }
+        EXIT;
+out:
+        thread->t_flags = SVC_STOPPED;
+        wake_up(&thread->t_ctl_waitq);
+        lli->lli_opendir_pid = 0; /* avoid statahead again */
+        ll_sai_put(sai);
+        /* log while we still hold our reference on parent: after dput() the
+         * dentry, and with it d_name, may be freed */
+        CDEBUG(D_READA, "stopped statahead thread, pid %d for %s\n",
+               current->pid, parent->d_name.name);
+        dput(parent);
+        return 0;
+}
+
+/* called in ll_file_release.
+ * Clear the opendir pid of the directory.  If a stat-ahead is active for
+ * it, ask the thread to stop and wait until it has flagged SVC_STOPPED. */
+void ll_stop_statahead(struct inode *inode)
+{
+        struct ll_inode_info     *lli = ll_i2info(inode);
+        struct ll_statahead_info *sai;
+        struct ptlrpc_thread     *thread;
+
+        /* don't check pid here. upon fork, if parent closedir before child,
+         * child will not have chance to stop this thread. */
+        lli->lli_opendir_pid = 0;
+
+        spin_lock(&lli->lli_lock);
+        sai = lli->lli_sai;
+        if (sai == NULL) {
+                spin_unlock(&lli->lli_lock);
+                return;
+        }
+        ll_sai_get(sai);
+        spin_unlock(&lli->lli_lock);
+
+        thread = &sai->sai_thread;
+        /* The thread flags SVC_STOPPED before it drops its sai reference, so
+         * we can get here after it has already stopped.  Overwriting the
+         * flag with SVC_STOPPING in that case would make the wait below
+         * never finish.  NOTE: t_flags is still updated without a lock on
+         * both sides, so this only narrows the window. */
+        if (!(thread->t_flags & SVC_STOPPED)) {
+                CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
+                       current->pid);
+                thread->t_flags = SVC_STOPPING;
+                wake_up(&thread->t_ctl_waitq);
+                wait_event(thread->t_ctl_waitq,
+                           thread->t_flags & SVC_STOPPED);
+        }
+        ll_sai_put(sai);
+}
+
+/* Return values of is_first_dirent().  LS_FIRST_DOT_DE must stay equal to
+ * LS_FIRST_DE + 1: is_first_dirent() computes its result as
+ * LS_FIRST_DE + dot_de. */
+enum {
+ LS_NONE_FIRST_DE = 0, /* not first dirent, or is "." */
+ LS_FIRST_DE, /* the first non-hidden dirent */
+ LS_FIRST_DOT_DE /* the first hidden dirent, that is ".xxx" */
+};
+
+/* Decide whether dentry names the first entry of dir that a listing would
+ * stat.  The first two entries on page 0 are skipped (a warning is logged if
+ * they are not "." and ".."), then the first in-use entry is compared with
+ * the dentry name.  If the dentry name does not start with '.', leading dot
+ * files are skipped before comparing.
+ * Returns LS_FIRST_DE or LS_FIRST_DOT_DE on a match, LS_NONE_FIRST_DE
+ * otherwise, including when a directory page cannot be read. */
+static int is_first_dirent(struct inode *dir, struct dentry *dentry)
+{
+ struct qstr *d_name = &dentry->d_name;
+ unsigned long npages = dir_pages(dir);
+ struct page *page;
+ ext2_dirent *de;
+ unsigned long index;
+ __u64 offset = 0;
+ char *kaddr, *limit;
+ int dot_de = 1; /* dirent is dotfile till now */
+ int rc = LS_NONE_FIRST_DE;
+ ENTRY;
+
+ page = ll_get_dir_page(dir, 0);
+ if (IS_ERR(page)) {
+ CERROR("error reading dir %lu/%u page 0: rc %ld\n",
+ dir->i_ino, dir->i_generation, PTR_ERR(page));
+ RETURN(LS_NONE_FIRST_DE);
+ }
+
+ kaddr = page_address(page);
+ de = (ext2_dirent *)kaddr;
+ if (!(de->name_len == 1 && strncmp(de->name, ".", 1) == 0))
+ CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
+ de = ext2_next_entry(de); /* skip ".", or ignore bad entry */
+ if (!(de->name_len == 2 && strncmp(de->name, "..", 2) == 0))
+ CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
+ de = ext2_next_entry(de); /* skip "..", or ignore bad entry */
+
+ offset = (char *)de - kaddr;
+
+ /* offset is non-zero only for page 0; later pages start at 0 */
+ for (index = 0; index < npages; offset = 0) {
+ de = (ext2_dirent *)(kaddr + offset);
+ limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
+ for (; (char*)de <= limit; de = ext2_next_entry(de)) {
+ if (!de->inode)
+ continue;
+
+ if (de->name[0] != '.')
+ dot_de = 0;
+
+ if (dot_de && d_name->name[0] != '.') {
+ CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
+ d_name->len, d_name->name,
+ de->name_len, de->name);
+ continue;
+ }
+
+ /* first candidate entry decides the result */
+ if (d_name->len == de->name_len &&
+ !strncmp(d_name->name, de->name, d_name->len))
+ rc = LS_FIRST_DE + dot_de;
+ else
+ rc = LS_NONE_FIRST_DE;
+ GOTO(out, rc);
+ }
+
+ /* keep the last page held so that "out" can release it */
+ if (++index >= npages)
+ break;
+
+ ext2_put_page(page);
+
+ page = ll_get_dir_page(dir, index);
+ if (IS_ERR(page)) {
+ CERROR("error reading dir %lu/%u page %lu: rc %ld\n",
+ dir->i_ino, dir->i_generation, index,
+ PTR_ERR(page));
+ RETURN(LS_NONE_FIRST_DE);
+ }
+ kaddr = page_address(page);
+ }
+ CERROR("%.*s not found in dir %.*s!\n", d_name->len, d_name->name,
+ dentry->d_parent->d_name.len, dentry->d_parent->d_name.name);
+ EXIT;
+out:
+ ext2_put_page(page);
+ return rc;
+}
+
+/* start stat-ahead thread if this is the first dir entry, otherwise if a thread
+ * is started already, wait until thread is ahead of me.
+ *
+ * dir is the directory inode, *dentryp the dentry being stated (it may be
+ * replaced by a hashed alias when lookup is non-zero), lookup tells whether
+ * the caller is in lookup (1) or revalidate (0).
+ *
+ * Return value:
+ *  0 -- miss,
+ *  1 -- hit,
+ *  -EEXIST -- stat ahead thread started, and this is the first try.
+ *  other negative value -- error, or stat-ahead not applicable
+ *  (-ENOTSUPP disabled, -EBADF not the opendir process or not the first
+ *  dirent, -EBUSY raced with another starter, -ENOMEM, -ENOENT).
+ */
+int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
+{
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai;
+        struct ll_sa_thread_args  sta;
+        int                       rc;
+        ENTRY;
+
+        if (sbi->ll_sa_max == 0)
+                RETURN(-ENOTSUPP);
+
+        /* not the same process, don't statahead */
+        if (lli->lli_opendir_pid != current->pid)
+                RETURN(-EBADF);
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai) {
+                sai = ll_sai_get(lli->lli_sai);
+                spin_unlock(&lli->lli_lock);
+
+                if (ll_sai_entry_stated(sai)) {
+                        sbi->ll_sa_cached++;
+                } else {
+                        struct l_wait_info lwi = { 0 };
+
+                        sbi->ll_sa_blocked++;
+                        /* thread started already, avoid double-stat */
+                        l_wait_event(sai->sai_thread.t_ctl_waitq,
+                                     ll_sai_entry_stated(sai) ||
+                                     sai->sai_thread.t_flags & SVC_STOPPED,
+                                     &lwi);
+                }
+
+                ll_sai_put(sai);
+
+                if (lookup) {
+                        struct dentry *result;
+
+                        result = d_lookup((*dentryp)->d_parent,
+                                          &(*dentryp)->d_name);
+                        if (result) {
+                                LASSERT(result != *dentryp);
+                                dput(*dentryp);
+                                *dentryp = result;
+                        }
+                        RETURN(result != NULL);
+                }
+                /* do nothing for revalidate */
+                RETURN(0);
+        }
+        spin_unlock(&lli->lli_lock);
+
+        rc = is_first_dirent(dir, *dentryp);
+        if (!rc) {
+                /* optimization: don't statahead for this pid any longer */
+                spin_lock(&lli->lli_lock);
+                if (lli->lli_sai == NULL)
+                        lli->lli_opendir_pid = 0;
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EBADF);
+        }
+
+        /* Allocate and fully initialize the sai before publishing it: the
+         * allocation may sleep, so it must not happen under the lli_lock
+         * spinlock, and ll_sai_put() dereferences sai_inode. */
+        sai = ll_sai_alloc();
+        if (sai == NULL)
+                RETURN(-ENOMEM);
+
+        sai->sai_inode = igrab(dir);
+        if (sai->sai_inode == NULL) {
+                OBD_FREE_PTR(sai);
+                RETURN(-ENOENT);
+        }
+        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai != NULL) {
+                /* sai is already there; ours was never visible to anybody,
+                 * so free it directly instead of going through
+                 * ll_sai_put() */
+                spin_unlock(&lli->lli_lock);
+                iput(sai->sai_inode);
+                OBD_FREE_PTR(sai);
+                RETURN(-EBUSY);
+        }
+        lli->lli_sai = sai;
+        spin_unlock(&lli->lli_lock);
+
+        sta.sta_parent = (*dentryp)->d_parent;
+        sta.sta_pid = current->pid;
+        rc = kernel_thread(ll_statahead_thread, &sta, 0);
+        if (rc < 0) {
+                CERROR("can't start ll_sa thread, rc: %d\n", rc);
+                /* the thread never ran: flag it stopped ourselves, both for
+                 * the assertion in ll_sai_put() and for anybody waiting in
+                 * ll_stop_statahead() */
+                sai->sai_thread.t_flags = SVC_STOPPED;
+                wake_up(&sai->sai_thread.t_ctl_waitq);
+                ll_sai_put(sai);
+                RETURN(rc);
+        }
+
+        /* sta is on our stack: don't return before the thread has read it */
+        wait_event(sai->sai_thread.t_ctl_waitq,
+                   sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED));
+        ll_sai_put(sai);
+
+        /* we don't stat-ahead for the first dirent since we are already in
+         * lookup, and -EEXIST also indicates that this is the first dirent.
+         */
+        RETURN(-EEXIST);
+}
+
+/* update hit/miss count.
+ * Called by the opendir process after it has stated dentry; result == 1
+ * means the stat-ahead was a hit, anything else is counted as a miss.  Does
+ * nothing for other processes or when no stat-ahead is active.  One sai
+ * entry is consumed on a hit and two on a miss; a hit also doubles the
+ * window sai_max, capped at ll_sa_max.  The thread is woken in both cases. */
+void ll_statahead_exit(struct dentry *dentry, int result)
+{
+ struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode);
+ struct ll_sb_info *sbi = ll_i2sbi(dentry->d_parent->d_inode);
+
+ if (lli->lli_opendir_pid != current->pid)
+ return;
+
+ spin_lock(&lli->lli_lock);
+ if (lli->lli_sai) {
+ struct ll_statahead_info *sai = lli->lli_sai;
+
+ ll_sai_entry_put(sai);
+ if (result == 1) {
+ sai->sai_hit++;
+ sai->sai_consecutive_miss = 0;
+ sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
+ CDEBUG(D_READA, "statahead %.*s hit(hit/miss %u/%u)\n",
+ dentry->d_name.len, dentry->d_name.name,
+ sai->sai_hit, sai->sai_miss);
+ } else {
+ sai->sai_miss++;
+ sai->sai_consecutive_miss++;
+ /* upon miss, it's always because some dentry is added
+ * by statahead thread, and in the meantime the `ls`
+ * process finds this dentry, but the d_op for this
+ * dentry is NULL, then revalidate is not done, and
+ * ll_statahead_exit() not called for this dentry,
+ * so statahead thread should be behind the `ls` process,
+ * put one more entry to go ahead.
+ */
+ CDEBUG(D_READA, "statahead %.*s miss(hit/miss %u/%u)\n",
+ dentry->d_name.len, dentry->d_name.name,
+ sai->sai_hit, sai->sai_miss);
+ ll_sai_entry_put(sai);
+ }
+ wake_up(&sai->sai_thread.t_ctl_waitq);
+ }
+ spin_unlock(&lli->lli_lock);
+}
posix_acl_release(acl);
RETURN(rc);
}
+ if (xattr_type == XATTR_ACL_DEFAULT_T && !S_ISDIR(inode->i_mode))
+ RETURN(-ENODATA);
#endif
do_getxattr:
char *tmp;
tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1,
data->namelen + 1);
- LOGL0(data->name, data->namelen, tmp);
+ memcpy(tmp, data->name, data->namelen);
+ data->name = tmp;
}
}
spin_lock(&cli->cl_loi_list_lock);
cli->cl_r_in_flight--;
- list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+ list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
/* No free request slots anymore */
break;
}
EXPORT_SYMBOL(it_clear_disposition);
-static int it_to_lock_mode(struct lookup_intent *it)
-{
- /* CREAT needs to be tested before open (both could be set) */
- if (it->it_op & IT_CREAT)
- return LCK_CW;
- else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
- return LCK_CR;
-
- LBUG();
- RETURN(-EINVAL);
-}
-
int it_open_error(int phase, struct lookup_intent *it)
{
if (it_disposition(it, DISP_OPEN_OPEN)) {
* but this is incredibly unlikely, and questionable whether the client
* could do MDS recovery under OOM anyways... */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
- struct mds_body *body, int size[6])
+ struct mds_body *body)
{
- int new_size, old_size;
+ int old_len, new_size, old_size;
+ struct lustre_msg *old_msg = req->rq_reqmsg;
struct lustre_msg *new_msg;
+ old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
/* save old size */
- old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
-
- size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
- new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
+ old_size = lustre_msg_size(lustre_request_magic(req),
+ req->rq_reqmsg->lm_bufcount,
+ req->rq_reqmsg->lm_buflens);
+
+ lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
+ body->eadatasize);
+ new_size = lustre_msg_size(lustre_request_magic(req),
+ req->rq_reqmsg->lm_bufcount,
+ req->rq_reqmsg->lm_buflens);
OBD_ALLOC(new_msg, new_size);
if (new_msg != NULL) {
- struct lustre_msg *old_msg = req->rq_reqmsg;
-
- DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
+ DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
body->eadatasize);
memcpy(new_msg, old_msg, old_size);
- lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
- body->eadatasize);
spin_lock(&req->rq_lock);
req->rq_reqmsg = new_msg;
OBD_FREE(old_msg, old_size);
} else {
+ lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
body->valid &= ~OBD_MD_FLEASIZE;
body->eadatasize = 0;
}
}
-/* We always reserve enough space in the reply packet for a stripe MD, because
- * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
- struct lookup_intent *it, struct mdc_op_data *op_data,
- struct lustre_handle *lockh, void *lmm, int lmmsize,
- int extra_lock_flags)
+static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
+ struct lookup_intent *it,
+ struct mdc_op_data *data,
+ void *lmm, int lmmsize)
{
struct ptlrpc_request *req;
- struct obd_device *obddev = class_exp2obd(exp);
- struct ldlm_res_id res_id =
- { .name = {op_data->fid1.id, op_data->fid1.generation} };
- ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
- struct ldlm_request *lockreq;
struct ldlm_intent *lit;
- struct ldlm_reply *lockrep;
+ struct obd_device *obddev = class_exp2obd(exp);
int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(*lockreq),
+ [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
[DLM_INTENT_IT_OFF] = sizeof(*lit),
- 0, 0, 0, 0 };
+ [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
+ [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
+ /* As an optimization, we allocate an RPC request buffer
+ * for at least a default-sized LOV EA even if we aren't
+ * sending one. We grow the whole request to the next
+ * power-of-two size since we get that much from a slab
+ * allocation anyways. This avoids an allocation below
+ * in the common case where we need to save a
+ * default-sized LOV EA for open replay. */
+ [DLM_INTENT_REC_OFF+2]= max(lmmsize,
+ obddev->u.cli.cl_default_mds_easize) };
int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREPLY_OFF] = sizeof(*lockrep),
+ [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
[DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
[DLM_REPLY_REC_OFF+1] = obddev->u.cli.
- cl_max_mds_easize, 0 };
- int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
- int repbufcnt = 4, rc;
- void *eadata;
- ENTRY;
-
- LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
-// LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
-// ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
-
- if (it->it_op & IT_OPEN) {
+ cl_max_mds_easize,
+ [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
CFS_LIST_HEAD(cancels);
int count = 0;
int mode;
+ int rc;
it->it_create_mode |= S_IFREG;
- size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
- size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
- /* As an optimization, we allocate an RPC request buffer for
- * at least a default-sized LOV EA even if we aren't sending
- * one. We grow the whole request to the next power-of-two
- * size since we get that much from a slab allocation anyways.
- * This avoids an allocation below in the common case where
- * we need to save a default-sized LOV EA for open replay. */
- size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
- obddev->u.cli.cl_default_mds_easize);
- rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
- size);
+ rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
if (rc & (rc - 1))
size[DLM_INTENT_REC_OFF + 2] =
- min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
+ min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
obddev->u.cli.cl_max_mds_easize);
/* If inode is known, cancel conflicting OPEN locks. */
- if (op_data->fid2.id) {
+ if (data->fid2.id) {
if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
mode = LCK_CW;
#ifdef FMODE_EXEC
#endif
else
mode = LCK_CR;
- count = mdc_resource_get_unused(exp, &op_data->fid2,
- &cancels, mode,
- MDS_INODELOCK_OPEN);
+ count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
+ mode, MDS_INODELOCK_OPEN);
}
/* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
mode = LCK_EX;
else
mode = LCK_CR;
- count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
- mode, MDS_INODELOCK_UPDATE);
+ count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
+ MDS_INODELOCK_UPDATE);
if (it->it_flags & O_JOIN_FILE) {
+ __u64 head_size = (*(__u64 *)data->data);
/* join is like an unlink of the tail */
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- size[DLM_INTENT_REC_OFF + 3] =
- sizeof(struct mds_rec_join);
- req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
- count);
- mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
- (*(__u64 *)op_data->data));
+ size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
+ req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
+ mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, head_size);
} else {
- req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
- count);
+ req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
}
- if (!req)
- RETURN(-ENOMEM);
-
+ if (req) {
spin_lock(&req->rq_lock);
req->rq_replay = 1;
spin_unlock(&req->rq_lock);
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
+ mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
it->it_create_mode, 0, it->it_flags,
lmm, lmmsize);
- repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
- } else if (it->it_op & IT_UNLINK) {
- size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
- size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
- if (!req)
- RETURN(-ENOMEM);
+ ptlrpc_req_set_repsize(req, 5, repsize);
+ }
+ return req;
+}
+/*
+ * Build the enqueue request for an unlink intent.
+ *
+ * The request has five buffers: ptlrpc body, DLM lock request, the
+ * ldlm_intent (its opc is set from it->it_op), an mds_rec_unlink record and
+ * the name (data->namelen + 1 bytes).  The reply is sized for five buffers
+ * as well: ptlrpc body, DLM reply, mds_body, cl_max_mds_easize bytes of EA
+ * and cl_max_mds_cookiesize bytes of cookies.
+ *
+ * The request obtained from ldlm_prep_enqueue_req() is returned as-is; when
+ * it is NULL nothing is packed and NULL is returned.
+ */
+static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
+ struct lookup_intent *it,
+ struct mdc_op_data *data)
+{
+ struct ptlrpc_request *req;
+ struct ldlm_intent *lit;
+ struct obd_device *obddev = class_exp2obd(exp);
+ int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
+ [DLM_INTENT_IT_OFF] = sizeof(*lit),
+ [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink),
+ [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
+ int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
+ [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
+ [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
+ cl_max_mds_easize,
+ [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
+ cl_max_mds_cookiesize };
+ req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
+ if (req) {
/* pack the intent */
lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
sizeof(*lit));
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
+ mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
- repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
- } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
- obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
- OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
- OBD_MD_FLDIREA;
- size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
- size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
+ ptlrpc_req_set_repsize(req, 5, repsize);
+ }
+ return req;
+}
- if (it->it_op & IT_GETATTR)
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
+ struct lookup_intent *it,
+ struct mdc_op_data *data)
+{ /* Builds the enqueue request used for lookup and getattr intents: the
+ * intent opcode (it->it_op) goes into the ldlm_intent buffer, followed
+ * by an mds_body getattr record and the name (data->namelen + 1 bytes).
+ * The reply is sized for ptlrpc body, DLM reply, mds_body,
+ * cl_max_mds_easize bytes of EA and LUSTRE_POSIX_ACL_MAX_SIZE bytes.
+ * The request from ldlm_prep_enqueue_req() is returned as-is; when it
+ * is NULL nothing is packed and NULL is returned. */
+ struct ptlrpc_request *req;
+ struct ldlm_intent *lit;
+ struct obd_device *obddev = class_exp2obd(exp);
+ int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
+ [DLM_INTENT_IT_OFF] = sizeof(*lit),
+ [DLM_INTENT_REC_OFF] = sizeof(struct mds_body),
+ [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
+ int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
+ [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
+ [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
+ cl_max_mds_easize,
+ [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
+ obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
+ OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
- if (!req)
- RETURN(-ENOMEM);
-
+ if (req) {
/* pack the intent */
lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
sizeof(*lit));
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
- it->it_flags, op_data);
-
- repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
- } else if (it->it_op == IT_READDIR) {
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
- if (!req)
- RETURN(-ENOMEM);
-
- repbufcnt = 2;
- } else {
- LBUG();
- RETURN(-EINVAL);
+ mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
+ data);
+ ptlrpc_req_set_repsize(req, 5, repsize);
}
+ return req;
+}
- /* get ready for the reply */
- ptlrpc_req_set_repsize(req, repbufcnt, repsize);
+static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
+{
+ struct ptlrpc_request *req;
+ int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
+ int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply) };
+ /* No intent buffer or record is packed here: request and reply each
+ * hold only the ptlrpc body plus the DLM lock request/reply.  The
+ * request is returned as-is, NULL included. */
+ req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+ if (req)
+ ptlrpc_req_set_repsize(req, 2, repsize);
+ return req;
+}
- /* It is important to obtain rpc_lock first (if applicable), so that
- * threads that are serialised with rpc_lock are not polluting our
- * rpcs in flight counter */
- mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
- mdc_enter_request(&obddev->u.cli);
- rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
- 0, NULL, lockh, 0);
- mdc_exit_request(&obddev->u.cli);
- mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+static int mdc_finish_enqueue(struct obd_export *exp,
+ struct ptlrpc_request *req,
+ struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it,
+ struct lustre_handle *lockh,
+ int rc)
+{
+ struct ldlm_request *lockreq;
+ struct ldlm_reply *lockrep;
+ ENTRY;
/* Similarly, if we're going to replay this request, we don't want to
* actually get a lock, just perform the intent. */
* It's important that we do this first! Otherwise we might exit the
* function without doing so, and try to replay a failed create
* (bug 3440) */
- if (it->it_op & IT_OPEN && req->rq_replay &&
- (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
+ if ((it->it_op & IT_OPEN) &&
+ req->rq_replay &&
+ (!it_disposition(it, DISP_OPEN_OPEN) ||
+ it->d.lustre.it_status != 0))
mdc_clear_replay_flag(req, it->d.lustre.it_status);
DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
/* We know what to expect, so we do any byte flipping required here */
- LASSERT(repbufcnt == 5 || repbufcnt == 2);
- if (repbufcnt == 5) {
+ if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
struct mds_body *body;
body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
mdc_set_open_replay_data(NULL, req);
if ((body->valid & OBD_MD_FLEASIZE) != 0) {
+ void *eadata;
+
/* The eadata is opaque; just check that it is there.
* Eventually, obd_unpackmd() will check the contents */
eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
RETURN (-EPROTO);
}
if (body->valid & OBD_MD_FLMODEASIZE) {
+ struct obd_device *obddev = class_exp2obd(exp);
+
if (obddev->u.cli.cl_max_mds_easize <
body->max_mdsize) {
obddev->u.cli.cl_max_mds_easize =
* reallocate it here to hold the actual LOV EA. */
if (it->it_op & IT_OPEN) {
int offset = DLM_INTENT_REC_OFF + 2;
+ void *lmm;
if (lustre_msg_buflen(req->rq_reqmsg, offset) <
body->eadatasize)
- mdc_realloc_openmsg(req, body, size);
+ mdc_realloc_openmsg(req, body);
lmm = lustre_msg_buf(req->rq_reqmsg, offset,
body->eadatasize);
RETURN(rc);
}
-EXPORT_SYMBOL(mdc_enqueue);
-/*
- * This long block is all about fixing up the lock and request state
- * so that it is correct as of the moment _before_ the operation was
- * applied; that way, the VFS will think that everything is normal and
- * call Lustre's regular VFS methods.
- *
- * If we're performing a creation, that means that unless the creation
- * failed with EEXIST, we should fake up a negative dentry.
- *
- * For everything else, we want to lookup to succeed.
- *
- * One additional note: if CREATE or OPEN succeeded, we add an extra
- * reference to the request because we need to keep it around until
- * ll_create/ll_open gets called.
- *
- * The server will return to us, in it_disposition, an indication of
- * exactly what d.lustre.it_status refers to.
- *
- * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
- * otherwise if DISP_OPEN_CREATE is set, then it status is the
- * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
- * DISP_LOOKUP_POS will be set, indicating whether the child lookup
- * was successful.
- *
- * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
- * child lookup.
- */
-int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
- void *lmm, int lmmsize, struct lookup_intent *it,
- int lookup_flags, struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking, int extra_lock_flags)
+/* We always reserve enough space in the reply packet for a stripe MD, because
+ * we don't know in advance the file type.
+ *
+ * Packs the intent request selected by it->it_op (open, unlink,
+ * getattr/lookup or readdir) with the matching mdc_intent_*_pack() helper,
+ * enqueues it through ldlm_cli_enqueue() bracketed by
+ * mdc_get_rpc_lock()/mdc_enter_request() and their counterparts, and then
+ * hands the result to mdc_finish_enqueue().
+ *
+ * The inodebits policy starts as MDS_INODELOCK_LOOKUP and is switched to
+ * MDS_INODELOCK_UPDATE for unlink, getattr, readdir and O_JOIN_FILE opens.
+ *
+ * Returns -EINVAL for an unsupported it_op, -ENOMEM when no request could
+ * be built, otherwise whatever mdc_finish_enqueue() returns. */
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it, struct mdc_op_data *data,
+ struct lustre_handle *lockh, void *lmm, int lmmsize,
+ int extra_lock_flags)
{
- struct lustre_handle lockh;
- struct ptlrpc_request *request;
- int rc = 0;
- struct mds_body *mds_body;
- struct lustre_handle old_lock;
- struct ldlm_lock *lock;
+ struct ptlrpc_request *req;
+ struct obd_device *obddev = class_exp2obd(exp);
+ struct ldlm_res_id res_id =
+ { .name = {data->fid1.id, data->fid1.generation} };
+ ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
+ int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+ int rc;
ENTRY;
- LASSERT(it);
- CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
- op_data->namelen, op_data->name, op_data->fid1.id,
- ldlm_it2str(it->it_op), it->it_flags);
+ LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
+ if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- if (op_data->fid2.id &&
- (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
+ if (it->it_op & IT_OPEN) {
+ req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
+ if (it->it_flags & O_JOIN_FILE) {
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ }
+ } else if (it->it_op & IT_UNLINK) {
+ req = mdc_intent_unlink_pack(exp, it, data);
+ } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
+ req = mdc_intent_lookup_pack(exp, it, data);
+ } else if (it->it_op == IT_READDIR) {
+ req = mdc_intent_readdir_pack(exp);
+ } else {
+ CERROR("bad it_op %x\n", it->it_op);
+ RETURN(-EINVAL);
+ }
+
+ if (!req)
+ RETURN(-ENOMEM);
+
+ /* It is important to obtain rpc_lock first (if applicable), so that
+ * threads that are serialised with rpc_lock are not polluting our
+ * rpcs in flight counter */
+ mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+ mdc_enter_request(&obddev->u.cli);
+ rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+ 0, NULL, lockh, 0);
+ mdc_exit_request(&obddev->u.cli);
+ mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+
+ rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(mdc_enqueue);
+
+int mdc_revalidate_lock(struct obd_export *exp,
+ struct lookup_intent *it,
+ struct ll_fid *fid)
+{
/* We could just return 1 immediately, but since we should only
* be called in revalidate_it if we already have a lock, let's
* verify that. */
- struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
- op_data->fid2.generation}};
+ struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
struct lustre_handle lockh;
ldlm_policy_data_t policy;
int mode = LCK_CR;
+ int rc;
/* As not all attributes are kept under update lock, e.g.
owner/group/acls are under lookup lock, we need both
MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
MDS_INODELOCK_LOOKUP;
- rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
- LDLM_FL_BLOCK_GRANTED, &res_id,
- LDLM_IBITS, &policy, LCK_CR, &lockh);
+ rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
+ &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
if (!rc) {
mode = LCK_CW;
rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
- LDLM_FL_BLOCK_GRANTED, &res_id,
- LDLM_IBITS, &policy,LCK_CW,&lockh);
+ LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+ &policy, LCK_CW, &lockh);
}
if (!rc) {
mode = LCK_PR;
rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
- LDLM_FL_BLOCK_GRANTED, &res_id,
- LDLM_IBITS, &policy,LCK_PR,&lockh);
+ LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+ &policy, LCK_PR, &lockh);
}
if (rc) {
- memcpy(&it->d.lustre.it_lock_handle, &lockh,
- sizeof(lockh));
+ memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
it->d.lustre.it_lock_mode = mode;
}
- /* Only return failure if it was not GETATTR by cfid
- (from inode_revalidate) */
- if (rc || op_data->namelen != 0)
- RETURN(rc);
- }
+ return rc;
+}
+EXPORT_SYMBOL(mdc_revalidate_lock);
- /* lookup_it may be called only after revalidate_it has run, because
- * revalidate_it cannot return errors, only zero. Returning zero causes
- * this call to lookup, which *can* return an error.
- *
- * We only want to execute the request associated with the intent one
- * time, however, so don't send the request again. Instead, skip past
- * this and use the request from revalidate. In this case, revalidate
- * never dropped its reference, so the refcounts are all OK */
- if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
- struct ldlm_enqueue_info einfo =
- { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
- ldlm_completion_ast, NULL, NULL };
+static int mdc_finish_intent_lock(struct obd_export *exp,
+ struct ptlrpc_request *req,
+ struct mdc_op_data *data,
+ struct lookup_intent *it,
+ struct lustre_handle *lockh)
+{
+ struct mds_body *mds_body;
+ struct lustre_handle old_lock;
+ struct ldlm_lock *lock;
+ int rc;
+ ENTRY;
- rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
- lmm, lmmsize, extra_lock_flags);
- if (rc < 0)
- RETURN(rc);
- memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
- } else if (!op_data->fid2.id) {
- /* DISP_ENQ_COMPLETE set means there is extra reference on
- * request referenced from this intent, saved for subsequent
- * lookup. This path is executed when we proceed to this
- * lookup, so we clear DISP_ENQ_COMPLETE */
- it_clear_disposition(it, DISP_ENQ_COMPLETE);
- }
- request = *reqp = it->d.lustre.it_data;
- LASSERT(request != NULL);
- LASSERT(request != LP_POISON);
- LASSERT(request->rq_repmsg != LP_POISON);
+ LASSERT(req != NULL);
+ LASSERT(req != LP_POISON);
+ LASSERT(req->rq_repmsg != LP_POISON);
if (!it_disposition(it, DISP_IT_EXECD)) {
/* The server failed before it even started executing the
if (rc)
RETURN(rc);
- mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
+ mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
sizeof(*mds_body));
LASSERT(mds_body != NULL); /* mdc_enqueue checked */
- LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
+ LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* mdc_enqueue swabbed */
/* If we were revalidating a fid/name pair, mark the intent in
* case we fail and get called again from lookup */
- if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
+ if (data->fid2.id && (it->it_op != IT_GETATTR)) {
it_set_disposition(it, DISP_ENQ_COMPLETE);
/* Also: did we find the same inode? */
- if (memcmp(&op_data->fid2, &mds_body->fid1,
- sizeof(op_data->fid2)))
- RETURN (-ESTALE);
+ if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)))
+ RETURN(-ESTALE);
}
rc = it_open_error(DISP_LOOKUP_EXECD, it);
it_disposition(it, DISP_OPEN_CREATE) &&
!it_open_error(DISP_OPEN_CREATE, it)) {
it_set_disposition(it, DISP_ENQ_CREATE_REF);
- ptlrpc_request_addref(request); /* balanced in ll_create_node */
+ ptlrpc_request_addref(req); /* balanced in ll_create_node */
}
if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
it_disposition(it, DISP_OPEN_OPEN) &&
!it_open_error(DISP_OPEN_OPEN, it)) {
it_set_disposition(it, DISP_ENQ_OPEN_REF);
- ptlrpc_request_addref(request); /* balanced in ll_file_open */
+ ptlrpc_request_addref(req); /* balanced in ll_file_open */
/* BUG 11546 - eviction in the middle of open rpc processing */
OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
}
* mdc_enqueue, because we need to use the child's inode as
* the l_ast_data to match, and that's not available until
* intent_finish has performed the iget().) */
- lock = ldlm_handle2lock(&lockh);
+ lock = ldlm_handle2lock(lockh);
if (lock) {
ldlm_policy_data_t policy = lock->l_policy_data;
+
LDLM_DEBUG(lock, "matching against this");
LDLM_LOCK_PUT(lock);
- memcpy(&old_lock, &lockh, sizeof(lockh));
+ memcpy(&old_lock, lockh, sizeof(*lockh));
if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
- ldlm_lock_decref_and_cancel(&lockh,
+ ldlm_lock_decref_and_cancel(lockh,
it->d.lustre.it_lock_mode);
- memcpy(&lockh, &old_lock, sizeof(old_lock));
- memcpy(&it->d.lustre.it_lock_handle, &lockh,
- sizeof(lockh));
+ memcpy(lockh, &old_lock, sizeof(old_lock));
+ memcpy(&it->d.lustre.it_lock_handle, lockh,
+ sizeof(*lockh));
}
}
+
CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
- op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
+ data->namelen, data->name, ldlm_it2str(it->it_op),
it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
+ RETURN(rc);
+}
+
+/*
+ * This long block is all about fixing up the lock and request state
+ * so that it is correct as of the moment _before_ the operation was
+ * applied; that way, the VFS will think that everything is normal and
+ * call Lustre's regular VFS methods.
+ *
+ * If we're performing a creation, that means that unless the creation
+ * failed with EEXIST, we should fake up a negative dentry.
+ *
+ * For everything else, we want to lookup to succeed.
+ *
+ * One additional note: if CREATE or OPEN succeeded, we add an extra
+ * reference to the request because we need to keep it around until
+ * ll_create/ll_open gets called.
+ *
+ * The server will return to us, in it_disposition, an indication of
+ * exactly what d.lustre.it_status refers to.
+ *
+ * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
+ * otherwise if DISP_OPEN_CREATE is set, then it status is the
+ * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
+ * DISP_LOOKUP_POS will be set, indicating whether the child lookup
+ * was successful.
+ *
+ * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
+ * child lookup.
+ *
+ * Control flow of this function:
+ *  - when op_data->fid2 is set and the intent is exactly IT_LOOKUP or
+ *    IT_GETATTR, mdc_revalidate_lock() is tried first and its result is
+ *    returned right away if a lock matched or if op_data carries a name;
+ *  - unless DISP_ENQ_COMPLETE is already set, the intent is enqueued with
+ *    mdc_enqueue(); a negative result is returned directly;
+ *  - *reqp is set to it->d.lustre.it_data and the final result comes from
+ *    mdc_finish_intent_lock().
+ *
+ * lookup_flags is not referenced in this function.
+ *
+ * NOTE(review): lockh is only filled in on the mdc_enqueue() path; when
+ * DISP_ENQ_COMPLETE is already set it is handed to mdc_finish_intent_lock()
+ * without having been initialised here -- confirm this is intended.
+ */
+int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
+ void *lmm, int lmmsize, struct lookup_intent *it,
+ int lookup_flags, struct ptlrpc_request **reqp,
+ ldlm_blocking_callback cb_blocking, int extra_lock_flags)
+{
+ struct lustre_handle lockh;
+ int rc;
+ ENTRY;
+
+ LASSERT(it);
+
+ CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
+ op_data->namelen, op_data->name, op_data->fid1.id,
+ ldlm_it2str(it->it_op), it->it_flags);
+
+ if (op_data->fid2.id &&
+ (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
+ rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
+ /* Only return failure if it was not GETATTR by cfid
+ (from inode_revalidate) */
+ if (rc || op_data->namelen != 0)
+ RETURN(rc);
+ }
+
+ /* lookup_it may be called only after revalidate_it has run, because
+ * revalidate_it cannot return errors, only zero. Returning zero causes
+ * this call to lookup, which *can* return an error.
+ *
+ * We only want to execute the request associated with the intent one
+ * time, however, so don't send the request again. Instead, skip past
+ * this and use the request from revalidate. In this case, revalidate
+ * never dropped its reference, so the refcounts are all OK */
+ if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
+ struct ldlm_enqueue_info einfo =
+ { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
+ ldlm_completion_ast, NULL, NULL };
+
+ rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
+ lmm, lmmsize, extra_lock_flags);
+ if (rc < 0)
+ RETURN(rc);
+ memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
+ } else if (!op_data->fid2.id) {
+ /* DISP_ENQ_COMPLETE set means there is extra reference on
+ * request referenced from this intent, saved for subsequent
+ * lookup. This path is executed when we proceed to this
+ * lookup, so we clear DISP_ENQ_COMPLETE */
+ it_clear_disposition(it, DISP_ENQ_COMPLETE);
+ }
+
+ *reqp = it->d.lustre.it_data;
+ rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
RETURN(rc);
}
EXPORT_SYMBOL(mdc_intent_lock);
+/*
+ * Reply interpreter installed by mdc_intent_getattr_async().
+ *
+ * Recovers the md_enqueue_info and ldlm_enqueue_info pointers stored in
+ * req->rq_async_args, drops the request slot with mdc_exit_request(), then
+ * runs ldlm_cli_enqueue_fini(), mdc_finish_enqueue() and, if that returned
+ * zero, copies the lock handle into the intent and calls
+ * mdc_finish_intent_lock().
+ *
+ * On every path einfo is released with OBD_FREE_PTR() and the caller's
+ * minfo->mi_cb callback is invoked with the final rc.  The function itself
+ * always returns 0; the second argument is not used.
+ */
+static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
+ void *unused, int rc)
+{
+ struct mdc_enqueue_args *ma;
+ struct md_enqueue_info *minfo;
+ struct ldlm_enqueue_info *einfo;
+ struct obd_export *exp;
+ struct lookup_intent *it;
+ struct lustre_handle *lockh;
+ struct obd_device *obddev;
+ int flags = LDLM_FL_HAS_INTENT;
+ ENTRY;
+
+ ma = (struct mdc_enqueue_args *)&req->rq_async_args;
+ minfo = ma->ma_mi;
+ einfo = ma->ma_ei;
+
+ exp = minfo->mi_exp;
+ it = &minfo->mi_it;
+ lockh = &minfo->mi_lockh;
+
+ obddev = class_exp2obd(exp);
+
+ mdc_exit_request(&obddev->u.cli);
+
+ rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
+ &flags, NULL, 0, NULL, lockh, rc);
+
+ rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+ if (rc)
+ GOTO(out, rc);
+
+ memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
+
+ rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
+ GOTO(out, rc);
+out:
+ OBD_FREE_PTR(einfo);
+ minfo->mi_cb(exp, req, minfo, rc);
+
+ return 0;
+}
+/*
+ * Queue a lookup/getattr intent enqueue without waiting for the reply.
+ *
+ * The request is built with mdc_intent_lookup_pack() from minfo->mi_data
+ * and minfo->mi_it, a request slot is taken with mdc_enter_request(), and
+ * ldlm_cli_enqueue() is called with its last argument set to 1.  minfo and
+ * einfo are stashed in req->rq_async_args, the reply interpreter is set to
+ * mdc_intent_getattr_async_interpret() and the request is handed to
+ * ptlrpcd_add_req().
+ *
+ * Returns 0 once the request is queued, -ENOMEM if it could not be built,
+ * or the negative ldlm_cli_enqueue() result (the request slot is released
+ * first in that case).
+ *
+ * einfo is freed by the interpreter with OBD_FREE_PTR(); on the error
+ * returns here it is not freed.
+ *
+ * NOTE(review): the CLASSERT below uses a strict '<', which also rejects an
+ * exact fit of mdc_enqueue_args in rq_async_args -- '<=' may be what is
+ * meant; confirm.
+ */
+int mdc_intent_getattr_async(struct obd_export *exp,
+ struct md_enqueue_info *minfo,
+ struct ldlm_enqueue_info *einfo)
+{
+ struct mdc_op_data *op_data = &minfo->mi_data;
+ struct lookup_intent *it = &minfo->mi_it;
+ struct ptlrpc_request *req;
+ struct obd_device *obddev = class_exp2obd(exp);
+ struct ldlm_res_id res_id = {
+ .name = {op_data->fid1.id,
+ op_data->fid1.generation}
+ };
+ ldlm_policy_data_t policy = {
+ .l_inodebits = { MDS_INODELOCK_LOOKUP }
+ };
+ struct mdc_enqueue_args *aa;
+ int rc;
+ int flags = LDLM_FL_HAS_INTENT;
+ ENTRY;
+
+ CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
+ op_data->namelen, op_data->name, op_data->fid1.id,
+ ldlm_it2str(it->it_op), it->it_flags);
+
+ req = mdc_intent_lookup_pack(exp, it, op_data);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ mdc_enter_request(&obddev->u.cli);
+ rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+ 0, NULL, &minfo->mi_lockh, 1);
+ if (rc < 0) {
+ mdc_exit_request(&obddev->u.cli);
+ RETURN(rc);
+ }
+
+ CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
+ aa = (struct mdc_enqueue_args *)&req->rq_async_args;
+ aa->ma_mi = minfo;
+ aa->ma_ei = einfo;
+ req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
+ ptlrpcd_add_req(req);
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(mdc_intent_getattr_async);
int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
unsigned int acl_size, struct ptlrpc_request *req)
{
+ struct obd_device *obddev = class_exp2obd(exp);
struct mds_body *body;
void *eadata;
int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
ptlrpc_req_set_repsize(req, bufcount, size);
+ mdc_enter_request(&obddev->u.cli);
rc = ptlrpc_queue_wait(req);
+ mdc_exit_request(&obddev->u.cli);
if (rc != 0)
RETURN (rc);
const char *input, int input_size, int output_size,
int flags, struct ptlrpc_request **request)
{
+ struct obd_device *obddev = class_exp2obd(exp);
struct ptlrpc_request *req;
int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) };
// int size[3] = {sizeof(struct mds_body)}, bufcnt = 1;
/* make rpc */
if (opcode == MDS_SETXATTR)
mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+ else
+ mdc_enter_request(&obddev->u.cli);
rc = ptlrpc_queue_wait(req);
if (opcode == MDS_SETXATTR)
mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+ else
+ mdc_exit_request(&obddev->u.cli);
if (rc != 0)
GOTO(err_out, rc);
cleanup() {
echo -n "cln.."
+ pgrep ll_sa > /dev/null && { echo "There are ll_sa thread not exit!"; exit 20; } # abort with status 20 if any ll_sa process (presumably a statahead thread) is still running
cleanupall ${FORCE} $* || { echo "FAILed to clean up"; exit 20; }
}
setup() {
}
run_test 122 "fail client bulk callback (shouldn't LBUG) ======="
+# Time "ls -l" on a directory of 1, 10, 100, 1000 and 10000 files, first with
+# the current statahead_max setting and then with statahead_max forced to 0,
+# and finally time the removal of the whole directory.
+# A slower run with statahead is only logged; it does not fail the test.
+# All proc files are accessed through $LPROC so that the value zeroed for the
+# second run is restored on exactly the same set of clients.
+test_123() # statahead(bug 11401)
+{
+        # no "processor : 1" entry in /proc/cpuinfo: only one CPU present
+        if [ -z "$(grep "processor.*: 1" /proc/cpuinfo)" ]; then
+                log "testing on UP system. Performance may be not as good as expected."
+        fi
+
+        mkdir -p $DIR/$tdir
+
+        # i is the file count wanted for this round, j the count of the
+        # previous round, so only the i - j missing files are created
+        for ((i=1, j=0; i<=10000; j=$i, i=$((i * 10)) )); do
+                createmany -o $DIR/$tdir/$tfile $j $((i - j))
+
+                # first run: statahead as currently configured
+                grep '[0-9]' $LPROC/llite/*/statahead_max
+                cancel_lru_locks mdc
+                stime=`date +%s`
+                ls -l $DIR/$tdir > /dev/null
+                etime=`date +%s`
+                delta_sa=$((etime - stime))
+                log "ls $i files with statahead: $delta_sa sec"
+
+                # dump the stats and switch statahead off on every client;
+                # note that $max keeps only the last client's old setting
+                for client in $LPROC/llite/*; do
+                        max=`cat $client/statahead_max`
+                        cat $client/statahead_stats
+                        echo 0 > $client/statahead_max
+                done
+
+                # second run: statahead disabled
+                grep '[0-9]' $LPROC/llite/*/statahead_max
+                cancel_lru_locks mdc
+                stime=`date +%s`
+                ls -l $DIR/$tdir > /dev/null
+                etime=`date +%s`
+                delta=$((etime - stime))
+                log "ls $i files without statahead: $delta sec"
+
+                # restore the saved setting on the same clients it was
+                # cleared on
+                for client in $LPROC/llite/*; do
+                        cat $client/statahead_stats
+                        echo $max > $client/statahead_max
+                done
+
+                if [ $delta_sa -gt $delta ]; then
+                        log "ls $i files is slower with statahead!"
+                fi
+        done
+        log "ls done"
+
+        stime=`date +%s`
+        rm -r $DIR/$tdir
+        sync
+        etime=`date +%s`
+        delta=$((etime - stime))
+        log "rm -r $DIR/$tdir/: $delta seconds"
+        log "rm done"
+        cat $LPROC/llite/*/statahead_stats
+        # wait for commitment of removal
+        sleep 2
+}
+run_test 123 "verify statahead work"
+
TMPDIR=$OLDTMPDIR
TMP=$OLDTMP
HOME=$OLDHOME