X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Ffile.c;h=3a66cab54ebe5fb1fc338f74402a7bdcdd7c8d99;hp=b10ef2e8eecf3bd5182ff1b32b1fdf5d2fa7e686;hb=c084c6215851d238d14b0d414374b6b55c91f525;hpb=044503492c8e2e4cc519c34f282744d3d941a759 diff --git a/lustre/llite/file.c b/lustre/llite/file.c index b10ef2e..3a66cab 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2014, Intel Corporation. + * Copyright (c) 2011, 2016, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -50,10 +46,11 @@ # include #endif #include -#include -#include "cl_object.h" +#include +#include +#include "cl_object.h" #include "llite_internal.h" #include "vvp_internal.h" @@ -126,32 +123,28 @@ static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data, * If \a bias is MDS_CLOSE_LAYOUT_SWAP then \a data is a pointer to the inode to * swap layouts with. */ -static int ll_close_inode_openhandle(struct obd_export *md_exp, +static int ll_close_inode_openhandle(struct inode *inode, struct obd_client_handle *och, - struct inode *inode, - enum mds_op_bias bias, - void *data) + enum mds_op_bias bias, void *data) { - struct obd_export *exp = ll_i2mdexp(inode); - struct md_op_data *op_data; - struct ptlrpc_request *req = NULL; - struct obd_device *obd = class_exp2obd(exp); - int rc; + struct obd_export *md_exp = ll_i2mdexp(inode); + const struct ll_inode_info *lli = ll_i2info(inode); + struct md_op_data *op_data; + struct ptlrpc_request *req = NULL; + int rc; ENTRY; - if (obd == NULL) { - /* - * XXX: in case of LMV, is this correct to access - * ->exp_handle? - */ - CERROR("Invalid MDC connection handle "LPX64"\n", - ll_i2mdexp(inode)->exp_handle.h_cookie); + if (class_exp2obd(md_exp) == NULL) { + CERROR("%s: invalid MDC connection handle closing "DFID"\n", + ll_get_fsname(inode->i_sb, NULL, 0), + PFID(&lli->lli_fid)); GOTO(out, rc = 0); } OBD_ALLOC_PTR(op_data); + /* We leak openhandle and request here on error, but not much to be + * done in OOM case since app won't retry close on error either. */ if (op_data == NULL) - /* XXX We leak openhandle and request here. */ GOTO(out, rc = -ENOMEM); ll_prepare_close(inode, op_data, och); @@ -177,12 +170,10 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp, break; } - rc = md_close(md_exp, op_data, och->och_mod, &req); - if (rc) { + rc = md_close(md_exp, op_data, och->och_mod, &req); + if (rc != 0 && rc != -EINTR) CERROR("%s: inode "DFID" mdc close failed: rc = %d\n", - ll_i2mdexp(inode)->exp_obd->obd_name, - PFID(ll_inode2fid(inode)), rc); - } + md_exp->exp_obd->obd_name, PFID(&lli->lli_fid), rc); if (rc == 0 && op_data->op_bias & (MDS_HSM_RELEASE | MDS_CLOSE_LAYOUT_SWAP)) { @@ -201,9 +192,8 @@ out: och->och_fh.cookie = DEAD_HANDLE_MAGIC; OBD_FREE_PTR(och); - if (req) /* This is close request */ - ptlrpc_req_finished(req); - return rc; + ptlrpc_req_finished(req); /* This is close request */ + return rc; } int ll_md_real_close(struct inode *inode, fmode_t fmode) @@ -242,24 +232,22 @@ int ll_md_real_close(struct inode *inode, fmode_t fmode) if (och != NULL) { /* There might be a race and this handle may already * be closed. */ - rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, - och, inode, 0, NULL); + rc = ll_close_inode_openhandle(inode, och, 0, NULL); } RETURN(rc); } -static int ll_md_close(struct obd_export *md_exp, struct inode *inode, - struct file *file) +static int ll_md_close(struct inode *inode, struct file *file) { - ldlm_policy_data_t policy = { + union ldlm_policy_data policy = { .l_inodebits = { MDS_INODELOCK_OPEN }, }; __u64 flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK; struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct ll_inode_info *lli = ll_i2info(inode); struct lustre_handle lockh; - int lockmode; + enum ldlm_mode lockmode; int rc = 0; ENTRY; @@ -280,8 +268,7 @@ static int ll_md_close(struct obd_export *md_exp, struct inode *inode, } if (fd->fd_och != NULL) { - rc = ll_close_inode_openhandle(md_exp, fd->fd_och, inode, 0, - NULL); + rc = ll_close_inode_openhandle(inode, fd->fd_och, 0, NULL); fd->fd_och = NULL; GOTO(out, rc); } @@ -304,7 +291,7 @@ static int ll_md_close(struct obd_export *md_exp, struct inode *inode, } mutex_unlock(&lli->lli_och_mutex); - if (!md_lock_match(md_exp, flags, ll_inode2fid(inode), + if (!md_lock_match(ll_i2mdexp(inode), flags, ll_inode2fid(inode), LDLM_IBITS, &policy, lockmode, &lockh)) rc = ll_md_real_close(inode, fd->fd_omode); @@ -331,21 +318,7 @@ int ll_file_release(struct inode *inode, struct file *file) CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n", PFID(ll_inode2fid(inode)), inode); -#ifdef CONFIG_FS_POSIX_ACL - if (sbi->ll_flags & LL_SBI_RMT_CLIENT && - inode == inode->i_sb->s_root->d_inode) { - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); - - LASSERT(fd != NULL); - if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) { - fd->fd_flags &= ~LL_FILE_RMTACL; - rct_del(&sbi->ll_rct, current_pid()); - et_search_free(&sbi->ll_et, current_pid()); - } - } -#endif - - if (inode->i_sb->s_root != file->f_path.dentry) + if (inode->i_sb->s_root != file_dentry(file)) ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1); fd = LUSTRE_FPRIVATE(file); LASSERT(fd != NULL); @@ -355,30 +328,29 @@ int ll_file_release(struct inode *inode, struct file *file) if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd) ll_deauthorize_statahead(inode, fd); - if (inode->i_sb->s_root == file->f_path.dentry) { - LUSTRE_FPRIVATE(file) = NULL; - ll_file_data_put(fd); - RETURN(0); - } + if (inode->i_sb->s_root == file_dentry(file)) { + LUSTRE_FPRIVATE(file) = NULL; + ll_file_data_put(fd); + RETURN(0); + } - if (!S_ISDIR(inode->i_mode)) { + if (!S_ISDIR(inode->i_mode)) { if (lli->lli_clob != NULL) lov_read_and_clear_async_rc(lli->lli_clob); - lli->lli_async_rc = 0; - } + lli->lli_async_rc = 0; + } - rc = ll_md_close(sbi->ll_md_exp, inode, file); + rc = ll_md_close(inode, file); - if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val)) - libcfs_debug_dumplog(); + if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val)) + libcfs_debug_dumplog(); - RETURN(rc); + RETURN(rc); } -static int ll_intent_file_open(struct file *file, void *lmm, int lmmsize, +static int ll_intent_file_open(struct dentry *de, void *lmm, int lmmsize, struct lookup_intent *itp) { - struct dentry *de = file->f_path.dentry; struct ll_sb_info *sbi = ll_i2sbi(de->d_inode); struct dentry *parent = de->d_parent; const char *name = NULL; @@ -430,26 +402,35 @@ static int ll_intent_file_open(struct file *file, void *lmm, int lmmsize, } rc = ll_prep_inode(&de->d_inode, req, NULL, itp); - if (!rc && itp->d.lustre.it_lock_mode) + if (!rc && itp->it_lock_mode) ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, NULL); out: ptlrpc_req_finished(req); ll_intent_drop_lock(itp); + /* We did open by fid, but by the time we got to the server, + * the object disappeared. If this is a create, we cannot really + * tell the userspace that the file it was trying to create + * does not exist. Instead let's return -ESTALE, and the VFS will + * retry the create with LOOKUP_REVAL that we are going to catch + * in ll_revalidate_dentry() and use lookup then. + */ + if (rc == -ENOENT && itp->it_op & IT_CREAT) + rc = -ESTALE; + RETURN(rc); } static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it, struct obd_client_handle *och) { - struct ptlrpc_request *req = it->d.lustre.it_data; struct mdt_body *body; - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + body = req_capsule_server_get(&it->it_request->rq_pill, &RMF_MDT_BODY); och->och_fh = body->mbo_handle; och->och_fid = body->mbo_fid1; - och->och_lease_handle.cookie = it->d.lustre.it_lock_handle; + och->och_lease_handle.cookie = it->it_lock_handle; och->och_magic = OBD_CLIENT_HANDLE_MAGIC; och->och_flags = it->it_flags; @@ -459,7 +440,7 @@ static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it, static int ll_local_open(struct file *file, struct lookup_intent *it, struct ll_file_data *fd, struct obd_client_handle *och) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); ENTRY; LASSERT(!LUSTRE_FPRIVATE(file)); @@ -523,12 +504,12 @@ int ll_file_open(struct inode *inode, struct file *file) if (S_ISDIR(inode->i_mode)) ll_authorize_statahead(inode, fd); - if (inode->i_sb->s_root == file->f_path.dentry) { + if (inode->i_sb->s_root == file_dentry(file)) { LUSTRE_FPRIVATE(file) = fd; RETURN(0); } - if (!it || !it->d.lustre.it_disposition) { + if (!it || !it->it_disposition) { /* Convert f_flags into access mode. We cannot use file->f_mode, * because everything but O_ACCMODE mask was stripped from * there */ @@ -581,7 +562,7 @@ restart: GOTO(out_openerr, rc); } - ll_release_openhandle(file->f_path.dentry, it); + ll_release_openhandle(file_dentry(file), it); } (*och_usecount)++; @@ -593,7 +574,8 @@ restart: } } else { LASSERT(*och_usecount == 0); - if (!it->d.lustre.it_disposition) { + if (!it->it_disposition) { + struct ll_dentry_data *ldd = ll_d2d(file->f_path.dentry); /* We cannot just request lock handle now, new ELC code means that one of other OPEN locks for this file could be cancelled, and since blocking ast handler @@ -607,13 +589,26 @@ restart: * handle to be returned from LOOKUP|OPEN request, * for example if the target entry was a symlink. * - * Always fetch MDS_OPEN_LOCK if this is not setstripe. + * Only fetch MDS_OPEN_LOCK if this is in NFS path, + * marked by a bit set in ll_iget_for_nfs. Clear the + * bit so that it's not confusing later callers. * + * NB; when ldd is NULL, it must have come via normal + * lookup path only, since ll_iget_for_nfs always calls + * ll_d_init(). + */ + if (ldd && ldd->lld_nfs_dentry) { + ldd->lld_nfs_dentry = 0; + it->it_flags |= MDS_OPEN_LOCK; + } + + /* * Always specify MDS_OPEN_BY_FID because we don't want * to get file with different fid. */ - it->it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID; - rc = ll_intent_file_open(file, NULL, 0, it); + it->it_flags |= MDS_OPEN_BY_FID; + rc = ll_intent_file_open(file_dentry(file), NULL, 0, + it); if (rc) GOTO(out_openerr, rc); @@ -636,7 +631,7 @@ restart: LASSERTF(it_disposition(it, DISP_ENQ_OPEN_REF), "inode %p: disposition %x, status %d\n", inode, - it_disposition(it, ~0), it->d.lustre.it_status); + it_disposition(it, ~0), it->it_status); rc = ll_local_open(file, it, fd, *och_p); if (rc) @@ -673,7 +668,7 @@ out_openerr: } if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) { - ptlrpc_req_finished(it->d.lustre.it_data); + ptlrpc_req_finished(it->it_request); it_clear_disposition(it, DISP_ENQ_OPEN_REF); } @@ -704,6 +699,95 @@ static int ll_md_blocking_lease_ast(struct ldlm_lock *lock, } /** + * When setting a lease on a file, we take ownership of the lli_mds_*_och + * and save it as fd->fd_och so as to force client to reopen the file even + * if it has an open lock in cache already. + */ +static int ll_lease_och_acquire(struct inode *inode, struct file *file, + struct lustre_handle *old_handle) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct obd_client_handle **och_p; + __u64 *och_usecount; + int rc = 0; + ENTRY; + + /* Get the openhandle of the file */ + mutex_lock(&lli->lli_och_mutex); + if (fd->fd_lease_och != NULL) + GOTO(out_unlock, rc = -EBUSY); + + if (fd->fd_och == NULL) { + if (file->f_mode & FMODE_WRITE) { + LASSERT(lli->lli_mds_write_och != NULL); + och_p = &lli->lli_mds_write_och; + och_usecount = &lli->lli_open_fd_write_count; + } else { + LASSERT(lli->lli_mds_read_och != NULL); + och_p = &lli->lli_mds_read_och; + och_usecount = &lli->lli_open_fd_read_count; + } + + if (*och_usecount > 1) + GOTO(out_unlock, rc = -EBUSY); + + fd->fd_och = *och_p; + *och_usecount = 0; + *och_p = NULL; + } + + *old_handle = fd->fd_och->och_fh; + + EXIT; +out_unlock: + mutex_unlock(&lli->lli_och_mutex); + return rc; +} + +/** + * Release ownership on lli_mds_*_och when putting back a file lease. + */ +static int ll_lease_och_release(struct inode *inode, struct file *file) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct obd_client_handle **och_p; + struct obd_client_handle *old_och = NULL; + __u64 *och_usecount; + int rc = 0; + ENTRY; + + mutex_lock(&lli->lli_och_mutex); + if (file->f_mode & FMODE_WRITE) { + och_p = &lli->lli_mds_write_och; + och_usecount = &lli->lli_open_fd_write_count; + } else { + och_p = &lli->lli_mds_read_och; + och_usecount = &lli->lli_open_fd_read_count; + } + + /* The file may have been open by another process (broken lease) so + * *och_p is not NULL. In this case we should simply increase usecount + * and close fd_och. + */ + if (*och_p != NULL) { + old_och = fd->fd_och; + (*och_usecount)++; + } else { + *och_p = fd->fd_och; + *och_usecount = 1; + } + fd->fd_och = NULL; + mutex_unlock(&lli->lli_och_mutex); + + if (old_och != NULL) + rc = ll_close_inode_openhandle(inode, old_och, 0, NULL); + + RETURN(rc); +} + +/** * Acquire a lease and open the file. */ static struct obd_client_handle * @@ -724,45 +808,12 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode, RETURN(ERR_PTR(-EINVAL)); if (file != NULL) { - struct ll_inode_info *lli = ll_i2info(inode); - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); - struct obd_client_handle **och_p; - __u64 *och_usecount; - if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC)) RETURN(ERR_PTR(-EPERM)); - /* Get the openhandle of the file */ - rc = -EBUSY; - mutex_lock(&lli->lli_och_mutex); - if (fd->fd_lease_och != NULL) { - mutex_unlock(&lli->lli_och_mutex); - RETURN(ERR_PTR(rc)); - } - - if (fd->fd_och == NULL) { - if (file->f_mode & FMODE_WRITE) { - LASSERT(lli->lli_mds_write_och != NULL); - och_p = &lli->lli_mds_write_och; - och_usecount = &lli->lli_open_fd_write_count; - } else { - LASSERT(lli->lli_mds_read_och != NULL); - och_p = &lli->lli_mds_read_och; - och_usecount = &lli->lli_open_fd_read_count; - } - if (*och_usecount == 1) { - fd->fd_och = *och_p; - *och_p = NULL; - *och_usecount = 0; - rc = 0; - } - } - mutex_unlock(&lli->lli_och_mutex); - if (rc < 0) /* more than 1 opener */ + rc = ll_lease_och_acquire(inode, file, &old_handle); + if (rc) RETURN(ERR_PTR(rc)); - - LASSERT(fd->fd_och != NULL); - old_handle = fd->fd_och->och_fh; } OBD_ALLOC_PTR(och); @@ -808,12 +859,12 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode, /* already get lease, handle lease lock */ ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL); - if (it.d.lustre.it_lock_mode == 0 || - it.d.lustre.it_lock_bits != MDS_INODELOCK_OPEN) { + if (it.it_lock_mode == 0 || + it.it_lock_bits != MDS_INODELOCK_OPEN) { /* open lock must return for lease */ - CERROR(DFID "lease granted but no open lock, %d/"LPU64".\n", - PFID(ll_inode2fid(inode)), it.d.lustre.it_lock_mode, - it.d.lustre.it_lock_bits); + CERROR(DFID "lease granted but no open lock, %d/%llu.\n", + PFID(ll_inode2fid(inode)), it.it_lock_mode, + it.it_lock_bits); GOTO(out_close, rc = -EPROTO); } @@ -822,13 +873,13 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode, out_close: /* Cancel open lock */ - if (it.d.lustre.it_lock_mode != 0) { + if (it.it_lock_mode != 0) { ldlm_lock_decref_and_cancel(&och->och_lease_handle, - it.d.lustre.it_lock_mode); - it.d.lustre.it_lock_mode = 0; + it.it_lock_mode); + it.it_lock_mode = 0; och->och_lease_handle.cookie = 0ULL; } - rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, och, inode, 0, NULL); + rc2 = ll_close_inode_openhandle(inode, och, 0, NULL); if (rc2 < 0) CERROR("%s: error closing file "DFID": %d\n", ll_get_fsname(inode->i_sb, NULL, 0), @@ -892,8 +943,8 @@ static int ll_swap_layouts_close(struct obd_client_handle *och, /* Close the file and swap layouts between inode & inode2. * NB: lease lock handle is released in mdc_close_layout_swap_pack() * because we still need it to pack l_remote_handle to MDT. */ - rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, och, inode, - MDS_CLOSE_LAYOUT_SWAP, inode2); + rc = ll_close_inode_openhandle(inode, och, MDS_CLOSE_LAYOUT_SWAP, + inode2); och = NULL; /* freed in ll_close_inode_openhandle() */ @@ -925,16 +976,15 @@ static int ll_lease_close(struct obd_client_handle *och, struct inode *inode, } CDEBUG(D_INODE, "lease for "DFID" broken? %d\n", - PFID(&ll_i2info(inode)->lli_fid), cancelled); + PFID(&ll_i2info(inode)->lli_fid), cancelled); if (!cancelled) ldlm_cli_cancel(&och->och_lease_handle, 0); + if (lease_broken != NULL) *lease_broken = cancelled; - rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, och, inode, - 0, NULL); - + rc = ll_close_inode_openhandle(inode, och, 0, NULL); RETURN(rc); } @@ -952,9 +1002,21 @@ int ll_merge_attr(const struct lu_env *env, struct inode *inode) ll_inode_size_lock(inode); - /* merge timestamps the most recently obtained from mds with - timestamps obtained from osts */ - LTIME_S(inode->i_atime) = lli->lli_atime; + /* Merge timestamps the most recently obtained from MDS with + * timestamps obtained from OSTs. + * + * Do not overwrite atime of inode because it may be refreshed + * by file_accessed() function. If the read was served by cache + * data, there is no RPC to be sent so that atime may not be + * transferred to OSTs at all. MDT only updates atime at close time + * if it's at least 'mdd.*.atime_diff' older. + * All in all, the atime in Lustre does not strictly comply with + * POSIX. Solving this problem needs to send an RPC to MDT for each + * read, this will hurt performance. */ + if (LTIME_S(inode->i_atime) < lli->lli_atime || lli->lli_update_atime) { + LTIME_S(inode->i_atime) = lli->lli_atime; + lli->lli_update_atime = 0; + } LTIME_S(inode->i_mtime) = lli->lli_mtime; LTIME_S(inode->i_ctime) = lli->lli_ctime; @@ -978,7 +1040,7 @@ int ll_merge_attr(const struct lu_env *env, struct inode *inode) if (mtime < attr->cat_mtime) mtime = attr->cat_mtime; - CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n", + CDEBUG(D_VFSTRACE, DFID" updating i_size %llu\n", PFID(&lli->lli_fid), attr->cat_size); i_size_write(inode, attr->cat_size); @@ -997,7 +1059,7 @@ out_size_unlock: static bool file_is_noatime(const struct file *file) { const struct vfsmount *mnt = file->f_path.mnt; - const struct inode *inode = file->f_path.dentry->d_inode; + const struct inode *inode = file_inode((struct file *)file); /* Adapted from file_accessed() and touch_atime().*/ if (file->f_flags & O_NOATIME) @@ -1021,27 +1083,120 @@ static bool file_is_noatime(const struct file *file) return false; } -static void ll_io_init(struct cl_io *io, const struct file *file, int write) +static int ll_file_io_ptask(struct cfs_ptask *ptask); + +static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot) +{ + struct inode *inode = file_inode(file); + + memset(&io->u.ci_rw.rw_iter, 0, sizeof(io->u.ci_rw.rw_iter)); + init_sync_kiocb(&io->u.ci_rw.rw_iocb, file); + io->u.ci_rw.rw_file = file; + io->u.ci_rw.rw_ptask = ll_file_io_ptask; + io->u.ci_rw.rw_nonblock = !!(file->f_flags & O_NONBLOCK); + if (iot == CIT_WRITE) { + io->u.ci_rw.rw_append = !!(file->f_flags & O_APPEND); + io->u.ci_rw.rw_sync = !!(file->f_flags & O_SYNC || + file->f_flags & O_DIRECT || + IS_SYNC(inode)); + } + io->ci_obj = ll_i2info(inode)->lli_clob; + io->ci_lockreq = CILR_MAYBE; + if (ll_file_nolock(file)) { + io->ci_lockreq = CILR_NEVER; + io->ci_no_srvlock = 1; + } else if (file->f_flags & O_APPEND) { + io->ci_lockreq = CILR_MANDATORY; + } + io->ci_noatime = file_is_noatime(file); + if (ll_i2sbi(inode)->ll_flags & LL_SBI_PIO) + io->ci_pio = !io->u.ci_rw.rw_append; + else + io->ci_pio = 0; +} + +static int ll_file_io_ptask(struct cfs_ptask *ptask) { - struct inode *inode = file->f_path.dentry->d_inode; + struct cl_io_pt *pt = ptask->pt_cbdata; + struct file *file = pt->cip_file; + struct lu_env *env; + struct cl_io *io; + loff_t pos = pt->cip_pos; + int rc; + __u16 refcheck; + ENTRY; + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); - io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK; - if (write) { - io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND); - io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || - file->f_flags & O_DIRECT || - IS_SYNC(inode); + CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n", + file_dentry(file)->d_name.name, + pt->cip_iot == CIT_READ ? "read" : "write", + pos, pos + pt->cip_count); + +restart: + io = vvp_env_thread_io(env); + ll_io_init(io, file, pt->cip_iot); + io->u.ci_rw.rw_iter = pt->cip_iter; + io->u.ci_rw.rw_iocb = pt->cip_iocb; + io->ci_pio = 0; /* It's already in parallel task */ + + rc = cl_io_rw_init(env, io, pt->cip_iot, pos, + pt->cip_count - pt->cip_result); + if (!rc) { + struct vvp_io *vio = vvp_env_io(env); + + vio->vui_io_subtype = IO_NORMAL; + vio->vui_fd = LUSTRE_FPRIVATE(file); + + ll_cl_add(file, env, io, LCC_RW); + rc = cl_io_loop(env, io); + ll_cl_remove(file, env); + } else { + /* cl_io_rw_init() handled IO */ + rc = io->ci_result; } - io->ci_obj = ll_i2info(inode)->lli_clob; - io->ci_lockreq = CILR_MAYBE; - if (ll_file_nolock(file)) { - io->ci_lockreq = CILR_NEVER; - io->ci_no_srvlock = 1; - } else if (file->f_flags & O_APPEND) { - io->ci_lockreq = CILR_MANDATORY; - } - io->ci_noatime = file_is_noatime(file); + if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_PTASK_IO_FAIL, 0)) { + if (io->ci_nob > 0) + io->ci_nob /= 2; + rc = -EIO; + } + + if (io->ci_nob > 0) { + pt->cip_result += io->ci_nob; + iov_iter_advance(&pt->cip_iter, io->ci_nob); + pos += io->ci_nob; + pt->cip_iocb.ki_pos = pos; +#ifdef HAVE_KIOCB_KI_LEFT + pt->cip_iocb.ki_left = pt->cip_count - pt->cip_result; +#elif defined(HAVE_KI_NBYTES) + pt->cip_iocb.ki_nbytes = pt->cip_count - pt->cip_result; +#endif + } + + cl_io_fini(env, io); + + if ((rc == 0 || rc == -ENODATA) && + pt->cip_result < pt->cip_count && + io->ci_need_restart) { + CDEBUG(D_VFSTRACE, + "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n", + file_dentry(file)->d_name.name, + pt->cip_iot == CIT_READ ? "read" : "write", + pos, pos + pt->cip_count - pt->cip_result, + pt->cip_result, rc); + goto restart; + } + + CDEBUG(D_VFSTRACE, "%s: %s ret: %zd, rc: %d\n", + file_dentry(file)->d_name.name, + pt->cip_iot == CIT_READ ? "read" : "write", + pt->cip_result, rc); + + cl_env_put(env, &refcheck); + RETURN(pt->cip_result > 0 ? 0 : rc); } static ssize_t @@ -1049,42 +1204,45 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args, struct file *file, enum cl_io_type iot, loff_t *ppos, size_t count) { + struct range_lock range; struct vvp_io *vio = vvp_env_io(env); - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); struct ll_inode_info *lli = ll_i2info(inode); struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct cl_io *io; + loff_t pos = *ppos; ssize_t result = 0; int rc = 0; - struct range_lock range; ENTRY; - CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: "LPU64", count: %zu\n", - file->f_path.dentry->d_name.name, iot, *ppos, count); + CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n", + file_dentry(file)->d_name.name, + iot == CIT_READ ? "read" : "write", pos, pos + count); restart: io = vvp_env_thread_io(env); - ll_io_init(io, file, iot == CIT_WRITE); + ll_io_init(io, file, iot); + if (args->via_io_subtype == IO_NORMAL) { + io->u.ci_rw.rw_iter = *args->u.normal.via_iter; + io->u.ci_rw.rw_iocb = *args->u.normal.via_iocb; + } else { + io->ci_pio = 0; + } - if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) { + if (cl_io_rw_init(env, io, iot, pos, count) == 0) { bool range_locked = false; if (file->f_flags & O_APPEND) range_lock_init(&range, 0, LUSTRE_EOF); else - range_lock_init(&range, *ppos, *ppos + count - 1); + range_lock_init(&range, pos, pos + count - 1); vio->vui_fd = LUSTRE_FPRIVATE(file); vio->vui_io_subtype = args->via_io_subtype; switch (vio->vui_io_subtype) { case IO_NORMAL: - vio->vui_iter = args->u.normal.via_iter; -#ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER - vio->vui_tot_nrsegs = vio->vui_iter->nr_segs; -#endif /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */ - vio->vui_iocb = args->u.normal.via_iocb; /* Direct IO reads must also take range lock, * or multiple reads will try to work on the same pages * See LU-6227 for details. */ @@ -1109,8 +1267,17 @@ restart: LBUG(); } - ll_cl_add(file, env, io); + ll_cl_add(file, env, io, LCC_RW); + if (io->ci_pio && iot == CIT_WRITE && !IS_NOSEC(inode) && + !lli->lli_inode_locked) { + inode_lock(inode); + lli->lli_inode_locked = 1; + } rc = cl_io_loop(env, io); + if (lli->lli_inode_locked) { + lli->lli_inode_locked = 0; + inode_unlock(inode); + } ll_cl_remove(file, env); if (range_locked) { @@ -1125,27 +1292,31 @@ restart: if (io->ci_nob > 0) { result += io->ci_nob; - count -= io->ci_nob; - *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */ + count -= io->ci_nob; - /* prepare IO restart */ - if (count > 0 && args->via_io_subtype == IO_NORMAL) { - args->u.normal.via_iter = vio->vui_iter; -#ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER - args->u.normal.via_iter->nr_segs = vio->vui_tot_nrsegs; -#endif /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */ + if (args->via_io_subtype == IO_NORMAL) { + iov_iter_advance(args->u.normal.via_iter, io->ci_nob); + pos += io->ci_nob; + args->u.normal.via_iocb->ki_pos = pos; +#ifdef HAVE_KIOCB_KI_LEFT + args->u.normal.via_iocb->ki_left = count; +#elif defined(HAVE_KI_NBYTES) + args->u.normal.via_iocb->ki_nbytes = count; +#endif + } else { + /* for splice */ + pos = io->u.ci_rw.rw_range.cir_pos; } } - GOTO(out, rc); out: cl_io_fini(env, io); if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) { CDEBUG(D_VFSTRACE, - "%s: restart %s from %lld, count:%zu, result: %zd\n", - file->f_path.dentry->d_name.name, - iot == CIT_READ ? "read" : "write", - *ppos, count, result); + "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n", + file_dentry(file)->d_name.name, + iot == CIT_READ ? "read" : "write", + pos, pos + count, result, rc); goto restart; } @@ -1158,14 +1329,87 @@ out: ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES, result); fd->fd_write_failed = false; + } else if (result == 0 && rc == 0) { + rc = io->ci_result; + if (rc < 0) + fd->fd_write_failed = true; + else + fd->fd_write_failed = false; } else if (rc != -ERESTARTSYS) { fd->fd_write_failed = true; } } - CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result); + CDEBUG(D_VFSTRACE, "%s: %s *ppos: %llu, pos: %llu, ret: %zd, rc: %d\n", + file_dentry(file)->d_name.name, + iot == CIT_READ ? "read" : "write", *ppos, pos, result, rc); + + *ppos = pos; + + RETURN(result > 0 ? result : rc); +} + +/** + * The purpose of fast read is to overcome per I/O overhead and improve IOPS + * especially for small I/O. + * + * To serve a read request, CLIO has to create and initialize a cl_io and + * then request DLM lock. This has turned out to have siginificant overhead + * and affects the performance of small I/O dramatically. + * + * It's not necessary to create a cl_io for each I/O. Under the help of read + * ahead, most of the pages being read are already in memory cache and we can + * read those pages directly because if the pages exist, the corresponding DLM + * lock must exist so that page content must be valid. + * + * In fast read implementation, the llite speculatively finds and reads pages + * in memory cache. There are three scenarios for fast read: + * - If the page exists and is uptodate, kernel VM will provide the data and + * CLIO won't be intervened; + * - If the page was brought into memory by read ahead, it will be exported + * and read ahead parameters will be updated; + * - Otherwise the page is not in memory, we can't do fast read. Therefore, + * it will go back and invoke normal read, i.e., a cl_io will be created + * and DLM lock will be requested. + * + * POSIX compliance: posix standard states that read is intended to be atomic. + * Lustre read implementation is in line with Linux kernel read implementation + * and neither of them complies with POSIX standard in this matter. Fast read + * doesn't make the situation worse on single node but it may interleave write + * results from multiple nodes due to short read handling in ll_file_aio_read(). + * + * \param env - lu_env + * \param iocb - kiocb from kernel + * \param iter - user space buffers where the data will be copied + * + * \retval - number of bytes have been read, or error code if error occurred. + */ +static ssize_t +ll_do_fast_read(struct kiocb *iocb, struct iov_iter *iter) +{ + ssize_t result; + + if (!ll_sbi_has_fast_read(ll_i2sbi(file_inode(iocb->ki_filp)))) + return 0; + + /* NB: we can't do direct IO for fast read because it will need a lock + * to make IO engine happy. */ + if (iocb->ki_filp->f_flags & O_DIRECT) + return 0; + + result = generic_file_read_iter(iocb, iter); + + /* If the first page is not in cache, generic_file_aio_read() will be + * returned with -ENODATA. + * See corresponding code in ll_readpage(). */ + if (result == -ENODATA) + result = 0; + + if (result > 0) + ll_stats_ops_tally(ll_i2sbi(file_inode(iocb->ki_filp)), + LPROC_LL_READ_BYTES, result); - return result > 0 ? result : rc; + return result; } /* @@ -1173,10 +1417,15 @@ out: */ static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to) { - struct vvp_io_args *args; struct lu_env *env; + struct vvp_io_args *args; ssize_t result; - int refcheck; + ssize_t rc2; + __u16 refcheck; + + result = ll_do_fast_read(iocb, to); + if (result < 0 || iov_iter_count(to) == 0) + GOTO(out, result); env = cl_env_get(&refcheck); if (IS_ERR(env)) @@ -1186,9 +1435,15 @@ static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to) args->u.normal.via_iter = to; args->u.normal.via_iocb = iocb; - result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ, - &iocb->ki_pos, iov_iter_count(to)); + rc2 = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ, + &iocb->ki_pos, iov_iter_count(to)); + if (rc2 > 0) + result += rc2; + else if (result == 0) + result = rc2; + cl_env_put(env, &refcheck); +out: return result; } @@ -1200,7 +1455,7 @@ static ssize_t ll_file_write_iter(struct kiocb *iocb, struct iov_iter *from) struct vvp_io_args *args; struct lu_env *env; ssize_t result; - int refcheck; + __u16 refcheck; env = cl_env_get(&refcheck); if (IS_ERR(env)) @@ -1251,8 +1506,7 @@ static int ll_file_get_iov_count(const struct iovec *iov, static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) { - struct iovec *local_iov; - struct iov_iter *to; + struct iov_iter to; size_t iov_count; ssize_t result; ENTRY; @@ -1261,43 +1515,13 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov, if (result) RETURN(result); - if (nr_segs == 1) { - struct lu_env *env; - int refcheck; - - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); - - local_iov = &ll_env_info(env)->lti_local_iov; - *local_iov = *iov; - - cl_env_put(env, &refcheck); - } else { - OBD_ALLOC(local_iov, sizeof(*iov) * nr_segs); - if (local_iov == NULL) - RETURN(-ENOMEM); - - memcpy(local_iov, iov, sizeof(*iov) * nr_segs); - } - - OBD_ALLOC_PTR(to); - if (to == NULL) { - result = -ENOMEM; - goto out; - } # ifdef HAVE_IOV_ITER_INIT_DIRECTION - iov_iter_init(to, READ, local_iov, nr_segs, iov_count); + iov_iter_init(&to, READ, iov, nr_segs, iov_count); # else /* !HAVE_IOV_ITER_INIT_DIRECTION */ - iov_iter_init(to, local_iov, nr_segs, iov_count, 0); + iov_iter_init(&to, iov, nr_segs, iov_count, 0); # endif /* HAVE_IOV_ITER_INIT_DIRECTION */ - result = ll_file_read_iter(iocb, to); - - OBD_FREE_PTR(to); -out: - if (nr_segs > 1) - OBD_FREE(local_iov, sizeof(*iov) * nr_segs); + result = ll_file_read_iter(iocb, &to); RETURN(result); } @@ -1305,30 +1529,22 @@ out: static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, loff_t *ppos) { - struct lu_env *env; struct iovec iov = { .iov_base = buf, .iov_len = count }; - struct kiocb *kiocb; - ssize_t result; - int refcheck; - ENTRY; - - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); + struct kiocb kiocb; + ssize_t result; + ENTRY; - kiocb = &ll_env_info(env)->lti_kiocb; - init_sync_kiocb(kiocb, file); - kiocb->ki_pos = *ppos; + init_sync_kiocb(&kiocb, file); + kiocb.ki_pos = *ppos; #ifdef HAVE_KIOCB_KI_LEFT - kiocb->ki_left = count; + kiocb.ki_left = count; #elif defined(HAVE_KI_NBYTES) - kiocb->ki_nbytes = count; + kiocb.i_nbytes = count; #endif - result = ll_file_aio_read(kiocb, &iov, 1, kiocb->ki_pos); - *ppos = kiocb->ki_pos; + result = ll_file_aio_read(&kiocb, &iov, 1, kiocb.ki_pos); + *ppos = kiocb.ki_pos; - cl_env_put(env, &refcheck); RETURN(result); } @@ -1339,8 +1555,7 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) { - struct iovec *local_iov; - struct iov_iter *from; + struct iov_iter from; size_t iov_count; ssize_t result; ENTRY; @@ -1349,43 +1564,13 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov, if (result) RETURN(result); - if (nr_segs == 1) { - struct lu_env *env; - int refcheck; - - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); - - local_iov = &ll_env_info(env)->lti_local_iov; - *local_iov = *iov; - - cl_env_put(env, &refcheck); - } else { - OBD_ALLOC(local_iov, sizeof(*iov) * nr_segs); - if (local_iov == NULL) - RETURN(-ENOMEM); - - memcpy(local_iov, iov, sizeof(*iov) * nr_segs); - } - - OBD_ALLOC_PTR(from); - if (from == NULL) { - result = -ENOMEM; - goto out; - } # ifdef HAVE_IOV_ITER_INIT_DIRECTION - iov_iter_init(from, WRITE, local_iov, nr_segs, iov_count); + iov_iter_init(&from, WRITE, iov, nr_segs, iov_count); # else /* !HAVE_IOV_ITER_INIT_DIRECTION */ - iov_iter_init(from, local_iov, nr_segs, iov_count, 0); + iov_iter_init(&from, iov, nr_segs, iov_count, 0); # endif /* HAVE_IOV_ITER_INIT_DIRECTION */ - result = ll_file_write_iter(iocb, from); - - OBD_FREE_PTR(from); -out: - if (nr_segs > 1) - OBD_FREE(local_iov, sizeof(*iov) * nr_segs); + result = ll_file_write_iter(iocb, &from); RETURN(result); } @@ -1398,7 +1583,7 @@ static ssize_t ll_file_write(struct file *file, const char __user *buf, .iov_len = count }; struct kiocb *kiocb; ssize_t result; - int refcheck; + __u16 refcheck; ENTRY; env = cl_env_get(&refcheck); @@ -1432,7 +1617,7 @@ static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos, struct lu_env *env; struct vvp_io_args *args; ssize_t result; - int refcheck; + __u16 refcheck; ENTRY; env = cl_env_get(&refcheck); @@ -1448,9 +1633,8 @@ static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos, RETURN(result); } -int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file, - __u64 flags, struct lov_user_md *lum, - int lum_size) +int ll_lov_setstripe_ea_info(struct inode *inode, struct dentry *dentry, + __u64 flags, struct lov_user_md *lum, int lum_size) { struct lookup_intent oit = { .it_op = IT_OPEN, @@ -1460,16 +1644,15 @@ int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file, ENTRY; ll_inode_size_lock(inode); - rc = ll_intent_file_open(file, lum, lum_size, &oit); + rc = ll_intent_file_open(dentry, lum, lum_size, &oit); if (rc < 0) GOTO(out_unlock, rc); - ll_release_openhandle(file->f_path.dentry, &oit); + ll_release_openhandle(dentry, &oit); out_unlock: ll_inode_size_unlock(inode); ll_intent_release(&oit); - cl_lov_delay_create_clear(&file->f_flags); RETURN(rc); } @@ -1517,10 +1700,10 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize); LASSERT(lmm != NULL); - if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) && - (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) { - GOTO(out, rc = -EPROTO); - } + if (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1) && + lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3) && + lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_COMP_V1)) + GOTO(out, rc = -EPROTO); /* * This is coming from the MDS, so is probably in @@ -1530,26 +1713,35 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) { int stripe_count; - stripe_count = le16_to_cpu(lmm->lmm_stripe_count); - if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_RELEASED) - stripe_count = 0; + if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1) || + lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) { + stripe_count = le16_to_cpu(lmm->lmm_stripe_count); + if (le32_to_cpu(lmm->lmm_pattern) & + LOV_PATTERN_F_RELEASED) + stripe_count = 0; + } /* if function called for directory - we should * avoid swab not existent lsm objects */ if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) { - lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm); + lustre_swab_lov_user_md_v1( + (struct lov_user_md_v1 *)lmm); if (S_ISREG(body->mbo_mode)) lustre_swab_lov_user_md_objects( ((struct lov_user_md_v1 *)lmm)->lmm_objects, stripe_count); } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) { lustre_swab_lov_user_md_v3( - (struct lov_user_md_v3 *)lmm); + (struct lov_user_md_v3 *)lmm); if (S_ISREG(body->mbo_mode)) - lustre_swab_lov_user_md_objects( - ((struct lov_user_md_v3 *)lmm)->lmm_objects, - stripe_count); - } + lustre_swab_lov_user_md_objects( + ((struct lov_user_md_v3 *)lmm)->lmm_objects, + stripe_count); + } else if (lmm->lmm_magic == + cpu_to_le32(LOV_MAGIC_COMP_V1)) { + lustre_swab_lov_comp_md_v1( + (struct lov_comp_md_v1 *)lmm); + } } out: @@ -1560,7 +1752,7 @@ out: } static int ll_lov_setea(struct inode *inode, struct file *file, - unsigned long arg) + void __user *arg) { __u64 flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE; struct lov_user_md *lump; @@ -1576,22 +1768,22 @@ static int ll_lov_setea(struct inode *inode, struct file *file, if (lump == NULL) RETURN(-ENOMEM); - if (copy_from_user(lump, (struct lov_user_md __user *)arg, lum_size)) { - OBD_FREE_LARGE(lump, lum_size); - RETURN(-EFAULT); - } + if (copy_from_user(lump, arg, lum_size)) + GOTO(out_lump, rc = -EFAULT); - rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size); + rc = ll_lov_setstripe_ea_info(inode, file_dentry(file), flags, lump, + lum_size); + cl_lov_delay_create_clear(&file->f_flags); +out_lump: OBD_FREE_LARGE(lump, lum_size); RETURN(rc); } -static int ll_file_getstripe(struct inode *inode, - struct lov_user_md __user *lum) +static int ll_file_getstripe(struct inode *inode, void __user *lum, size_t size) { struct lu_env *env; - int refcheck; + __u16 refcheck; int rc; ENTRY; @@ -1599,13 +1791,13 @@ static int ll_file_getstripe(struct inode *inode, if (IS_ERR(env)) RETURN(PTR_ERR(env)); - rc = cl_object_getstripe(env, ll_i2info(inode)->lli_clob, lum); + rc = cl_object_getstripe(env, ll_i2info(inode)->lli_clob, lum, size); cl_env_put(env, &refcheck); RETURN(rc); } static int ll_lov_setstripe(struct inode *inode, struct file *file, - unsigned long arg) + void __user *arg) { struct lov_user_md __user *lum = (struct lov_user_md __user *)arg; struct lov_user_md *klum; @@ -1618,16 +1810,24 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file, RETURN(rc); lum_size = rc; - rc = ll_lov_setstripe_ea_info(inode, file, flags, klum, lum_size); - if (rc == 0) { + rc = ll_lov_setstripe_ea_info(inode, file_dentry(file), flags, klum, + lum_size); + if (!rc) { __u32 gen; - put_user(0, &lum->lmm_stripe_count); + rc = put_user(0, &lum->lmm_stripe_count); + if (rc) + GOTO(out, rc); - ll_layout_refresh(inode, &gen); - rc = ll_file_getstripe(inode, (struct lov_user_md __user *)arg); + rc = ll_layout_refresh(inode, &gen); + if (rc) + GOTO(out, rc); + + rc = ll_file_getstripe(inode, arg, lum_size); } + cl_lov_delay_create_clear(&file->f_flags); +out: OBD_FREE(klum, lum_size); RETURN(rc); } @@ -1635,11 +1835,12 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file, static int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg) { - struct ll_inode_info *lli = ll_i2info(inode); - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); - struct ll_grouplock grouplock; - int rc; - ENTRY; + struct ll_inode_info *lli = ll_i2info(inode); + struct cl_object *obj = lli->lli_clob; + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ll_grouplock grouplock; + int rc; + ENTRY; if (arg == 0) { CWARN("group id for group lock must not be 0\n"); @@ -1656,8 +1857,33 @@ ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg) spin_unlock(&lli->lli_lock); RETURN(-EINVAL); } - LASSERT(fd->fd_grouplock.lg_lock == NULL); - spin_unlock(&lli->lli_lock); + LASSERT(fd->fd_grouplock.lg_lock == NULL); + spin_unlock(&lli->lli_lock); + + /** + * XXX: group lock needs to protect all OST objects while PFL + * can add new OST objects during the IO, so we'd instantiate + * all OST objects before getting its group lock. + */ + if (obj) { + struct lu_env *env; + __u16 refcheck; + struct cl_layout cl = { + .cl_is_composite = false, + }; + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + rc = cl_object_layout_get(env, obj, &cl); + if (!rc && cl.cl_is_composite) + rc = ll_layout_write_intent(inode, 0, OBD_OBJECT_EOF); + + cl_env_put(env, &refcheck); + if (rc) + RETURN(rc); + } rc = cl_get_grouplock(ll_i2info(inode)->lli_clob, arg, (file->f_flags & O_NONBLOCK), &grouplock); @@ -1748,12 +1974,11 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it) ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och); - rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, - och, inode, 0, NULL); + rc = ll_close_inode_openhandle(inode, och, 0, NULL); out: /* this one is in place of ll_file_open */ if (it_disposition(it, DISP_ENQ_OPEN_REF)) { - ptlrpc_req_finished(it->d.lustre.it_data); + ptlrpc_req_finished(it->it_request); it_clear_disposition(it, DISP_ENQ_OPEN_REF); } RETURN(rc); @@ -1769,7 +1994,7 @@ static int ll_do_fiemap(struct inode *inode, struct fiemap *fiemap, size_t num_bytes) { struct lu_env *env; - int refcheck; + __u16 refcheck; int rc = 0; struct ll_fiemap_info_key fmkey = { .lfik_name = KEY_FIEMAP, }; ENTRY; @@ -1845,6 +2070,10 @@ int ll_fid2path(struct inode *inode, void __user *arg) if (copy_from_user(gfout, arg, sizeof(*gfout))) GOTO(gf_free, rc = -EFAULT); + /* append root FID after gfout to let MDT know the root FID so that it + * can lookup the correct path, this is mainly for fileset. + * old server without fileset mount support will ignore this. */ + *gfout->gf_u.gf_root_fid = *ll_inode2fid(inode); /* Call mdc_iocontrol */ rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL); @@ -1875,7 +2104,7 @@ int ll_data_version(struct inode *inode, __u64 *data_version, int flags) struct cl_object *obj = ll_i2info(inode)->lli_clob; struct lu_env *env; struct cl_io *io; - int refcheck; + __u16 refcheck; int result; ENTRY; @@ -1918,11 +2147,11 @@ restart: */ int ll_hsm_release(struct inode *inode) { - struct cl_env_nest nest; struct lu_env *env; struct obd_client_handle *och = NULL; __u64 data_version = 0; int rc; + __u16 refcheck; ENTRY; CDEBUG(D_INODE, "%s: Releasing file "DFID".\n", @@ -1938,18 +2167,18 @@ int ll_hsm_release(struct inode *inode) if (rc != 0) GOTO(out, rc); - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) GOTO(out, rc = PTR_ERR(env)); ll_merge_attr(env, inode); - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); /* Release the file. * NB: lease lock handle is released in mdc_hsm_release_pack() because * we still need it to pack l_remote_handle to MDT. */ - rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, och, inode, - MDS_HSM_RELEASE, &data_version); + rc = ll_close_inode_openhandle(inode, och, MDS_HSM_RELEASE, + &data_version); och = NULL; EXIT; @@ -1983,8 +2212,8 @@ static int ll_swap_layouts(struct file *file1, struct file *file2, if (llss == NULL) RETURN(-ENOMEM); - llss->inode1 = file1->f_path.dentry->d_inode; - llss->inode2 = file2->f_path.dentry->d_inode; + llss->inode1 = file_inode(file1); + llss->inode2 = file_inode(file2); rc = ll_check_swap_layouts_validity(llss->inode1, llss->inode2); if (rc < 0) @@ -2151,13 +2380,13 @@ static int ll_hsm_import(struct inode *inode, struct file *file, ATTR_MTIME | ATTR_MTIME_SET | ATTR_ATIME | ATTR_ATIME_SET; - mutex_lock(&inode->i_mutex); + inode_lock(inode); - rc = ll_setattr_raw(file->f_path.dentry, attr, true); + rc = ll_setattr_raw(file_dentry(file), attr, true); if (rc == -ENODATA) rc = 0; - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); out: if (hss != NULL) @@ -2175,10 +2404,149 @@ static inline long ll_lease_type_from_fmode(fmode_t fmode) ((fmode & FMODE_WRITE) ? LL_LEASE_WRLCK : 0); } +static int ll_file_futimes_3(struct file *file, const struct ll_futimes_3 *lfu) +{ + struct inode *inode = file_inode(file); + struct iattr ia = { + .ia_valid = ATTR_ATIME | ATTR_ATIME_SET | + ATTR_MTIME | ATTR_MTIME_SET | + ATTR_CTIME | ATTR_CTIME_SET, + .ia_atime = { + .tv_sec = lfu->lfu_atime_sec, + .tv_nsec = lfu->lfu_atime_nsec, + }, + .ia_mtime = { + .tv_sec = lfu->lfu_mtime_sec, + .tv_nsec = lfu->lfu_mtime_nsec, + }, + .ia_ctime = { + .tv_sec = lfu->lfu_ctime_sec, + .tv_nsec = lfu->lfu_ctime_nsec, + }, + }; + int rc; + ENTRY; + + if (!capable(CAP_SYS_ADMIN)) + RETURN(-EPERM); + + if (!S_ISREG(inode->i_mode)) + RETURN(-EINVAL); + + inode_lock(inode); + rc = ll_setattr_raw(file_dentry(file), &ia, false); + inode_unlock(inode); + + RETURN(rc); +} + +/* + * Give file access advices + * + * The ladvise interface is similar to Linux fadvise() system call, except it + * forwards the advices directly from Lustre client to server. The server side + * codes will apply appropriate read-ahead and caching techniques for the + * corresponding files. + * + * A typical workload for ladvise is e.g. a bunch of different clients are + * doing small random reads of a file, so prefetching pages into OSS cache + * with big linear reads before the random IO is a net benefit. Fetching + * all that data into each client cache with fadvise() may not be, due to + * much more data being sent to the client. + */ +static int ll_ladvise(struct inode *inode, struct file *file, __u64 flags, + struct llapi_lu_ladvise *ladvise) +{ + struct lu_env *env; + struct cl_io *io; + struct cl_ladvise_io *lio; + int rc; + __u16 refcheck; + ENTRY; + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + io = vvp_env_thread_io(env); + io->ci_obj = ll_i2info(inode)->lli_clob; + + /* initialize parameters for ladvise */ + lio = &io->u.ci_ladvise; + lio->li_start = ladvise->lla_start; + lio->li_end = ladvise->lla_end; + lio->li_fid = ll_inode2fid(inode); + lio->li_advice = ladvise->lla_advice; + lio->li_flags = flags; + + if (cl_io_init(env, io, CIT_LADVISE, io->ci_obj) == 0) + rc = cl_io_loop(env, io); + else + rc = io->ci_result; + + cl_io_fini(env, io); + cl_env_put(env, &refcheck); + RETURN(rc); +} + +int ll_ioctl_fsgetxattr(struct inode *inode, unsigned int cmd, + unsigned long arg) +{ + struct fsxattr fsxattr; + + if (copy_from_user(&fsxattr, + (const struct fsxattr __user *)arg, + sizeof(fsxattr))) + RETURN(-EFAULT); + + fsxattr.fsx_projid = ll_i2info(inode)->lli_projid; + if (copy_to_user((struct fsxattr __user *)arg, + &fsxattr, sizeof(fsxattr))) + RETURN(-EFAULT); + + RETURN(0); +} + +int ll_ioctl_fssetxattr(struct inode *inode, unsigned int cmd, + unsigned long arg) +{ + + struct md_op_data *op_data; + struct ptlrpc_request *req = NULL; + int rc = 0; + struct fsxattr fsxattr; + + /* only root could change project ID */ + if (!cfs_capable(CFS_CAP_SYS_ADMIN)) + RETURN(-EPERM); + + op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, + LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + + if (copy_from_user(&fsxattr, + (const struct fsxattr __user *)arg, + sizeof(fsxattr))) + GOTO(out_fsxattr1, rc = -EFAULT); + + op_data->op_projid = fsxattr.fsx_projid; + op_data->op_attr.ia_valid |= MDS_ATTR_PROJID; + rc = md_setattr(ll_i2sbi(inode)->ll_md_exp, op_data, NULL, + 0, &req); + ptlrpc_req_finished(req); + +out_fsxattr1: + ll_finish_md_op_data(op_data); + RETURN(rc); + + +} + static long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); struct ll_file_data *fd = LUSTRE_FPRIVATE(file); int flags, rc; ENTRY; @@ -2217,16 +2585,17 @@ ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) fd->fd_flags &= ~flags; } RETURN(0); - case LL_IOC_LOV_SETSTRIPE: - RETURN(ll_lov_setstripe(inode, file, arg)); - case LL_IOC_LOV_SETEA: - RETURN(ll_lov_setea(inode, file, arg)); + case LL_IOC_LOV_SETSTRIPE: + case LL_IOC_LOV_SETSTRIPE_NEW: + RETURN(ll_lov_setstripe(inode, file, (void __user *)arg)); + case LL_IOC_LOV_SETEA: + RETURN(ll_lov_setea(inode, file, (void __user *)arg)); case LL_IOC_LOV_SWAP_LAYOUTS: { struct file *file2; struct lustre_swap_layouts lsl; if (copy_from_user(&lsl, (char __user *)arg, - sizeof(struct lustre_swap_layouts))) + sizeof(struct lustre_swap_layouts))) RETURN(-EFAULT); if ((file->f_flags & O_ACCMODE) == O_RDONLY) @@ -2257,7 +2626,7 @@ ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) mutex_unlock(&lli->lli_och_mutex); if (och == NULL) GOTO(out, rc = -ENOLCK); - inode2 = file2->f_path.dentry->d_inode; + inode2 = file_inode(file2); rc = ll_swap_layouts_close(och, inode, inode2); } else { rc = ll_swap_layouts(file, file2, &lsl); @@ -2267,8 +2636,8 @@ out: RETURN(rc); } case LL_IOC_LOV_GETSTRIPE: - RETURN(ll_file_getstripe(inode, - (struct lov_user_md __user *)arg)); + case LL_IOC_LOV_GETSTRIPE_NEW: + RETURN(ll_file_getstripe(inode, (void __user *)arg, 0)); case FSFILT_IOC_GETFLAGS: case FSFILT_IOC_SETFLAGS: RETURN(ll_iocontrol(inode, file, cmd, arg)); @@ -2437,6 +2806,10 @@ out: if (rc < 0) RETURN(rc); + rc = ll_lease_och_release(inode, file); + if (rc < 0) + RETURN(rc); + if (lease_broken) fmode = 0; @@ -2506,7 +2879,70 @@ out: OBD_FREE_PTR(hui); RETURN(rc); } + case LL_IOC_FUTIMES_3: { + struct ll_futimes_3 lfu; + + if (copy_from_user(&lfu, + (const struct ll_futimes_3 __user *)arg, + sizeof(lfu))) + RETURN(-EFAULT); + + RETURN(ll_file_futimes_3(file, &lfu)); + } + case LL_IOC_LADVISE: { + struct llapi_ladvise_hdr *ladvise_hdr; + int i; + int num_advise; + int alloc_size = sizeof(*ladvise_hdr); + + rc = 0; + OBD_ALLOC_PTR(ladvise_hdr); + if (ladvise_hdr == NULL) + RETURN(-ENOMEM); + + if (copy_from_user(ladvise_hdr, + (const struct llapi_ladvise_hdr __user *)arg, + alloc_size)) + GOTO(out_ladvise, rc = -EFAULT); + + if (ladvise_hdr->lah_magic != LADVISE_MAGIC || + ladvise_hdr->lah_count < 1) + GOTO(out_ladvise, rc = -EINVAL); + + num_advise = ladvise_hdr->lah_count; + if (num_advise >= LAH_COUNT_MAX) + GOTO(out_ladvise, rc = -EFBIG); + OBD_FREE_PTR(ladvise_hdr); + alloc_size = offsetof(typeof(*ladvise_hdr), + lah_advise[num_advise]); + OBD_ALLOC(ladvise_hdr, alloc_size); + if (ladvise_hdr == NULL) + RETURN(-ENOMEM); + + /* + * TODO: submit multiple advices to one server in a single RPC + */ + if (copy_from_user(ladvise_hdr, + (const struct llapi_ladvise_hdr __user *)arg, + alloc_size)) + GOTO(out_ladvise, rc = -EFAULT); + + for (i = 0; i < num_advise; i++) { + rc = ll_ladvise(inode, file, ladvise_hdr->lah_flags, + &ladvise_hdr->lah_advise[i]); + if (rc) + break; + } + +out_ladvise: + OBD_FREE(ladvise_hdr, alloc_size); + RETURN(rc); + } + case LL_IOC_FSGETXATTR: + RETURN(ll_ioctl_fsgetxattr(inode, cmd, arg)); + case LL_IOC_FSSETXATTR: + RETURN(ll_ioctl_fssetxattr(inode, cmd, arg)); default: { int err; @@ -2540,7 +2976,7 @@ static loff_t generic_file_llseek_size(struct file *file, loff_t offset, int origin, loff_t maxsize, loff_t eof) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); switch (origin) { case SEEK_END: @@ -2560,9 +2996,9 @@ generic_file_llseek_size(struct file *file, loff_t offset, int origin, * SEEK_CURs. Note that parallel writes and reads behave * like SEEK_SET. */ - mutex_lock(&inode->i_mutex); + inode_lock(inode); offset = llseek_execute(file, file->f_pos + offset, maxsize); - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return offset; case SEEK_DATA: /* @@ -2589,7 +3025,7 @@ generic_file_llseek_size(struct file *file, loff_t offset, int origin, static loff_t ll_file_seek(struct file *file, loff_t offset, int origin) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); loff_t retval, eof = 0; ENTRY; @@ -2614,7 +3050,7 @@ static loff_t ll_file_seek(struct file *file, loff_t offset, int origin) static int ll_flush(struct file *file, fl_owner_t id) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); struct ll_inode_info *lli = ll_i2info(inode); struct ll_file_data *fd = LUSTRE_FPRIVATE(file); int rc, err; @@ -2647,18 +3083,18 @@ static int ll_flush(struct file *file, fl_owner_t id) int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end, enum cl_fsync_mode mode, int ignore_layout) { - struct cl_env_nest nest; struct lu_env *env; struct cl_io *io; struct cl_fsync_io *fio; int result; + __u16 refcheck; ENTRY; if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL && mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL) RETURN(-EINVAL); - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) RETURN(PTR_ERR(env)); @@ -2681,25 +3117,26 @@ int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end, if (result == 0) result = fio->fi_nr_written; cl_io_fini(env, io); - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); RETURN(result); } /* - * When dentry is provided (the 'else' case), *file->f_path.dentry may be + * When dentry is provided (the 'else' case), file_dentry() may be * null and dentry must be used directly rather than pulled from - * *file->f_path.dentry as is done otherwise. + * file_dentry() as is done otherwise. */ #ifdef HAVE_FILE_FSYNC_4ARGS int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync) { - struct dentry *dentry = file->f_path.dentry; + struct dentry *dentry = file_dentry(file); + bool lock_inode; #elif defined(HAVE_FILE_FSYNC_2ARGS) int ll_fsync(struct file *file, int datasync) { - struct dentry *dentry = file->f_path.dentry; + struct dentry *dentry = file_dentry(file); loff_t start = 0; loff_t end = LLONG_MAX; #else @@ -2720,7 +3157,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) #ifdef HAVE_FILE_FSYNC_4ARGS rc = filemap_write_and_wait_range(inode->i_mapping, start, end); - mutex_lock(&inode->i_mutex); + lock_inode = !lli->lli_inode_locked; + if (lock_inode) + inode_lock(inode); #else /* fsync's caller has already called _fdata{sync,write}, we want * that IO to finish before calling the osc and mdc sync methods */ @@ -2734,9 +3173,11 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) lli->lli_async_rc = 0; if (rc == 0) rc = err; - err = lov_read_and_clear_async_rc(lli->lli_clob); - if (rc == 0) - rc = err; + if (lli->lli_clob != NULL) { + err = lov_read_and_clear_async_rc(lli->lli_clob); + if (rc == 0) + rc = err; + } } err = md_fsync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), &req); @@ -2758,7 +3199,8 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) } #ifdef HAVE_FILE_FSYNC_4ARGS - mutex_unlock(&inode->i_mutex); + if (lock_inode) + inode_unlock(inode); #endif RETURN(rc); } @@ -2766,7 +3208,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) static int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK, @@ -2774,8 +3216,8 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) .ei_cbdata = file_lock, }; struct md_op_data *op_data; - struct lustre_handle lockh = {0}; - ldlm_policy_data_t flock = {{0}}; + struct lustre_handle lockh = { 0 }; + union ldlm_policy_data flock = { { 0 } }; int fl_type = file_lock->fl_type; __u64 flags = 0; int rc; @@ -2868,18 +3310,23 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - CDEBUG(D_DLMTRACE, "inode="DFID", pid=%u, flags="LPX64", mode=%u, " - "start="LPU64", end="LPU64"\n", PFID(ll_inode2fid(inode)), + CDEBUG(D_DLMTRACE, "inode="DFID", pid=%u, flags=%#llx, mode=%u, " + "start=%llu, end=%llu\n", PFID(ll_inode2fid(inode)), flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end); - rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, NULL, op_data, &lockh, + rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data, &lockh, flags); /* Restore the file lock type if not TEST lock. */ if (!(flags & LDLM_FL_TEST_LOCK)) file_lock->fl_type = fl_type; +#ifdef HAVE_LOCKS_LOCK_FILE_WAIT + if ((rc == 0 || file_lock->fl_type == F_UNLCK) && + !(flags & LDLM_FL_TEST_LOCK)) + rc2 = locks_lock_file_wait(file, file_lock); +#else if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0 || file_lock->fl_type == F_UNLCK)) rc2 = flock_lock_file_wait(file, file_lock); @@ -2887,10 +3334,11 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) (rc == 0 || file_lock->fl_type == F_UNLCK) && !(flags & LDLM_FL_TEST_LOCK)) rc2 = posix_lock_file_wait(file, file_lock); +#endif /* HAVE_LOCKS_LOCK_FILE_WAIT */ if (rc2 && file_lock->fl_type != F_UNLCK) { einfo.ei_mode = LCK_NL; - md_enqueue(sbi->ll_md_exp, &einfo, &flock, NULL, op_data, + md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data, &lockh, flags); rc = rc2; } @@ -2901,7 +3349,8 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) } int ll_get_fid_by_name(struct inode *parent, const char *name, - int namelen, struct lu_fid *fid) + int namelen, struct lu_fid *fid, + struct inode **inode) { struct md_op_data *op_data = NULL; struct mdt_body *body; @@ -2914,7 +3363,7 @@ int ll_get_fid_by_name(struct inode *parent, const char *name, if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - op_data->op_valid = OBD_MD_FLID; + op_data->op_valid = OBD_MD_FLID | OBD_MD_FLTYPE; rc = md_getattr_name(ll_i2sbi(parent)->ll_md_exp, op_data, &req); ll_finish_md_op_data(op_data); if (rc < 0) @@ -2925,6 +3374,9 @@ int ll_get_fid_by_name(struct inode *parent, const char *name, GOTO(out_req, rc = -EFAULT); if (fid != NULL) *fid = body->mbo_fid1; + + if (inode != NULL) + rc = ll_prep_inode(inode, req, parent->i_sb, NULL); out_req: ptlrpc_req_finished(req); RETURN(rc); @@ -2937,8 +3389,11 @@ int ll_migrate(struct inode *parent, struct file *file, int mdtidx, struct inode *child_inode = NULL; struct md_op_data *op_data; struct ptlrpc_request *request = NULL; + struct obd_client_handle *och = NULL; struct qstr qstr; + struct mdt_body *body; int rc; + __u64 data_version = 0; ENTRY; CDEBUG(D_VFSTRACE, "migrate %s under "DFID" to MDT%04x\n", @@ -2950,62 +3405,116 @@ int ll_migrate(struct inode *parent, struct file *file, int mdtidx, RETURN(PTR_ERR(op_data)); /* Get child FID first */ - qstr.hash = full_name_hash(name, namelen); + qstr.hash = ll_full_name_hash(file_dentry(file), name, namelen); qstr.name = name; qstr.len = namelen; - dchild = d_lookup(file->f_path.dentry, &qstr); + dchild = d_lookup(file_dentry(file), &qstr); if (dchild != NULL) { - if (dchild->d_inode != NULL) { + if (dchild->d_inode != NULL) child_inode = igrab(dchild->d_inode); - if (child_inode != NULL) { - mutex_lock(&child_inode->i_mutex); - op_data->op_fid3 = *ll_inode2fid(child_inode); - ll_invalidate_aliases(child_inode); - } - } dput(dchild); - } else { + } + + if (child_inode == NULL) { rc = ll_get_fid_by_name(parent, name, namelen, - &op_data->op_fid3); + &op_data->op_fid3, &child_inode); if (rc != 0) GOTO(out_free, rc); } + if (child_inode == NULL) + GOTO(out_free, rc = -EINVAL); + + /* + * lfs migrate command needs to be blocked on the client + * by checking the migrate FID against the FID of the + * filesystem root. + */ + if (child_inode == parent->i_sb->s_root->d_inode) + GOTO(out_iput, rc = -EINVAL); + + inode_lock(child_inode); + op_data->op_fid3 = *ll_inode2fid(child_inode); if (!fid_is_sane(&op_data->op_fid3)) { - CERROR("%s: migrate %s , but fid "DFID" is insane\n", + CERROR("%s: migrate %s, but FID "DFID" is insane\n", ll_get_fsname(parent->i_sb, NULL, 0), name, PFID(&op_data->op_fid3)); - GOTO(out_free, rc = -EINVAL); + GOTO(out_unlock, rc = -EINVAL); } rc = ll_get_mdt_idx_by_fid(ll_i2sbi(parent), &op_data->op_fid3); if (rc < 0) - GOTO(out_free, rc); + GOTO(out_unlock, rc); if (rc == mdtidx) { - CDEBUG(D_INFO, "%s:"DFID" is already on MDT%d.\n", name, + CDEBUG(D_INFO, "%s: "DFID" is already on MDT%04x\n", name, PFID(&op_data->op_fid3), mdtidx); - GOTO(out_free, rc = 0); + GOTO(out_unlock, rc = 0); + } +again: + if (S_ISREG(child_inode->i_mode)) { + och = ll_lease_open(child_inode, NULL, FMODE_WRITE, 0); + if (IS_ERR(och)) { + rc = PTR_ERR(och); + och = NULL; + GOTO(out_unlock, rc); + } + + rc = ll_data_version(child_inode, &data_version, + LL_DV_WR_FLUSH); + if (rc != 0) + GOTO(out_close, rc); + + op_data->op_handle = och->och_fh; + op_data->op_data = och->och_mod; + op_data->op_data_version = data_version; + op_data->op_lease_handle = och->och_lease_handle; + op_data->op_bias |= MDS_RENAME_MIGRATE; } op_data->op_mds = mdtidx; op_data->op_cli_flags = CLI_MIGRATE; rc = md_rename(ll_i2sbi(parent)->ll_md_exp, op_data, name, namelen, name, namelen, &request); - if (rc == 0) + if (rc == 0) { + LASSERT(request != NULL); ll_update_times(request, parent); - ptlrpc_req_finished(request); - if (rc != 0) - GOTO(out_free, rc); + body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY); + LASSERT(body != NULL); + + /* If the server does release layout lock, then we cleanup + * the client och here, otherwise release it in out_close: */ + if (och != NULL && + body->mbo_valid & OBD_MD_CLOSE_INTENT_EXECED) { + obd_mod_put(och->och_mod); + md_clear_open_replay_data(ll_i2sbi(parent)->ll_md_exp, + och); + och->och_fh.cookie = DEAD_HANDLE_MAGIC; + OBD_FREE_PTR(och); + och = NULL; + } + } -out_free: - if (child_inode != NULL) { - clear_nlink(child_inode); - mutex_unlock(&child_inode->i_mutex); - iput(child_inode); + if (request != NULL) { + ptlrpc_req_finished(request); + request = NULL; } + /* Try again if the file layout has changed. */ + if (rc == -EAGAIN && S_ISREG(child_inode->i_mode)) + goto again; + +out_close: + if (och != NULL) /* close the file */ + ll_lease_close(och, child_inode, NULL); + if (rc == 0) + clear_nlink(child_inode); +out_unlock: + inode_unlock(child_inode); +out_iput: + iput(child_inode); +out_free: ll_finish_md_op_data(op_data); RETURN(rc); } @@ -3028,16 +3537,16 @@ ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock) * \param l_req_mode [IN] searched lock mode * \retval boolean, true iff all bits are found */ -int ll_have_md_lock(struct inode *inode, __u64 *bits, ldlm_mode_t l_req_mode) +int ll_have_md_lock(struct inode *inode, __u64 *bits, enum ldlm_mode l_req_mode) { - struct lustre_handle lockh; - ldlm_policy_data_t policy; - ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ? - (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode; - struct lu_fid *fid; + struct lustre_handle lockh; + union ldlm_policy_data policy; + enum ldlm_mode mode = (l_req_mode == LCK_MINMODE) ? + (LCK_CR | LCK_CW | LCK_PR | LCK_PW) : l_req_mode; + struct lu_fid *fid; __u64 flags; - int i; - ENTRY; + int i; + ENTRY; if (!inode) RETURN(0); @@ -3069,17 +3578,17 @@ int ll_have_md_lock(struct inode *inode, __u64 *bits, ldlm_mode_t l_req_mode) RETURN(*bits == 0); } -ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits, - struct lustre_handle *lockh, __u64 flags, - ldlm_mode_t mode) +enum ldlm_mode ll_take_md_lock(struct inode *inode, __u64 bits, + struct lustre_handle *lockh, __u64 flags, + enum ldlm_mode mode) { - ldlm_policy_data_t policy = { .l_inodebits = {bits}}; - struct lu_fid *fid; - ldlm_mode_t rc; - ENTRY; + union ldlm_policy_data policy = { .l_inodebits = { bits } }; + struct lu_fid *fid; + enum ldlm_mode rc; + ENTRY; - fid = &ll_i2info(inode)->lli_fid; - CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); + fid = &ll_i2info(inode)->lli_fid; + CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags, fid, LDLM_IBITS, &policy, mode, lockh); @@ -3092,6 +3601,13 @@ static int ll_inode_revalidate_fini(struct inode *inode, int rc) /* Already unlinked. Just update nlink and return success */ if (rc == -ENOENT) { clear_nlink(inode); + /* If it is striped directory, and there is bad stripe + * Let's revalidate the dentry again, instead of returning + * error */ + if (S_ISDIR(inode->i_mode) && + ll_i2info(inode)->lli_lsm_md != NULL) + return 0; + /* This path cannot be hit for regular files unless in * case of obscure races, so no need to to validate * size. */ @@ -3157,8 +3673,11 @@ static int __ll_inode_revalidate(struct dentry *dentry, __u64 ibits) do_lookup() -> ll_revalidate_it(). We cannot use d_drop here to preserve get_cwd functionality on 2.6. Bug 10503 */ - if (!dentry->d_inode->i_nlink) + if (!dentry->d_inode->i_nlink) { + ll_lock_dcache(inode); d_lustre_invalidate(dentry, 0); + ll_unlock_dcache(inode); + } ll_lookup_finish_locks(&oit, dentry); } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) { @@ -3254,6 +3773,18 @@ ll_inode_revalidate(struct dentry *dentry, __u64 ibits) RETURN(rc); } +static inline dev_t ll_compat_encode_dev(dev_t dev) +{ + /* The compat_sys_*stat*() syscalls will fail unless the + * device majors and minors are both less than 256. Note that + * the value returned here will be passed through + * old_encode_dev() in cp_compat_stat(). And so we are not + * trying to return a valid compat (u16) device number, just + * one that will pass the old_valid_dev() check. */ + + return MKDEV(MAJOR(dev) & 0xff, MINOR(dev) & 0xff); +} + int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat) { struct inode *inode = de->d_inode; @@ -3268,19 +3799,25 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat) if (res) return res; - stat->dev = inode->i_sb->s_dev; - if (ll_need_32bit_api(sbi)) + OBD_FAIL_TIMEOUT(OBD_FAIL_GETATTR_DELAY, 30); + + if (ll_need_32bit_api(sbi)) { stat->ino = cl_fid_build_ino(&lli->lli_fid, 1); - else + stat->dev = ll_compat_encode_dev(inode->i_sb->s_dev); + stat->rdev = ll_compat_encode_dev(inode->i_rdev); + } else { stat->ino = inode->i_ino; + stat->dev = inode->i_sb->s_dev; + stat->rdev = inode->i_rdev; + } + stat->mode = inode->i_mode; stat->uid = inode->i_uid; stat->gid = inode->i_gid; - stat->rdev = inode->i_rdev; stat->atime = inode->i_atime; stat->mtime = inode->i_mtime; stat->ctime = inode->i_ctime; - stat->blksize = 1 << inode->i_blkbits; + stat->blksize = sbi->ll_stat_blksize ?: 1 << inode->i_blkbits; stat->nlink = inode->i_nlink; stat->size = i_size_read(inode); @@ -3341,6 +3878,59 @@ struct posix_acl *ll_get_acl(struct inode *inode, int type) RETURN(acl); } +#ifdef HAVE_IOP_SET_ACL +#ifdef CONFIG_FS_POSIX_ACL +int ll_set_acl(struct inode *inode, struct posix_acl *acl, int type) +{ + const char *name = NULL; + char *value = NULL; + size_t size = 0; + int rc = 0; + ENTRY; + + switch (type) { + case ACL_TYPE_ACCESS: + if (acl) { + rc = posix_acl_update_mode(inode, &inode->i_mode, &acl); + if (rc) + GOTO(out, rc); + } + name = XATTR_NAME_POSIX_ACL_ACCESS; + break; + case ACL_TYPE_DEFAULT: + if (!S_ISDIR(inode->i_mode)) + GOTO(out, rc = acl ? -EACCES : 0); + name = XATTR_NAME_POSIX_ACL_DEFAULT; + break; + default: + GOTO(out, rc = -EINVAL); + } + + if (acl) { + size = posix_acl_xattr_size(acl->a_count); + value = kmalloc(size, GFP_NOFS); + if (value == NULL) + GOTO(out, rc = -ENOMEM); + + rc = posix_acl_to_xattr(&init_user_ns, acl, value, size); + if (rc < 0) + GOTO(out_free, rc); + } + + /* dentry is only used for *.lov attributes so it's safe to be NULL */ + rc = __vfs_setxattr(NULL, inode, name, value, size, XATTR_CREATE); +out_free: + kfree(value); +out: + if (!rc) + set_cached_acl(inode, type, acl); + else + forget_cached_acl(inode, type); + RETURN(rc); +} +#endif /* CONFIG_FS_POSIX_ACL */ +#endif /* HAVE_IOP_SET_ACL */ + #ifndef HAVE_GENERIC_PERMISSION_2ARGS static int # ifdef HAVE_GENERIC_PERMISSION_4ARGS @@ -3442,12 +4032,7 @@ int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd) } ll_stats_ops_tally(sbi, LPROC_LL_INODE_PERM, 1); - - if (sbi->ll_flags & LL_SBI_RMT_CLIENT) - rc = lustre_check_remote_perm(inode, mask); - else - rc = ll_generic_permission(inode, mask, flags, ll_check_acl); - + rc = ll_generic_permission(inode, mask, flags, ll_check_acl); /* restore current process's credentials and FS capability */ if (squash_id) { revert_creds(old_cred); @@ -3460,9 +4045,11 @@ int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd) /* -o localflock - only provides locally consistent flock locks */ struct file_operations ll_file_operations = { #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER +# ifdef HAVE_SYNC_READ_WRITE .read = new_sync_read, - .read_iter = ll_file_read_iter, .write = new_sync_write, +# endif + .read_iter = ll_file_read_iter, .write_iter = ll_file_write_iter, #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */ .read = ll_file_read, @@ -3482,9 +4069,11 @@ struct file_operations ll_file_operations = { struct file_operations ll_file_operations_flock = { #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER +# ifdef HAVE_SYNC_READ_WRITE .read = new_sync_read, - .read_iter = ll_file_read_iter, .write = new_sync_write, +# endif /* HAVE_SYNC_READ_WRITE */ + .read_iter = ll_file_read_iter, .write_iter = ll_file_write_iter, #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */ .read = ll_file_read, @@ -3507,9 +4096,11 @@ struct file_operations ll_file_operations_flock = { /* These are for -o noflock - to return ENOSYS on flock calls */ struct file_operations ll_file_operations_noflock = { #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER +# ifdef HAVE_SYNC_READ_WRITE .read = new_sync_read, - .read_iter = ll_file_read_iter, .write = new_sync_write, +# endif /* HAVE_SYNC_READ_WRITE */ + .read_iter = ll_file_read_iter, .write_iter = ll_file_write_iter, #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */ .read = ll_file_read, @@ -3533,14 +4124,19 @@ struct inode_operations ll_file_inode_operations = { .setattr = ll_setattr, .getattr = ll_getattr, .permission = ll_inode_permission, +#ifdef HAVE_IOP_XATTR .setxattr = ll_setxattr, .getxattr = ll_getxattr, - .listxattr = ll_listxattr, .removexattr = ll_removexattr, +#endif + .listxattr = ll_listxattr, .fiemap = ll_fiemap, #ifdef HAVE_IOP_GET_ACL .get_acl = ll_get_acl, #endif +#ifdef HAVE_IOP_SET_ACL + .set_acl = ll_set_acl, +#endif }; /* dynamic ioctl number support routins */ @@ -3648,15 +4244,15 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf) { struct ll_inode_info *lli = ll_i2info(inode); struct cl_object *obj = lli->lli_clob; - struct cl_env_nest nest; struct lu_env *env; int rc; + __u16 refcheck; ENTRY; if (obj == NULL) RETURN(0); - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) RETURN(PTR_ERR(env)); @@ -3691,7 +4287,7 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf) } out: - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); RETURN(rc); } @@ -3755,7 +4351,7 @@ static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock) } unlock_res_and_lock(lock); - if (lvbdata != NULL) + if (lvbdata) OBD_FREE_LARGE(lvbdata, lmmsize); EXIT; @@ -3769,7 +4365,7 @@ out: * Apply the layout to the inode. Layout lock is held and will be released * in this function. */ -static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, +static int ll_layout_lock_set(struct lustre_handle *lockh, enum ldlm_mode mode, struct inode *inode) { struct ll_inode_info *lli = ll_i2info(inode); @@ -3791,14 +4387,14 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, PFID(&lli->lli_fid), inode); /* in case this is a caching lock and reinstate with new inode */ - md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL); + md_set_lock_data(sbi->ll_md_exp, lockh, inode, NULL); lock_res_and_lock(lock); lvb_ready = ldlm_is_lvb_ready(lock); unlock_res_and_lock(lock); + /* checking lvb_ready is racy but this is okay. The worst case is * that multi processes may configure the file on the same time. */ - if (lvb_ready) GOTO(out, rc = 0); @@ -3823,7 +4419,6 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, /* refresh layout failed, need to wait */ wait_layout = rc == -EBUSY; EXIT; - out: LDLM_LOCK_PUT(lock); ldlm_lock_decref(lockh, mode); @@ -3848,68 +4443,55 @@ out: RETURN(rc); } -static int ll_layout_refresh_locked(struct inode *inode) +/** + * Issue layout intent RPC to MDS. + * \param inode [in] file inode + * \param intent [in] layout intent + * + * \retval 0 on success + * \retval < 0 error code + */ +static int ll_layout_intent(struct inode *inode, struct layout_intent *intent) { struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); struct md_op_data *op_data; - struct lookup_intent it; - struct lustre_handle lockh; - ldlm_mode_t mode; - struct ldlm_enqueue_info einfo = { - .ei_type = LDLM_IBITS, - .ei_mode = LCK_CR, - .ei_cb_bl = &ll_md_blocking_ast, - .ei_cb_cp = &ldlm_completion_ast, - }; + struct lookup_intent it; + struct ptlrpc_request *req; int rc; ENTRY; -again: - /* mostly layout lock is caching on the local side, so try to match - * it before grabbing layout lock mutex. */ - mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0, - LCK_CR | LCK_CW | LCK_PR | LCK_PW); - if (mode != 0) { /* hit cached lock */ - rc = ll_layout_lock_set(&lockh, mode, inode); - if (rc == -EAGAIN) - goto again; - - RETURN(rc); - } - op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0, LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - /* have to enqueue one */ + op_data->op_data = intent; + op_data->op_data_size = sizeof(*intent); + memset(&it, 0, sizeof(it)); it.it_op = IT_LAYOUT; - lockh.cookie = 0ULL; + if (intent->li_opc == LAYOUT_INTENT_WRITE || + intent->li_opc == LAYOUT_INTENT_TRUNC) + it.it_flags = FMODE_WRITE; LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file "DFID"(%p)", ll_get_fsname(inode->i_sb, NULL, 0), PFID(&lli->lli_fid), inode); - rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL, &it, op_data, &lockh, 0); - if (it.d.lustre.it_data != NULL) - ptlrpc_req_finished(it.d.lustre.it_data); - it.d.lustre.it_data = NULL; + rc = md_intent_lock(sbi->ll_md_exp, op_data, &it, &req, + &ll_md_blocking_ast, 0); + if (it.it_request != NULL) + ptlrpc_req_finished(it.it_request); + it.it_request = NULL; ll_finish_md_op_data(op_data); - mode = it.d.lustre.it_lock_mode; - it.d.lustre.it_lock_mode = 0; - ll_intent_drop_lock(&it); - - if (rc == 0) { - /* set lock data in case this is a new lock */ + /* set lock data in case this is a new lock */ + if (!rc) ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL); - rc = ll_layout_lock_set(&lockh, mode, inode); - if (rc == -EAGAIN) - goto again; - } + + ll_intent_drop_lock(&it); RETURN(rc); } @@ -3931,6 +4513,11 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) { struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); + struct lustre_handle lockh; + struct layout_intent intent = { + .li_opc = LAYOUT_INTENT_ACCESS, + }; + enum ldlm_mode mode; int rc; ENTRY; @@ -3945,18 +4532,57 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) /* take layout lock mutex to enqueue layout lock exclusively. */ mutex_lock(&lli->lli_layout_mutex); - rc = ll_layout_refresh_locked(inode); - if (rc < 0) - GOTO(out, rc); + while (1) { + /* mostly layout lock is caching on the local side, so try to + * match it before grabbing layout lock mutex. */ + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0, + LCK_CR | LCK_CW | LCK_PR | LCK_PW); + if (mode != 0) { /* hit cached lock */ + rc = ll_layout_lock_set(&lockh, mode, inode); + if (rc == -EAGAIN) + continue; + break; + } - *gen = ll_layout_version_get(lli); -out: + rc = ll_layout_intent(inode, &intent); + if (rc != 0) + break; + } + + if (rc == 0) + *gen = ll_layout_version_get(lli); mutex_unlock(&lli->lli_layout_mutex); RETURN(rc); } /** + * Issue layout intent RPC indicating where in a file an IO is about to write. + * + * \param[in] inode file inode. + * \param[in] start start offset of fille in bytes where an IO is about to + * write. + * \param[in] end exclusive end offset in bytes of the write range. + * + * \retval 0 on success + * \retval < 0 error code + */ +int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end) +{ + struct layout_intent intent = { + .li_opc = LAYOUT_INTENT_WRITE, + .li_start = start, + .li_end = end, + }; + int rc; + ENTRY; + + rc = ll_layout_intent(inode, &intent); + + RETURN(rc); +} + +/** * This function send a restore request to the MDT */ int ll_layout_restore(struct inode *inode, loff_t offset, __u64 length)