Revert the bug 11401 changes (client-side metadata stat-ahead) due to build breakage and test failures.
Details : Do not replay locks that are being cancelled. Do not reference
locks by their address during replay, just by their handle.
-Severity : enhancement
-Bugzilla : 11401
-Description: client-side metadata stat-ahead during readdir(directory readahead)
-Details : perform client-side metadata stat-ahead when the client detects
- readdir and sequential stat of dir entries therein
Severity : normal
Bugzilla : 11679
/* mds/mds_lov.c */
/* mdc/mdc_locks.c */
-struct md_enqueue_info;
-
int it_disposition(struct lookup_intent *it, int flag);
void it_set_disposition(struct lookup_intent *it, int flag);
void it_clear_disposition(struct lookup_intent *it, int flag);
void mdc_set_lock_data(__u64 *lockh, void *data);
int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
ldlm_iterator_t it, void *data);
-int mdc_revalidate_lock(struct obd_export *exp,
- struct lookup_intent *it,
- struct ll_fid *fid);
int mdc_intent_lock(struct obd_export *exp,
struct mdc_op_data *,
void *lmm, int lmmsize,
struct lookup_intent *it, struct mdc_op_data *data,
struct lustre_handle *lockh, void *lmm, int lmmlen,
int extra_lock_flags);
-int mdc_intent_getattr_async(struct obd_export *exp,
- struct md_enqueue_info *minfo,
- struct ldlm_enqueue_info *einfo);
/* mdc/mdc_request.c */
int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp);
fid->f_type = type;
}
-static inline int it_to_lock_mode(struct lookup_intent *it)
-{
- /* CREAT needs to be tested before open (both could be set) */
- if (it->it_op & IT_CREAT)
- return LCK_CW;
- else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
- return LCK_CR;
-
- LBUG();
- return -EINVAL;
-}
-
/* ioctls for trying requests */
#define IOC_REQUEST_TYPE 'f'
#define IOC_REQUEST_MIN_NR 30
#define IOC_REQUEST_CLOSE _IOWR('f', 35, long)
#define IOC_REQUEST_MAX_NR 35
-/* metadata stat-ahead */
-typedef int (* md_enqueue_cb_t)(struct obd_export *exp,
- struct ptlrpc_request *req,
- struct md_enqueue_info *minfo,
- int rc);
-
-struct md_enqueue_info {
- struct obd_export *mi_exp;
- struct mdc_op_data mi_data;
- struct lookup_intent mi_it;
- struct lustre_handle mi_lockh;
- struct dentry *mi_dentry;
- md_enqueue_cb_t mi_cb;
- void *mi_cbdata;
-};
-
-struct mdc_enqueue_args {
- struct md_enqueue_info *ma_mi;
- struct ldlm_enqueue_info *ma_ei;
-};
-
#endif
static inline int have_expired_locks(void)
{
int need_to_run;
- ENTRY;
+ ENTRY;
spin_lock_bh(&waiting_locks_spinlock);
need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
spin_unlock_bh(&waiting_locks_spinlock);
MODULES := lustre
-lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o statahead.o
+lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o
ifeq ($(PATCHLEVEL),4)
lustre-objs += rw24.o super.o
int ll_revalidate_it(struct dentry *de, int lookup_flags,
struct lookup_intent *it)
{
+ int rc;
struct mdc_op_data op_data;
struct ptlrpc_request *req = NULL;
struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
struct obd_export *exp;
- int first = 0, rc;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
}
}
- if (it->it_op == IT_GETATTR)
- first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
-
do_lock:
it->it_create_mode &= ~current->fs->umask;
rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
&req, ll_mdc_blocking_ast, 0);
- if (it->it_op == IT_GETATTR && !first)
- ll_statahead_exit(de, rc);
/* If req is NULL, then mdc_intent_lock only tried to do a lock match;
* if all was well, it will return 1 if it found locks, 0 otherwise. */
if (req == NULL && rc >= 0) {
*/
#include <linux/fs.h>
+#include <linux/ext2_fs.h>
#include <linux/pagemap.h>
#include <linux/mm.h>
#include <linux/version.h>
#include <lustre_dlm.h>
#include "llite_internal.h"
+typedef struct ext2_dir_entry_2 ext2_dirent;
+
#ifdef HAVE_PG_FS_MISC
#define PageChecked(page) test_bit(PG_fs_misc, &(page)->flags)
#define SetPageChecked(page) set_bit(PG_fs_misc, &(page)->flags)
return inode->i_sb->s_blocksize;
}
+/*
+ * Release a page: unmap it with kunmap(), then drop a page-cache reference
+ * with page_cache_release().
+ * NOTE(review): presumably the counterpart of ll_get_dir_page(), i.e. the page
+ * is expected to be kmap()ed and referenced on entry -- confirm against callers.
+ */
+static inline void ext2_put_page(struct page *page)
+{
+ kunmap(page);
+ page_cache_release(page);
+}
+
+/*
+ * Return inode->i_size rounded up by CFS_PAGE_SIZE-1 and shifted right by
+ * CFS_PAGE_SHIFT, i.e. the number of pages needed to hold i_size bytes
+ * (assumes CFS_PAGE_SIZE == 1 << CFS_PAGE_SHIFT -- TODO confirm).
+ */
+static inline unsigned long dir_pages(struct inode *inode)
+{
+ return (inode->i_size+CFS_PAGE_SIZE-1) >> CFS_PAGE_SHIFT;
+}
+
+
static void ext2_check_page(struct inode *dir, struct page *page)
{
unsigned chunk_size = ext2_chunk_size(dir);
SetPageError(page);
}
-struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
+static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
{
struct ldlm_res_id res_id =
{ .name = { dir->i_ino, (__u64)dir->i_generation} };
goto out_unlock;
}
+/*
+ * p is at least 6 bytes before the end of page
+ *
+ * Return a pointer to the directory entry that starts rec_len bytes after p.
+ * p->rec_len is converted with le16_to_cpu() before being used as the byte
+ * step. No bounds check is done here; the caller must ensure the result is
+ * still usable.
+ */
+static inline ext2_dirent *ext2_next_entry(ext2_dirent *p)
+{
+ return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len));
+}
+
+/*
+ * Snap a byte offset within a directory buffer to an entry boundary.
+ *
+ * base:   start of the buffer holding the directory entries.
+ * offset: byte offset into base that should land on an entry.
+ * mask:   ANDed with offset to pick the position the walk starts from
+ *         (presumably the start of the chunk containing offset -- confirm
+ *         against callers).
+ *
+ * Walks entry by entry (via ext2_next_entry()) from base + (offset & mask)
+ * until the current entry is at or past base + offset, and returns that
+ * entry's byte offset from base. If offset already sits on an entry boundary
+ * the return value equals offset.
+ */
+static inline unsigned
+ext2_validate_entry(char *base, unsigned offset, unsigned mask)
+{
+ ext2_dirent *de = (ext2_dirent*)(base + offset);
+ ext2_dirent *p = (ext2_dirent*)(base + (offset&mask));
+ while ((char*)p < (char*)de)
+ p = ext2_next_entry(p);
+ return (char *)p - base;
+}
+
static unsigned char ext2_filetype_table[EXT2_FT_MAX] = {
[EXT2_FT_UNKNOWN] DT_UNKNOWN,
[EXT2_FT_REG_FILE] DT_REG,
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
inode->i_generation, inode);
- if (S_ISDIR(inode->i_mode))
- ll_stop_statahead(inode);
-
/* don't do anything for / */
if (inode->i_sb->s_root == file->f_dentry)
RETURN(0);
struct inode *inode = file->f_dentry->d_inode;
struct ptlrpc_request *req;
int rc;
- ENTRY;
if (!parent)
RETURN(-ENOENT);
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
inode->i_generation, inode, file->f_flags);
- if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0)
- lli->lli_opendir_pid = current->pid;
-
/* don't do anything for / */
if (inode->i_sb->s_root == file->f_dentry)
RETURN(0);
#ifndef LLITE_INTERNAL_H
#define LLITE_INTERNAL_H
-#include <linux/ext2_fs.h>
#ifdef CONFIG_FS_POSIX_ACL
# include <linux/fs.h>
#ifdef HAVE_XATTR_ACL
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
struct inode lli_vfs_inode;
#endif
-
- /* metadata stat-ahead */
- pid_t lli_opendir_pid;
- struct ll_statahead_info *lli_sai;
};
/*
enum stats_track_type ll_stats_track_type;
int ll_stats_track_id;
int ll_rw_stats_on;
+
dev_t ll_sdev_orig; /* save s_dev before assign for
* clustred nfs */
-
- /* metadata stat-ahead */
- unsigned int ll_sa_count; /* current statahead RPCs */
- unsigned int ll_sa_max; /* max statahead RPCs */
- unsigned int ll_sa_wrong; /* statahead thread stopped for
- * low hit ratio */
- unsigned int ll_sa_total; /* statahead thread started
- * count */
- unsigned long long ll_sa_blocked; /* ls count waiting for
- * statahead */
- unsigned long long ll_sa_cached; /* ls count got in cache */
};
#define LL_DEFAULT_MAX_RW_CHUNK (32 * 1024 * 1024)
}
struct it_cb_data {
- struct inode *icbd_parent;
+ struct inode *icbd_parent;
struct dentry **icbd_childp;
- obd_id hash;
+ obd_id hash;
};
void ll_i2gids(__u32 *suppgids, struct inode *i1,struct inode *i2);
extern struct file_operations ll_dir_operations;
extern struct inode_operations ll_dir_inode_operations;
-struct page *ll_get_dir_page(struct inode *dir, unsigned long n);
-/*
- * p is at least 6 bytes before the end of page
- */
-typedef struct ext2_dir_entry_2 ext2_dirent;
-
-static inline ext2_dirent *ext2_next_entry(ext2_dirent *p)
-{
- return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len));
-}
-
-static inline unsigned
-ext2_validate_entry(char *base, unsigned offset, unsigned mask)
-{
- ext2_dirent *de = (ext2_dirent*)(base + offset);
- ext2_dirent *p = (ext2_dirent*)(base + (offset&mask));
- while ((char*)p < (char*)de)
- p = ext2_next_entry(p);
- return (char *)p - base;
-}
-
-static inline void ext2_put_page(struct page *page)
-{
- kunmap(page);
- page_cache_release(page);
-}
-
-static inline unsigned long dir_pages(struct inode *inode)
-{
- return (inode->i_size + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-}
-
/* llite/namei.c */
int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir);
struct inode *ll_iget(struct super_block *sb, ino_t hash,
struct lookup_intent *ll_convert_intent(struct open_intent *oit,
int lookup_flags);
#endif
-int lookup_it_finish(struct ptlrpc_request *request, int offset,
- struct lookup_intent *it, void *data);
-void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
/* llite/rw.c */
int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size);
int ll_removexattr(struct dentry *dentry, const char *name);
-/* statahead.c */
-
-#define LL_STATAHEAD_MIN 1
-#define LL_STATAHEAD_DEF 32
-#define LL_STATAHEAD_MAX 10000
-
-/* per inode struct, for dir only */
-struct ll_statahead_info {
- struct inode *sai_inode;
- atomic_t sai_refc; /* when access this struct, hold
- * refcount */
- unsigned int sai_max; /* max ahead of lookup */
- unsigned int sai_sent; /* stat requests sent count */
- unsigned int sai_replied; /* stat requests which received
- * reply */
- unsigned int sai_cached; /* UPDATE lock cached locally
- * already */
- unsigned int sai_hit; /* hit count */
- unsigned int sai_miss; /* miss count */
- unsigned int sai_consecutive_miss; /* consecutive miss */
- unsigned sai_ls_all:1; /* ls -al, do stat-ahead for
- * hidden entries */
- struct ptlrpc_thread sai_thread; /* stat-ahead thread */
- struct list_head sai_entries; /* stat-ahead entries */
- unsigned int sai_entries_nr; /* stat-ahead entries count */
-};
-
-int ll_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup);
-void ll_statahead_exit(struct dentry *dentry, int result);
-void ll_stop_statahead(struct inode *inode);
-
#endif /* LLITE_INTERNAL_H */
spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock);
}
- /* metadata statahead is enabled by default */
- sbi->ll_sa_max = LL_STATAHEAD_DEF;
-
RETURN(sbi);
}
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
inode->i_generation, inode);
- if (S_ISDIR(inode->i_mode)) {
- /* these should have been cleared in ll_file_release */
- LASSERT(lli->lli_sai == NULL);
- LASSERT(lli->lli_opendir_pid == 0);
- }
-
ll_inode2fid(&fid, inode);
clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags);
mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode);
return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID));
}
-static int ll_rd_statahead_count(char *page, char **start, off_t off,
- int count, int *eof, void *data)
-{
- struct super_block *sb = data;
- struct ll_sb_info *sbi = ll_s2sbi(sb);
-
- return snprintf(page, count, "%u\n", sbi->ll_sa_count);
-}
-
-static int ll_rd_statahead_max(char *page, char **start, off_t off,
- int count, int *eof, void *data)
-{
- struct super_block *sb = data;
- struct ll_sb_info *sbi = ll_s2sbi(sb);
-
- return snprintf(page, count, "%u\n", sbi->ll_sa_max);
-}
-
-static int ll_wr_statahead_max(struct file *file, const char *buffer,
- unsigned long count, void *data)
-{
- struct super_block *sb = data;
- struct ll_sb_info *sbi = ll_s2sbi(sb);
- int val, rc;
-
- rc = lprocfs_write_helper(buffer, count, &val);
- if (rc)
- return rc;
- if (val >= 0 && val <= LL_STATAHEAD_MAX)
- sbi->ll_sa_max = val;
- else
- CERROR("Bad statahead_max value %d. Valid values are in the "
- "range [0, %d]\n", val, LL_STATAHEAD_MAX);
-
- return count;
-}
-
-static int ll_rd_statahead_stats(char *page, char **start, off_t off,
- int count, int *eof, void *data)
-{
- struct super_block *sb = data;
- struct ll_sb_info *sbi = ll_s2sbi(sb);
-
- return snprintf(page, count,
- "statahead wrong: %u\n"
- "statahead total: %u\n"
- "ls blocked: %llu\n"
- "ls total: %llu\n",
- sbi->ll_sa_wrong, sbi->ll_sa_total,
- sbi->ll_sa_blocked,
- sbi->ll_sa_blocked + sbi->ll_sa_cached);
-}
-
static struct lprocfs_vars lprocfs_obd_vars[] = {
{ "uuid", ll_rd_sb_uuid, 0, 0 },
//{ "mntpt_path", ll_rd_path, 0, 0 },
{ "stats_track_pid", ll_rd_track_pid, ll_wr_track_pid, 0 },
{ "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
{ "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 },
- { "statahead_count", ll_rd_statahead_count, 0, 0 },
- { "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 },
- { "statahead_stats", ll_rd_statahead_stats, 0, 0 },
{ 0 }
};
* in ll_revalidate_it. After revaliadate inode will be have hashed aliases
* and it triggers BUG_ON in d_instantiate_unique (bug #10954).
*/
-static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
+struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
{
struct list_head *tmp;
struct dentry *dentry;
return de;
}
-int lookup_it_finish(struct ptlrpc_request *request, int offset,
- struct lookup_intent *it, void *data)
+static int lookup_it_finish(struct ptlrpc_request *request, int offset,
+ struct lookup_intent *it, void *data)
{
struct it_cb_data *icbd = data;
struct dentry **de = icbd->icbd_childp;
RETURN(ERR_PTR(rc));
}
- if (it->it_op == IT_GETATTR) {
- rc = ll_statahead_enter(parent, &dentry, 1);
- if (rc >= 0) {
- ll_statahead_exit(dentry, rc);
- if (rc == 1)
- RETURN(retval = dentry);
- }
- }
-
- icbd.icbd_parent = parent;
icbd.icbd_childp = &dentry;
+ icbd.icbd_parent = parent;
rc = ll_prepare_mdc_op_data(&op_data, parent, NULL, dentry->d_name.name,
dentry->d_name.len, lookup_flags, NULL);
it->it_create_mode &= ~current->fs->umask;
- up(&parent->i_sem);
rc = mdc_intent_lock(ll_i2mdcexp(parent), &op_data, NULL, 0, it,
lookup_flags, &req, ll_mdc_blocking_ast, 0);
- down(&parent->i_sem);
+
if (rc < 0)
GOTO(out, retval = ERR_PTR(rc));
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2007 Cluster File Systems, Inc.
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#include <linux/fs.h>
-#include <linux/sched.h>
-#include <linux/mm.h>
-#include <linux/smp_lock.h>
-#include <linux/highmem.h>
-#include <linux/pagemap.h>
-
-#define DEBUG_SUBSYSTEM S_LLITE
-
-#include <obd_support.h>
-#include <lustre_lite.h>
-#include <lustre_dlm.h>
-#include <linux/lustre_version.h>
-#include "llite_internal.h"
-
-struct ll_sai_entry {
- struct list_head se_list;
- int se_index;
- int se_stat;
-};
-
-enum {
- SA_ENTRY_UNSTATED = 0,
- SA_ENTRY_STATED
-};
-
-static struct ll_statahead_info *ll_sai_alloc(void)
-{
- struct ll_statahead_info *sai;
-
- OBD_ALLOC_PTR(sai);
- if (!sai)
- return NULL;
-
- sai->sai_max = LL_STATAHEAD_MIN;
- init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
- INIT_LIST_HEAD(&sai->sai_entries);
- atomic_set(&sai->sai_refc, 1);
- return sai;
-}
-
-static inline struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
-{
- LASSERT(sai);
- atomic_inc(&sai->sai_refc);
- return sai;
-}
-
-static void ll_sai_put(struct ll_statahead_info *sai)
-{
- struct inode *inode = sai->sai_inode;
- struct ll_inode_info *lli = ll_i2info(inode);
-
- if (atomic_dec_and_lock(&sai->sai_refc, &lli->lli_lock)) {
- struct ll_sai_entry *entry, *next;
-
- LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
- list_for_each_entry_safe(entry, next, &sai->sai_entries,
- se_list) {
- list_del(&entry->se_list);
- OBD_FREE_PTR(entry);
- }
- OBD_FREE_PTR(sai);
- lli->lli_sai = NULL;
- spin_unlock(&lli->lli_lock);
- iput(inode);
- }
-}
-
-static struct ll_sai_entry *ll_sai_entry_get(struct ll_statahead_info *sai,
- int index, int stat)
-{
- struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
- struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
- struct ll_sai_entry *entry;
-
- OBD_ALLOC_PTR(entry);
- if (entry == NULL)
- return NULL;
-
- CDEBUG(D_READA, "sai entry %p index %d, stat %d\n", entry, index, stat);
- entry->se_index = index;
- entry->se_stat = stat;
-
- spin_lock(&lli->lli_lock);
- list_add_tail(&entry->se_list, &sai->sai_entries);
- sai->sai_entries_nr++;
- sbi->ll_sa_count = sai->sai_entries_nr;
- spin_unlock(&lli->lli_lock);
-
- LASSERT(sai->sai_entries_nr <= sbi->ll_sa_max);
- return entry;
-}
-
-static void ll_sai_entry_set(struct ll_statahead_info *sai, int index,
- int stat)
-{
- struct ll_sai_entry *entry;
-
- list_for_each_entry(entry, &sai->sai_entries, se_list) {
- if (entry->se_index == index) {
- LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
- entry->se_stat = stat;
- CDEBUG(D_READA, "set sai entry %p index %d stat %d\n",
- entry, index, stat);
- return;
- }
- }
- CERROR("can't find sai entry index %d\n", index);
- LBUG();
-}
-
-/* check first entry was stated already */
-static int ll_sai_entry_stated(struct ll_statahead_info *sai)
-{
- struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
- struct ll_sai_entry *entry;
- int rc = 0;
-
- spin_lock(&lli->lli_lock);
- if (!list_empty(&sai->sai_entries)) {
- entry = list_entry(sai->sai_entries.next, struct ll_sai_entry,
- se_list);
- CDEBUG(D_READA, "sai entry %p index %d stat %d\n",
- entry, entry->se_index, entry->se_stat);
- rc = (entry->se_stat != SA_ENTRY_UNSTATED);
- }
- spin_unlock(&lli->lli_lock);
-
- return rc;
-}
-
-/* inside lli_lock */
-static void ll_sai_entry_put(struct ll_statahead_info *sai)
-{
- struct ll_sai_entry *entry;
-
- LASSERT(!list_empty(&sai->sai_entries));
- LASSERT(sai->sai_entries_nr > 0);
-
- entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
- list_del(&entry->se_list);
- sai->sai_entries_nr--;
-
- CDEBUG(D_READA, "free sa entry %p index %d stat %d\n",
- entry, entry->se_index, entry->se_stat);
- OBD_FREE_PTR(entry);
-}
-
-/* finish lookup/revalidate */
-static int ll_statahead_interpret(struct obd_export *exp,
- struct ptlrpc_request *req,
- struct md_enqueue_info *minfo,
- int rc)
-{
- struct lookup_intent *it = &minfo->mi_it;
- struct dentry *dentry = minfo->mi_dentry;
- struct inode *dir = dentry->d_parent->d_inode;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai;
- ENTRY;
-
- CDEBUG(D_READA, "statahead %.*s rc %d\n",
- dentry->d_name.len, dentry->d_name.name, rc);
- if (rc)
- GOTO(out, rc);
-
- if (dentry->d_inode == NULL) {
- /* lookup */
- struct dentry *save = dentry;
- struct it_cb_data icbd = {
- .icbd_parent = dir,
- .icbd_childp = &dentry
- };
-
- down(&dir->i_sem);
- rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
- if (!rc) {
- LASSERT(dentry->d_inode);
- if (dentry != save)
- dput(save);
- ll_lookup_finish_locks(it, dentry);
- }
- up(&dir->i_sem);
- } else {
- /* revalidate */
- struct mds_body *body;
-
- body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
- sizeof(*body));
- if (memcmp(&minfo->mi_data.fid2, &body->fid1,
- sizeof(body->fid1))) {
- ll_unhash_aliases(dentry->d_inode);
- GOTO(out, rc = -EAGAIN);
- }
-
- rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
- if (rc) {
- ll_unhash_aliases(dentry->d_inode);
- GOTO(out, rc);
- }
-
- spin_lock(&dcache_lock);
- lock_dentry(dentry);
- __d_drop(dentry);
- dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
- unlock_dentry(dentry);
- __d_rehash(dentry, 0);
- spin_unlock(&dcache_lock);
-
- ll_lookup_finish_locks(it, dentry);
-
- }
- EXIT;
-out:
- spin_lock(&lli->lli_lock);
- sai = lli->lli_sai;
- if (sai) {
- lli->lli_sai->sai_replied++;
- ll_sai_entry_set(lli->lli_sai, (int)minfo->mi_cbdata,
- SA_ENTRY_STATED);
- wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
- }
- spin_unlock(&lli->lli_lock);
- ll_intent_release(it);
- OBD_FREE_PTR(minfo);
-
- dput(dentry);
- return rc;
-}
-
-static void sa_args_fini(struct md_enqueue_info *minfo,
- struct ldlm_enqueue_info *einfo)
-{
- LASSERT(minfo && einfo);
- OBD_FREE_PTR(minfo);
- OBD_FREE_PTR(einfo);
-}
-
-static int sa_args_prep(struct inode *dir, struct dentry *dentry,
- struct md_enqueue_info **pmi,
- struct ldlm_enqueue_info **pei)
-{
- struct ll_inode_info *lli = ll_i2info(dir);
- struct md_enqueue_info *minfo;
- struct ldlm_enqueue_info *einfo;
-
- OBD_ALLOC_PTR(einfo);
- if (einfo == NULL)
- return -ENOMEM;
-
- OBD_ALLOC_PTR(minfo);
- if (minfo == NULL) {
- OBD_FREE_PTR(einfo);
- return -ENOMEM;
- }
-
- minfo->mi_exp = ll_i2mdcexp(dir);
- intent_init(&minfo->mi_it, IT_GETATTR);
- minfo->mi_dentry = dentry;
- minfo->mi_cb = ll_statahead_interpret;
- minfo->mi_cbdata = (void *)lli->lli_sai->sai_sent;
-
- einfo->ei_type = LDLM_IBITS;
- einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
- einfo->ei_cb_bl = ll_mdc_blocking_ast;
- einfo->ei_cb_cp = ldlm_completion_ast;
- einfo->ei_cb_gl = NULL;
- einfo->ei_cbdata = NULL;
-
- *pmi = minfo;
- *pei = einfo;
-
- return 0;
-}
-
-/* similar to ll_lookup_it(). */
-static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
-{
- struct md_enqueue_info *minfo;
- struct ldlm_enqueue_info *einfo;
- int rc;
- ENTRY;
-
- rc = sa_args_prep(dir, dentry, &minfo, &einfo);
- if (rc)
- RETURN(rc);
-
- rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL,
- dentry->d_name.name, dentry->d_name.len, 0,
- NULL);
- if (rc == 0)
- rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
-
- if (rc)
- sa_args_fini(minfo, einfo);
-
- RETURN(rc);
-}
-
-/* similar to ll_revalidate_it().
- * return 1: dentry valid.
- * 0: will send stat-ahead request.
- * -errno: prepare stat-ahead request failed. */
-static int do_sa_revalidate(struct dentry *dentry)
-{
- struct inode *inode = dentry->d_inode;
- struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode);
- struct ll_fid fid;
- struct lookup_intent it;
- struct md_enqueue_info *minfo;
- struct ldlm_enqueue_info *einfo;
- int rc;
- ENTRY;
-
- if (inode == NULL)
- RETURN(1);
-
- if (d_mountpoint(dentry))
- RETURN(1);
-
- ll_inode2fid(&fid, inode);
-
- intent_init(&it, IT_GETATTR);
- rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid);
- if (rc == 1) {
- ll_intent_release(&it);
- lli->lli_sai->sai_cached++;
- wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
- RETURN(1);
- }
-
- rc = sa_args_prep(dentry->d_parent->d_inode, dentry, &minfo, &einfo);
- if (rc)
- RETURN(rc);
-
- rc = ll_prepare_mdc_op_data(&minfo->mi_data, dentry->d_parent->d_inode,
- inode, dentry->d_name.name,
- dentry->d_name.len, 0, NULL);
- if (rc == 0)
- rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
-
- if (rc)
- sa_args_fini(minfo, einfo);
-
- RETURN(rc);
-}
-
-/* copied from kernel */
-static inline void name2qstr(struct qstr *this, const char *name, int namelen)
-{
- unsigned long hash;
- const unsigned char *p = (const unsigned char *)name;
- int len;
- unsigned int c;
-
- hash = init_name_hash();
- for (len = 0; len < namelen; len++, p++) {
- c = *p;
- hash = partial_name_hash(c, hash);
- }
- this->name = name;
- this->len = namelen;
- this->hash = end_name_hash(hash);
-}
-
-static int ll_statahead_one(struct dentry *parent, ext2_dirent *de)
-{
- struct inode *dir = parent->d_inode;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct qstr name;
- struct dentry *dentry;
- struct ll_sai_entry *se;
- int rc;
- ENTRY;
-
- name2qstr(&name, de->name, de->name_len);
-
- se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_sent,
- SA_ENTRY_UNSTATED);
-
- down(&dir->i_sem);
- if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
- CDEBUG(D_READA, "parent dentry@%p %.*s is "
- "DCACHE_LUSTRE_INVALID, skip statahead\n",
- parent, parent->d_name.len, parent->d_name.name);
- up(&dir->i_sem);
- GOTO(out, rc = -EINVAL);
- }
-
- dentry = d_lookup(parent, &name);
- if (!dentry) {
- struct dentry *dentry = d_alloc(parent, &name);
-
- up(&dir->i_sem);
- rc = -ENOMEM;
- if (dentry) {
- rc = do_sa_lookup(dir, dentry);
- if (rc)
- dput(dentry);
- }
- GOTO(out, rc);
- }
- up(&dir->i_sem);
-
- rc = do_sa_revalidate(dentry);
- if (rc)
- dput(dentry);
- GOTO(out, rc);
-out:
- if (rc) {
- CDEBUG(D_READA, "set sai entry %p index %d stat %d\n",
- se, se->se_index, se->se_stat);
- se->se_stat = rc;
- wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
- }
- lli->lli_sai->sai_sent++;
- return rc;
-}
-
-static inline int sa_check_stop(struct ll_statahead_info *sai)
-{
- return !!(sai->sai_thread.t_flags & SVC_STOPPING);
-}
-
-static inline int sa_not_full(struct ll_statahead_info *sai)
-{
- return sai->sai_sent - sai->sai_miss - sai->sai_hit < sai->sai_max;
-}
-
-struct ll_sa_thread_args {
- struct dentry *sta_parent;
- pid_t sta_pid;
-};
-
-static int ll_statahead_thread(void *arg)
-{
- struct ll_sa_thread_args *sta = arg;
- struct dentry *parent = dget(sta->sta_parent);
- struct inode *dir = parent->d_inode;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_sb_info *sbi = ll_i2sbi(dir);
- struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
- struct ptlrpc_thread *thread = &sai->sai_thread;
- struct l_wait_info lwi = { 0 };
- unsigned long index = 0;
- __u64 offset = 0;
- int skip = 0;
- int rc = 0;
- char name[16] = "";
- ENTRY;
-
- sbi->ll_sa_total++;
-
- snprintf(name, 15, "ll_sa_%u", sta->sta_pid);
- cfs_daemonize(name);
- thread->t_flags = SVC_RUNNING;
- wake_up(&thread->t_ctl_waitq);
- CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
-
- if (sai->sai_ls_all)
- CDEBUG(D_READA, "do statahead for hidden files\n");
-
- while (1) {
- unsigned long npages = dir_pages(dir);
-
- /* hit ratio < 80% */
- if ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
- (sai->sai_consecutive_miss > 8)) {
- sbi->ll_sa_wrong++;
- CDEBUG(D_READA, "statahead for dir %.*s hit ratio too "
- "low: hit/miss %u/%u, sent/replied %u/%u, "
- "cached %u\n",
- parent->d_name.len, parent->d_name.name,
- sai->sai_hit, sai->sai_miss, sai->sai_sent,
- sai->sai_replied, sai->sai_cached);
- break;
- }
-
- /* reach the end of dir */
- if (index == npages) {
- CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
- index, npages);
- break;
- }
-
- l_wait_event(thread->t_ctl_waitq,
- sa_check_stop(sai) || sa_not_full(sai),
- &lwi);
-
- if (sa_check_stop(sai))
- break;
-
- for (; index < npages; index++, offset = 0) {
- char *kaddr, *limit;
- ext2_dirent *de;
- struct page *page;
-
- CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu"
- "/%lu size %llu\n",
- CFS_PAGE_SIZE, dir->i_ino, dir->i_generation,
- index, npages, dir->i_size);
-
- page = ll_get_dir_page(dir, index);
- npages = dir_pages(dir);
-
- if (IS_ERR(page)) {
- rc = PTR_ERR(page);
- CERROR("error reading dir %lu/%u page %lu: "
- "rc %d\n",
- dir->i_ino, dir->i_generation, index,
- rc);
- GOTO(out, rc);
- }
-
- kaddr = page_address(page);
- de = (ext2_dirent *)(kaddr + offset);
- limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
- for (; (char*)de <= limit && sa_not_full(sai);
- de = ext2_next_entry(de)) {
- if (!de->inode)
- continue;
-
- /* don't stat-ahead ".", ".." */
- if (skip < 2) {
- skip++;
- continue;
- }
-
- /* don't stat-ahead for hidden files */
- if (de->name[0] == '.' && !sai->sai_ls_all)
- continue;
-
- /* don't stat-ahead for the first de */
- if (skip < 3) {
- skip++;
- continue;
- }
-
- rc = ll_statahead_one(parent, de);
- if (rc < 0) {
- ext2_put_page(page);
- GOTO(out, rc);
- }
- }
- offset = (char *)de - kaddr;
- ext2_put_page(page);
-
- if ((char *)de <= limit)
- /* !sa_not_full() */
- break;
- }
- }
- EXIT;
-out:
- thread->t_flags = SVC_STOPPED;
- wake_up(&thread->t_ctl_waitq);
- lli->lli_opendir_pid = 0; /* avoid statahead again */
- ll_sai_put(sai);
- dput(parent);
- return 0;
-}
-
-/* called in ll_file_release */
-void ll_stop_statahead(struct inode *inode)
-{
- struct ll_inode_info *lli = ll_i2info(inode);
- struct ptlrpc_thread *thread;
-
- /* don't check pid here. upon fork, if parent closedir before child,
- * child will not have chance to stop this thread. */
- lli->lli_opendir_pid = 0;
-
- spin_lock(&lli->lli_lock);
- if (lli->lli_sai) {
- ll_sai_get(lli->lli_sai);
- spin_unlock(&lli->lli_lock);
-
- thread = &lli->lli_sai->sai_thread;
- thread->t_flags = SVC_STOPPING;
- wake_up(&thread->t_ctl_waitq);
- wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
- ll_sai_put(lli->lli_sai);
-
- CDEBUG(D_READA, "stop statahead thread, pid %d\n",
- current->pid);
- return;
- }
- spin_unlock(&lli->lli_lock);
-}
-
-enum {
- LS_NONE_FIRST_DE = 0, /* not first dirent, or is "." */
- LS_FIRST_DE, /* the first non-hidden dirent */
- LS_FIRST_DOT_DE /* the first hidden dirent, that is ".xxx" */
-};
-
-static int is_first_dirent(struct inode *dir, struct dentry *dentry)
-{
- struct qstr *d_name = &dentry->d_name;
- unsigned long npages = dir_pages(dir);
- struct page *page;
- ext2_dirent *de;
- unsigned long index;
- __u64 offset = 0;
- char *kaddr, *limit;
- int dot_de = 1; /* dirent is dotfile till now */
- int rc = LS_NONE_FIRST_DE;
- ENTRY;
-
- page = ll_get_dir_page(dir, 0);
- if (IS_ERR(page)) {
- CERROR("error reading dir %lu/%u page 0: rc %ld\n",
- dir->i_ino, dir->i_generation, PTR_ERR(page));
- RETURN(LS_NONE_FIRST_DE);
- }
-
- kaddr = page_address(page);
- de = (ext2_dirent *)kaddr;
- if (!(de->name_len == 1 && strncmp(de->name, ".", 1) == 0))
- CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
- de = ext2_next_entry(de); /* skip ".", or ingore bad entry */
- if (!(de->name_len == 2 && strncmp(de->name, "..", 2) == 0))
- CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
- de = ext2_next_entry(de); /* skip "..", or ingore bad entry */
-
- offset = (char *)de - kaddr;
-
- for (index = 0; index < npages; offset = 0) {
- de = (ext2_dirent *)(kaddr + offset);
- limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
- for (; (char*)de <= limit; de = ext2_next_entry(de)) {
- if (!de->inode)
- continue;
-
- if (de->name[0] != '.')
- dot_de = 0;
-
- if (dot_de && d_name->name[0] != '.') {
- CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
- d_name->len, d_name->name,
- de->name_len, de->name);
- continue;
- }
-
- if (d_name->len == de->name_len &&
- !strncmp(d_name->name, de->name, d_name->len))
- rc = LS_FIRST_DE + dot_de;
- else
- rc = LS_NONE_FIRST_DE;
- GOTO(out, rc);
- }
-
- if (++index >= npages)
- break;
-
- ext2_put_page(page);
-
- page = ll_get_dir_page(dir, index);
- if (IS_ERR(page)) {
- CERROR("error reading dir %lu/%u page %lu: rc %ld\n",
- dir->i_ino, dir->i_generation, index,
- PTR_ERR(page));
- RETURN(LS_NONE_FIRST_DE);
- }
- kaddr = page_address(page);
- }
- CERROR("%.*s not found in dir %.*s!\n", d_name->len, d_name->name,
- dentry->d_parent->d_name.len, dentry->d_parent->d_name.name);
- EXIT;
-out:
- ext2_put_page(page);
- return rc;
-}
-
-/* start stat-ahead thread if this is the first dir entry, otherwise if a thread
- * is started already, wait until thread is ahead of me.
- * Return value:
- * 0 -- miss,
- * 1 -- hit,
- * -EEXIST -- stat ahead thread started, and this is the first try.
- * other negative value -- error.
- */
-int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
-{
- struct ll_sb_info *sbi = ll_i2sbi(dir);
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai;
- struct ll_sa_thread_args sta;
- int rc;
- ENTRY;
-
- if (sbi->ll_sa_max == 0)
- RETURN(-ENOTSUPP);
-
- /* not the same process, don't statahead */
- if (lli->lli_opendir_pid != current->pid)
- RETURN(-EBADF);
-
- spin_lock(&lli->lli_lock);
- if (lli->lli_sai) {
- sai = ll_sai_get(lli->lli_sai);
- spin_unlock(&lli->lli_lock);
-
- if (ll_sai_entry_stated(sai)) {
- sbi->ll_sa_cached++;
- } else {
- struct l_wait_info lwi = { 0 };
-
- sbi->ll_sa_blocked++;
- up(&dir->i_sem);
- /* thread started already, avoid double-stat */
- l_wait_event(sai->sai_thread.t_ctl_waitq,
- ll_sai_entry_stated(sai) ||
- sai->sai_thread.t_flags & SVC_STOPPED,
- &lwi);
- down(&dir->i_sem);
- }
-
- ll_sai_put(sai);
-
- if (lookup) {
- struct dentry *result;
-
- result = d_lookup((*dentryp)->d_parent,
- &(*dentryp)->d_name);
- if (result) {
- LASSERT(result != *dentryp);
- dput(*dentryp);
- *dentryp = result;
- }
- RETURN(result != NULL);
- }
- /* do nothing for revalidate */
- RETURN(0);
- }
- spin_unlock(&lli->lli_lock);
-
- rc = is_first_dirent(dir, *dentryp);
- if (!rc) {
- /* optimization: don't statahead for this pid any longer */
- spin_lock(&lli->lli_lock);
- if (lli->lli_sai == NULL)
- lli->lli_opendir_pid = 0;
- spin_unlock(&lli->lli_lock);
- RETURN(-EBADF);
- }
-
- spin_lock(&lli->lli_lock);
- if (lli->lli_sai == NULL) {
- lli->lli_sai = ll_sai_alloc();
- if (lli->lli_sai == NULL) {
- spin_unlock(&lli->lli_lock);
- RETURN(-ENOMEM);
- }
- } else {
- /* sai is already there */
- spin_unlock(&lli->lli_lock);
- RETURN(-EBUSY);
- }
- spin_unlock(&lli->lli_lock);
-
- sai = lli->lli_sai;
- sai->sai_inode = igrab(dir);
- sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
-
- sta.sta_parent = (*dentryp)->d_parent;
- sta.sta_pid = current->pid;
- rc = kernel_thread(ll_statahead_thread, &sta, 0);
- if (rc < 0) {
- CERROR("can't start ll_sa thread, rc: %d\n", rc);
- ll_sai_put(sai);
- RETURN(rc);
- }
-
- wait_event(sai->sai_thread.t_ctl_waitq,
- sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED));
- ll_sai_put(sai);
-
- /* we don't stat-ahead for the first dirent since we are already in
- * lookup, and -EEXIST also indicates that this is the first dirent.
- */
- RETURN(-EEXIST);
-}
-
-/* update hit/miss count */
-void ll_statahead_exit(struct dentry *dentry, int result)
-{
- struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode);
- struct ll_sb_info *sbi = ll_i2sbi(dentry->d_parent->d_inode);
-
- if (lli->lli_opendir_pid != current->pid)
- return;
-
- spin_lock(&lli->lli_lock);
- if (lli->lli_sai) {
- struct ll_statahead_info *sai = lli->lli_sai;
-
- ll_sai_entry_put(sai);
- if (result == 1) {
- sai->sai_hit++;
- sai->sai_consecutive_miss = 0;
- sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
- CDEBUG(D_READA, "statahead %.*s hit (hit/miss %u/%u)\n",
- dentry->d_name.len, dentry->d_name.name,
- sai->sai_hit, sai->sai_miss);
- } else {
- sai->sai_miss++;
- sai->sai_consecutive_miss++;
- /* upon miss, it's always because some dentry is added
- * by statahead thread, and at the mean time `ls`
- * processs finds this dentry, but the d_op for this
- * dentry is NULL, then revalidate is not done, and
- * ll_statahead_exit() not called for this dentry,
- * so statahead thread should be behind of `ls` process,
- * put one entry to go ahead.
- */
- ll_sai_entry_put(sai);
- CDEBUG(D_READA, "statahead %.*s miss (hit/miss %u/%u)\n",
- dentry->d_name.len, dentry->d_name.name,
- sai->sai_hit, sai->sai_miss);
- }
- wake_up(&sai->sai_thread.t_ctl_waitq);
- }
- spin_unlock(&lli->lli_lock);
-}
posix_acl_release(acl);
RETURN(rc);
}
- if (xattr_type == XATTR_ACL_DEFAULT_T && !S_ISDIR(inode->i_mode))
- RETURN(-ENODATA);
#endif
do_getxattr:
char *tmp;
tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1,
data->namelen + 1);
- memcpy(tmp, data->name, data->namelen);
- data->name = tmp;
+ LOGL0(data->name, data->namelen, tmp);
}
}
spin_lock(&cli->cl_loi_list_lock);
cli->cl_r_in_flight--;
-
list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+
if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
/* No free request slots anymore */
break;
}
EXPORT_SYMBOL(it_clear_disposition);
+/*
+ * Map the operation bits of a lookup intent to a lock mode.
+ *
+ * Returns LCK_CW when IT_CREAT is set in it->it_op, otherwise LCK_CR when
+ * any of IT_READDIR, IT_GETATTR, IT_OPEN or IT_LOOKUP is set.  Any other
+ * value reaches LBUG(); the trailing -EINVAL only matters if LBUG() hands
+ * control back (NOTE(review): presumably it does not -- confirm).
+ */
+static int it_to_lock_mode(struct lookup_intent *it)
+{
+        /* CREAT needs to be tested before open (both could be set) */
+        if (it->it_op & IT_CREAT)
+                return LCK_CW;
+        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
+                return LCK_CR;
+
+        LBUG();
+        /* Plain return, matching the other exits above: this function has
+         * no ENTRY, unlike the functions here that use RETURN(). */
+        return -EINVAL;
+}
+
int it_open_error(int phase, struct lookup_intent *it)
{
if (it_disposition(it, DISP_OPEN_OPEN)) {
* but this is incredibly unlikely, and questionable whether the client
* could do MDS recovery under OOM anyways... */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
- struct mds_body *body)
+ struct mds_body *body, int size[6])
{
- int old_len, new_size, old_size;
- struct lustre_msg *old_msg = req->rq_reqmsg;
+ int new_size, old_size;
struct lustre_msg *new_msg;
- old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
/* save old size */
- old_size = lustre_msg_size(lustre_request_magic(req),
- req->rq_reqmsg->lm_bufcount,
- req->rq_reqmsg->lm_buflens);
-
- lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
- body->eadatasize);
- new_size = lustre_msg_size(lustre_request_magic(req),
- req->rq_reqmsg->lm_bufcount,
- req->rq_reqmsg->lm_buflens);
+ old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
+
+ size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
+ new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
OBD_ALLOC(new_msg, new_size);
if (new_msg != NULL) {
- DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
+ struct lustre_msg *old_msg = req->rq_reqmsg;
+
+ DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
body->eadatasize);
memcpy(new_msg, old_msg, old_size);
+ lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
+ body->eadatasize);
spin_lock(&req->rq_lock);
req->rq_reqmsg = new_msg;
OBD_FREE(old_msg, old_size);
} else {
- lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
body->valid &= ~OBD_MD_FLEASIZE;
body->eadatasize = 0;
}
}
-static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
- struct lookup_intent *it,
- struct mdc_op_data *data,
- void *lmm, int lmmsize)
+/* We always reserve enough space in the reply packet for a stripe MD, because
+ * we don't know in advance the file type. */
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it, struct mdc_op_data *op_data,
+ struct lustre_handle *lockh, void *lmm, int lmmsize,
+ int extra_lock_flags)
{
struct ptlrpc_request *req;
- struct ldlm_intent *lit;
struct obd_device *obddev = class_exp2obd(exp);
+ struct ldlm_res_id res_id =
+ { .name = {op_data->fid1.id, op_data->fid1.generation} };
+ ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
+ struct ldlm_request *lockreq;
+ struct ldlm_intent *lit;
+ struct ldlm_reply *lockrep;
int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
+ [DLM_LOCKREQ_OFF] = sizeof(*lockreq),
[DLM_INTENT_IT_OFF] = sizeof(*lit),
- [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
- [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
- /* As an optimization, we allocate an RPC request buffer
- * for at least a default-sized LOV EA even if we aren't
- * sending one. We grow the whole request to the next
- * power-of-two size since we get that much from a slab
- * allocation anyways. This avoids an allocation below
- * in the common case where we need to save a
- * default-sized LOV EA for open replay. */
- [DLM_INTENT_REC_OFF+2]= max(lmmsize,
- obddev->u.cli.cl_default_mds_easize) };
+ 0, 0, 0, 0 };
int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
+ [DLM_LOCKREPLY_OFF] = sizeof(*lockrep),
[DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
[DLM_REPLY_REC_OFF+1] = obddev->u.cli.
- cl_max_mds_easize,
- [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
- CFS_LIST_HEAD(cancels);
- int count = 0;
- int mode;
- int rc;
-
- it->it_create_mode |= S_IFREG;
-
- rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
- if (rc & (rc - 1))
- size[DLM_INTENT_REC_OFF + 2] =
- min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
- obddev->u.cli.cl_max_mds_easize);
-
- /* If inode is known, cancel conflicting OPEN locks. */
- if (data->fid2.id) {
- if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
- mode = LCK_CW;
+ cl_max_mds_easize, 0 };
+ int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+ int repbufcnt = 4, rc;
+ void *eadata;
+ ENTRY;
+
+ LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
+// LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
+// ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
+
+ if (it->it_op & IT_OPEN) {
+ CFS_LIST_HEAD(cancels);
+ int count = 0;
+ int mode;
+
+ it->it_create_mode |= S_IFREG;
+
+ size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
+ size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
+ /* As an optimization, we allocate an RPC request buffer for
+ * at least a default-sized LOV EA even if we aren't sending
+ * one. We grow the whole request to the next power-of-two
+ * size since we get that much from a slab allocation anyways.
+ * This avoids an allocation below in the common case where
+ * we need to save a default-sized LOV EA for open replay. */
+ size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
+ obddev->u.cli.cl_default_mds_easize);
+ rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
+ size);
+ if (rc & (rc - 1))
+ size[DLM_INTENT_REC_OFF + 2] =
+ min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
+ obddev->u.cli.cl_max_mds_easize);
+
+ /* If inode is known, cancel conflicting OPEN locks. */
+ if (op_data->fid2.id) {
+ if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
+ mode = LCK_CW;
#ifdef FMODE_EXEC
- else if (it->it_flags & FMODE_EXEC)
- mode = LCK_PR;
+ else if (it->it_flags & FMODE_EXEC)
+ mode = LCK_PR;
#endif
- else
+ else
+ mode = LCK_CR;
+ count = mdc_resource_get_unused(exp, &op_data->fid2,
+ &cancels, mode,
+ MDS_INODELOCK_OPEN);
+ }
+
+ /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
+ if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
+ mode = LCK_EX;
+ else
mode = LCK_CR;
- count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
- mode, MDS_INODELOCK_OPEN);
- }
+ count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
+ mode, MDS_INODELOCK_UPDATE);
+ if (it->it_flags & O_JOIN_FILE) {
+ /* join is like an unlink of the tail */
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ size[DLM_INTENT_REC_OFF + 3] =
+ sizeof(struct mds_rec_join);
+ req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
+ count);
+ mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
+ (*(__u64 *)op_data->data));
+ } else {
+ req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
+ count);
+ }
- /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
- if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
- mode = LCK_EX;
- else
- mode = LCK_CR;
- count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
- MDS_INODELOCK_UPDATE);
- if (it->it_flags & O_JOIN_FILE) {
- __u64 head_size = (*(__u64 *)data->data);
- /* join is like an unlink of the tail */
- size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
- req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
- mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, head_size);
- } else {
- req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
- }
+ if (!req)
+ RETURN(-ENOMEM);
- if (req) {
spin_lock(&req->rq_lock);
req->rq_replay = 1;
spin_unlock(&req->rq_lock);
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
+ mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
it->it_create_mode, 0, it->it_flags,
lmm, lmmsize);
- ptlrpc_req_set_repsize(req, 5, repsize);
- }
- return req;
-}
-
-static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
- struct lookup_intent *it,
- struct mdc_op_data *data)
-{
- struct ptlrpc_request *req;
- struct ldlm_intent *lit;
- struct obd_device *obddev = class_exp2obd(exp);
- int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
- [DLM_INTENT_IT_OFF] = sizeof(*lit),
- [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink),
- [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
- int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
- [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
- [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
- cl_max_mds_easize,
- [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
- cl_max_mds_cookiesize };
+ repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
+ } else if (it->it_op & IT_UNLINK) {
+ size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
+ size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
+ if (!req)
+ RETURN(-ENOMEM);
- req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
- if (req) {
/* pack the intent */
lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
sizeof(*lit));
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
+ mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
- ptlrpc_req_set_repsize(req, 5, repsize);
- }
- return req;
-}
+ repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
+ } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
+ obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
+ OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
+ OBD_MD_FLDIREA;
+ size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
+ size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
-static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
- struct lookup_intent *it,
- struct mdc_op_data *data)
-{
- struct ptlrpc_request *req;
- struct ldlm_intent *lit;
- struct obd_device *obddev = class_exp2obd(exp);
- int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
- [DLM_INTENT_IT_OFF] = sizeof(*lit),
- [DLM_INTENT_REC_OFF] = sizeof(struct mds_body),
- [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
- int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
- [DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
- [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
- cl_max_mds_easize,
- [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
- obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
- OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
+ if (it->it_op & IT_GETATTR)
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+
+ req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
+ if (!req)
+ RETURN(-ENOMEM);
- req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
- if (req) {
/* pack the intent */
lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
sizeof(*lit));
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
- data);
- ptlrpc_req_set_repsize(req, 5, repsize);
+ mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
+ it->it_flags, op_data);
+
+ repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
+ } else if (it->it_op == IT_READDIR) {
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ repbufcnt = 2;
+ } else {
+ LBUG();
+ RETURN(-EINVAL);
}
- return req;
-}
-static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
-{
- struct ptlrpc_request *req;
- int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
- int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply) };
-
- req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
- if (req)
- ptlrpc_req_set_repsize(req, 2, repsize);
- return req;
-}
+ /* get ready for the reply */
+ ptlrpc_req_set_repsize(req, repbufcnt, repsize);
-static int mdc_finish_enqueue(struct obd_export *exp,
- struct ptlrpc_request *req,
- struct ldlm_enqueue_info *einfo,
- struct lookup_intent *it,
- struct lustre_handle *lockh,
- int rc)
-{
- struct ldlm_request *lockreq;
- struct ldlm_reply *lockrep;
- ENTRY;
+ /* It is important to obtain rpc_lock first (if applicable), so that
+ * threads that are serialised with rpc_lock are not polluting our
+ * rpcs in flight counter */
+ mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+ mdc_enter_request(&obddev->u.cli);
+ rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+ 0, NULL, lockh, 0);
+ mdc_exit_request(&obddev->u.cli);
+ mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* Similarly, if we're going to replay this request, we don't want to
* actually get a lock, just perform the intent. */
* It's important that we do this first! Otherwise we might exit the
* function without doing so, and try to replay a failed create
* (bug 3440) */
- if ((it->it_op & IT_OPEN) &&
- req->rq_replay &&
- (!it_disposition(it, DISP_OPEN_OPEN) ||
- it->d.lustre.it_status != 0))
+ if (it->it_op & IT_OPEN && req->rq_replay &&
+ (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
mdc_clear_replay_flag(req, it->d.lustre.it_status);
DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
/* We know what to expect, so we do any byte flipping required here */
- if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
+ LASSERT(repbufcnt == 5 || repbufcnt == 2);
+ if (repbufcnt == 5) {
struct mds_body *body;
body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
mdc_set_open_replay_data(NULL, req);
if ((body->valid & OBD_MD_FLEASIZE) != 0) {
- void *eadata;
-
/* The eadata is opaque; just check that it is there.
* Eventually, obd_unpackmd() will check the contents */
eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
RETURN (-EPROTO);
}
if (body->valid & OBD_MD_FLMODEASIZE) {
- struct obd_device *obddev = class_exp2obd(exp);
-
if (obddev->u.cli.cl_max_mds_easize <
- body->max_mdsize) {
+ body->max_mdsize) {
obddev->u.cli.cl_max_mds_easize =
body->max_mdsize;
CDEBUG(D_INFO, "maxeasize become %d\n",
body->max_mdsize);
}
if (obddev->u.cli.cl_max_mds_cookiesize <
- body->max_cookiesize) {
+ body->max_cookiesize) {
obddev->u.cli.cl_max_mds_cookiesize =
body->max_cookiesize;
CDEBUG(D_INFO, "cookiesize become %d\n",
* reallocate it here to hold the actual LOV EA. */
if (it->it_op & IT_OPEN) {
int offset = DLM_INTENT_REC_OFF + 2;
- void *lmm;
if (lustre_msg_buflen(req->rq_reqmsg, offset) <
body->eadatasize)
- mdc_realloc_openmsg(req, body);
+ mdc_realloc_openmsg(req, body, size);
lmm = lustre_msg_buf(req->rq_reqmsg, offset,
body->eadatasize);
RETURN(rc);
}
-
-/* We always reserve enough space in the reply packet for a stripe MD, because
- * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
- struct lookup_intent *it, struct mdc_op_data *data,
- struct lustre_handle *lockh, void *lmm, int lmmsize,
- int extra_lock_flags)
-{
- struct ptlrpc_request *req;
- struct obd_device *obddev = class_exp2obd(exp);
- struct ldlm_res_id res_id =
- { .name = {data->fid1.id, data->fid1.generation} };
- ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
- int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
- int rc;
- ENTRY;
-
- LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
- if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-
- if (it->it_op & IT_OPEN) {
- req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
- if (it->it_flags & O_JOIN_FILE) {
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- }
- } else if (it->it_op & IT_UNLINK) {
- req = mdc_intent_unlink_pack(exp, it, data);
- } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
- req = mdc_intent_lookup_pack(exp, it, data);
- } else if (it->it_op == IT_READDIR) {
- req = mdc_intent_readdir_pack(exp);
- } else {
- CERROR("bad it_op %x\n", it->it_op);
- RETURN(-EINVAL);
- }
-
- if (!req)
- RETURN(-ENOMEM);
-
- /* It is important to obtain rpc_lock first (if applicable), so that
- * threads that are serialised with rpc_lock are not polluting our
- * rpcs in flight counter */
- mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
- mdc_enter_request(&obddev->u.cli);
- rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
- 0, NULL, lockh, 0);
- mdc_exit_request(&obddev->u.cli);
- mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
-
- rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
-
- RETURN(rc);
-}
EXPORT_SYMBOL(mdc_enqueue);
-int mdc_revalidate_lock(struct obd_export *exp,
- struct lookup_intent *it,
- struct ll_fid *fid)
-{
- /* We could just return 1 immediately, but since we should only
- * be called in revalidate_it if we already have a lock, let's
- * verify that. */
- struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
- struct lustre_handle lockh;
- ldlm_policy_data_t policy;
- int mode = LCK_CR;
- int rc;
-
- /* As not all attributes are kept under update lock, e.g.
- owner/group/acls are under lookup lock, we need both
- ibits for GETATTR. */
- policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
- MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
- MDS_INODELOCK_LOOKUP;
-
- rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
- &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
- if (!rc) {
- mode = LCK_CW;
- rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
- LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
- &policy, LCK_CW, &lockh);
- }
- if (!rc) {
- mode = LCK_PR;
- rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
- LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
- &policy, LCK_PR, &lockh);
- }
- if (rc) {
- memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
- it->d.lustre.it_lock_mode = mode;
- }
-
- return rc;
-}
-EXPORT_SYMBOL(mdc_revalidate_lock);
-
-static int mdc_finish_intent_lock(struct obd_export *exp,
- struct ptlrpc_request *req,
- struct mdc_op_data *data,
- struct lookup_intent *it,
- struct lustre_handle *lockh)
-{
- struct mds_body *mds_body;
- struct lustre_handle old_lock;
- struct ldlm_lock *lock;
- int rc;
- ENTRY;
-
- LASSERT(req != NULL);
- LASSERT(req != LP_POISON);
- LASSERT(req->rq_repmsg != LP_POISON);
-
- if (!it_disposition(it, DISP_IT_EXECD)) {
- /* The server failed before it even started executing the
- * intent, i.e. because it couldn't unpack the request. */
- LASSERT(it->d.lustre.it_status != 0);
- RETURN(it->d.lustre.it_status);
- }
- rc = it_open_error(DISP_IT_EXECD, it);
- if (rc)
- RETURN(rc);
-
- mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
- sizeof(*mds_body));
- LASSERT(mds_body != NULL); /* mdc_enqueue checked */
- LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* mdc_enqueue swabbed */
-
- /* If we were revalidating a fid/name pair, mark the intent in
- * case we fail and get called again from lookup */
- if (data->fid2.id && (it->it_op != IT_GETATTR)) {
- it_set_disposition(it, DISP_ENQ_COMPLETE);
- /* Also: did we find the same inode? */
- if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)))
- RETURN(-ESTALE);
- }
-
- rc = it_open_error(DISP_LOOKUP_EXECD, it);
- if (rc)
- RETURN(rc);
-
- /* keep requests around for the multiple phases of the call
- * this shows the DISP_XX must guarantee we make it into the call
- */
- if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
- it_disposition(it, DISP_OPEN_CREATE) &&
- !it_open_error(DISP_OPEN_CREATE, it)) {
- it_set_disposition(it, DISP_ENQ_CREATE_REF);
- ptlrpc_request_addref(req); /* balanced in ll_create_node */
- }
- if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
- it_disposition(it, DISP_OPEN_OPEN) &&
- !it_open_error(DISP_OPEN_OPEN, it)) {
- it_set_disposition(it, DISP_ENQ_OPEN_REF);
- ptlrpc_request_addref(req); /* balanced in ll_file_open */
- /* BUG 11546 - eviction in the middle of open rpc processing */
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
- }
-
- if (it->it_op & IT_CREAT) {
- /* XXX this belongs in ll_create_it */
- } else if (it->it_op == IT_OPEN) {
- LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
- } else {
- LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
- }
-
- /* If we already have a matching lock, then cancel the new
- * one. We have to set the data here instead of in
- * mdc_enqueue, because we need to use the child's inode as
- * the l_ast_data to match, and that's not available until
- * intent_finish has performed the iget().) */
- lock = ldlm_handle2lock(lockh);
- if (lock) {
- ldlm_policy_data_t policy = lock->l_policy_data;
-
- LDLM_DEBUG(lock, "matching against this");
- LDLM_LOCK_PUT(lock);
- memcpy(&old_lock, lockh, sizeof(*lockh));
- if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
- LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
- ldlm_lock_decref_and_cancel(lockh,
- it->d.lustre.it_lock_mode);
- memcpy(lockh, &old_lock, sizeof(old_lock));
- memcpy(&it->d.lustre.it_lock_handle, lockh,
- sizeof(*lockh));
- }
- }
-
- CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
- data->namelen, data->name, ldlm_it2str(it->it_op),
- it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
- RETURN(rc);
-}
-
/*
* This long block is all about fixing up the lock and request state
* so that it is correct as of the moment _before_ the operation was
ldlm_blocking_callback cb_blocking, int extra_lock_flags)
{
struct lustre_handle lockh;
- int rc;
+ struct ptlrpc_request *request;
+ int rc = 0;
+ struct mds_body *mds_body;
+ struct lustre_handle old_lock;
+ struct ldlm_lock *lock;
ENTRY;
-
LASSERT(it);
CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
if (op_data->fid2.id &&
(it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
- rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
+ /* We could just return 1 immediately, but since we should only
+ * be called in revalidate_it if we already have a lock, let's
+ * verify that. */
+ struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
+ op_data->fid2.generation}};
+ struct lustre_handle lockh;
+ ldlm_policy_data_t policy;
+ int mode = LCK_CR;
+
+ /* As not all attributes are kept under update lock, e.g.
+ owner/group/acls are under lookup lock, we need both
+ ibits for GETATTR. */
+ policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
+ MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
+ MDS_INODELOCK_LOOKUP;
+
+ rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED, &res_id,
+ LDLM_IBITS, &policy, LCK_CR, &lockh);
+ if (!rc) {
+ mode = LCK_CW;
+ rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED, &res_id,
+ LDLM_IBITS, &policy,LCK_CW,&lockh);
+ }
+ if (!rc) {
+ mode = LCK_PR;
+ rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED, &res_id,
+ LDLM_IBITS, &policy,LCK_PR,&lockh);
+ }
+ if (rc) {
+ memcpy(&it->d.lustre.it_lock_handle, &lockh,
+ sizeof(lockh));
+ it->d.lustre.it_lock_mode = mode;
+ }
+
/* Only return failure if it was not GETATTR by cfid
(from inode_revalidate) */
if (rc || op_data->namelen != 0)
* lookup, so we clear DISP_ENQ_COMPLETE */
it_clear_disposition(it, DISP_ENQ_COMPLETE);
}
+ request = *reqp = it->d.lustre.it_data;
+ LASSERT(request != NULL);
+ LASSERT(request != LP_POISON);
+ LASSERT(request->rq_repmsg != LP_POISON);
- *reqp = it->d.lustre.it_data;
- rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(mdc_intent_lock);
-
-static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
- void *unused, int rc)
-{
- struct mdc_enqueue_args *ma;
- struct md_enqueue_info *minfo;
- struct ldlm_enqueue_info *einfo;
- struct obd_export *exp;
- struct lookup_intent *it;
- struct lustre_handle *lockh;
- struct obd_device *obddev;
- int flags = LDLM_FL_HAS_INTENT;
- ENTRY;
-
- ma = (struct mdc_enqueue_args *)&req->rq_async_args;
- minfo = ma->ma_mi;
- einfo = ma->ma_ei;
-
- exp = minfo->mi_exp;
- it = &minfo->mi_it;
- lockh = &minfo->mi_lockh;
-
- obddev = class_exp2obd(exp);
-
- mdc_exit_request(&obddev->u.cli);
-
- rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
- &flags, NULL, 0, NULL, lockh, rc);
-
- rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+ if (!it_disposition(it, DISP_IT_EXECD)) {
+ /* The server failed before it even started executing the
+ * intent, i.e. because it couldn't unpack the request. */
+ LASSERT(it->d.lustre.it_status != 0);
+ RETURN(it->d.lustre.it_status);
+ }
+ rc = it_open_error(DISP_IT_EXECD, it);
if (rc)
- GOTO(out, rc);
-
- memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
-
- rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
- GOTO(out, rc);
-out:
- OBD_FREE_PTR(einfo);
- minfo->mi_cb(exp, req, minfo, rc);
+ RETURN(rc);
- return 0;
-}
+ mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
+ sizeof(*mds_body));
+ LASSERT(mds_body != NULL); /* mdc_enqueue checked */
+ LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
-int mdc_intent_getattr_async(struct obd_export *exp,
- struct md_enqueue_info *minfo,
- struct ldlm_enqueue_info *einfo)
-{
- struct mdc_op_data *op_data = &minfo->mi_data;
- struct lookup_intent *it = &minfo->mi_it;
- struct ptlrpc_request *req;
- struct obd_device *obddev = class_exp2obd(exp);
- struct ldlm_res_id res_id = {
- .name = {op_data->fid1.id,
- op_data->fid1.generation}
- };
- ldlm_policy_data_t policy = {
- .l_inodebits = { MDS_INODELOCK_LOOKUP }
- };
- struct mdc_enqueue_args *aa;
- int rc;
- int flags = LDLM_FL_HAS_INTENT;
- ENTRY;
+ /* If we were revalidating a fid/name pair, mark the intent in
+ * case we fail and get called again from lookup */
+ if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
+ it_set_disposition(it, DISP_ENQ_COMPLETE);
+ /* Also: did we find the same inode? */
+ if (memcmp(&op_data->fid2, &mds_body->fid1,
+ sizeof(op_data->fid2)))
+ RETURN (-ESTALE);
+ }
- CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
- op_data->namelen, op_data->name, op_data->fid1.id,
- ldlm_it2str(it->it_op), it->it_flags);
+ rc = it_open_error(DISP_LOOKUP_EXECD, it);
+ if (rc)
+ RETURN(rc);
- req = mdc_intent_lookup_pack(exp, it, op_data);
- if (!req)
- RETURN(-ENOMEM);
+ /* keep requests around for the multiple phases of the call
+ * this shows the DISP_XX must guarantee we make it into the call
+ */
+ if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
+ it_disposition(it, DISP_OPEN_CREATE) &&
+ !it_open_error(DISP_OPEN_CREATE, it)) {
+ it_set_disposition(it, DISP_ENQ_CREATE_REF);
+ ptlrpc_request_addref(request); /* balanced in ll_create_node */
+ }
+ if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
+ it_disposition(it, DISP_OPEN_OPEN) &&
+ !it_open_error(DISP_OPEN_OPEN, it)) {
+ it_set_disposition(it, DISP_ENQ_OPEN_REF);
+ ptlrpc_request_addref(request); /* balanced in ll_file_open */
+ /* BUG 11546 - eviction in the middle of open rpc processing */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
+ }
- mdc_enter_request(&obddev->u.cli);
- rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
- 0, NULL, &minfo->mi_lockh, 1);
- if (rc < 0) {
- mdc_exit_request(&obddev->u.cli);
- RETURN(rc);
+ if (it->it_op & IT_CREAT) {
+ /* XXX this belongs in ll_create_it */
+ } else if (it->it_op == IT_OPEN) {
+ LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
+ } else {
+ LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
}
- CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
- aa = (struct mdc_enqueue_args *)&req->rq_async_args;
- aa->ma_mi = minfo;
- aa->ma_ei = einfo;
- req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
- ptlrpcd_add_req(req);
+ /* If we already have a matching lock, then cancel the new
+ * one. We have to set the data here instead of in
+ * mdc_enqueue, because we need to use the child's inode as
+ * the l_ast_data to match, and that's not available until
+ * intent_finish has performed the iget().) */
+ lock = ldlm_handle2lock(&lockh);
+ if (lock) {
+ ldlm_policy_data_t policy = lock->l_policy_data;
+ LDLM_DEBUG(lock, "matching against this");
+ LDLM_LOCK_PUT(lock);
+ memcpy(&old_lock, &lockh, sizeof(lockh));
+ if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
+ LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
+ ldlm_lock_decref_and_cancel(&lockh,
+ it->d.lustre.it_lock_mode);
+ memcpy(&lockh, &old_lock, sizeof(old_lock));
+ memcpy(&it->d.lustre.it_lock_handle, &lockh,
+ sizeof(lockh));
+ }
+ }
+ CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
+ op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
+ it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
- RETURN(0);
+ RETURN(rc);
}
-EXPORT_SYMBOL(mdc_intent_getattr_async);
+EXPORT_SYMBOL(mdc_intent_lock);
int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
unsigned int acl_size, struct ptlrpc_request *req)
{
- struct obd_device *obddev = class_exp2obd(exp);
struct mds_body *body;
void *eadata;
int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
ptlrpc_req_set_repsize(req, bufcount, size);
- mdc_enter_request(&obddev->u.cli);
rc = ptlrpc_queue_wait(req);
- mdc_exit_request(&obddev->u.cli);
if (rc != 0)
RETURN (rc);
const char *input, int input_size, int output_size,
int flags, struct ptlrpc_request **request)
{
- struct obd_device *obddev = class_exp2obd(exp);
struct ptlrpc_request *req;
int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) };
// int size[3] = {sizeof(struct mds_body)}, bufcnt = 1;
/* make rpc */
if (opcode == MDS_SETXATTR)
mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
- else
- mdc_enter_request(&obddev->u.cli);
rc = ptlrpc_queue_wait(req);
if (opcode == MDS_SETXATTR)
mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
- else
- mdc_exit_request(&obddev->u.cli);
if (rc != 0)
GOTO(err_out, rc);
cleanup() {
echo -n "cln.."
- pgrep ll_sa > /dev/null && { echo "There are ll_sa thread not exit!"; exit 20; }
cleanupall ${FORCE} $* || { echo "FAILed to clean up"; exit 20; }
}
CLEANUP=${CLEANUP:-:}
}
run_test 122 "fail client bulk callback (shouldn't LBUG) ======="
-test_123() # statahead(bug 11401)
-{
- if [ -z "$(grep "processor.*: 1" /proc/cpuinfo)" ]; then
- log "single core CPU, skipping test" # && return
- fi
-
- mkdir -p $DIR/$tdir
-
- for ((i=1, j=0; i<=10000; j=$i, i=$((i * 10)) )); do
- createmany -o $DIR/$tdir/$tfile $j $((i - j))
-
- grep '[0-9]' $LPROC/llite/*/statahead_max
- cancel_lru_locks mdc
- stime=`date +%s`
- ls -l $DIR/$tdir > /dev/null
- etime=`date +%s`
- delta_sa=$((etime - stime))
- echo "ls $i files with statahead: $delta_sa sec"
-
- for client in $LPROC/llite/*; do
- max=`cat $client/statahead_max`
- cat $client/statahead_stats
- echo 0 > $client/statahead_max
- done
-
- grep '[0-9]' $LPROC/llite/*/statahead_max
- cancel_lru_locks mdc
- stime=`date +%s`
- ls -l $DIR/$tdir > /dev/null
- etime=`date +%s`
- delta=$((etime - stime))
- echo "ls $i files without statahead: $delta sec"
-
- for client in /proc/fs/lustre/llite/*; do
- cat $client/statahead_stats
- echo $max > $client/statahead_max
- done
-
- if [ $delta_sa -gt $delta ]; then
- error "ls $i files is slower with statahead!"
- fi
- done
- echo "ls done"
-
- stime=`date +%s`
- rm -r $DIR/$tdir
- sync
- etime=`date +%s`
- delta=$((etime - stime))
- echo "rm -r $DIR/$tdir/: $delta seconds"
- echo "rm done"
- cat /proc/fs/lustre/llite/*/statahead_stats
- # wait for commitment of removal
- sleep 2
-}
-run_test 123 "verify statahead work"
-
TMPDIR=$OLDTMPDIR
TMP=$OLDTMP
HOME=$OLDHOME
log "cleanup: ======================================================"
if [ "`mount | grep $MOUNT`" ]; then
- rm -rf $DIR/[Rdfs][1-9]*
+ rm -rf $DIR/[Rdfs][1-9]*
fi
if [ "$I_MOUNTED" = "yes" ]; then
- cleanupall -f || error "cleanup failed"
+ cleanupall -f || error "cleanup failed"
else
- sysctl -w lnet.debug="$OLDDEBUG" 2> /dev/null || true
+ sysctl -w lnet.debug="$OLDDEBUG" 2> /dev/null || true
fi