X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Ffile.c;h=b7bd2bf7eb14afcb39e5210f1937700c2d6ce1a2;hp=0d13b130d96dfc82a9872380d43c0a8cc03c3c3e;hb=9573911bfb4a2c3d7e2047c9d5f5440d9c7e7db5;hpb=adfb405433aac3691bbd00ed548203c57ce61e3a diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 0d13b13..b7bd2bf 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -322,21 +322,7 @@ int ll_file_release(struct inode *inode, struct file *file) CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n", PFID(ll_inode2fid(inode)), inode); -#ifdef CONFIG_FS_POSIX_ACL - if (sbi->ll_flags & LL_SBI_RMT_CLIENT && - inode == inode->i_sb->s_root->d_inode) { - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); - - LASSERT(fd != NULL); - if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) { - fd->fd_flags &= ~LL_FILE_RMTACL; - rct_del(&sbi->ll_rct, current_pid()); - et_search_free(&sbi->ll_et, current_pid()); - } - } -#endif - - if (inode->i_sb->s_root != file->f_path.dentry) + if (inode->i_sb->s_root != file_dentry(file)) ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1); fd = LUSTRE_FPRIVATE(file); LASSERT(fd != NULL); @@ -346,7 +332,7 @@ int ll_file_release(struct inode *inode, struct file *file) if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd) ll_deauthorize_statahead(inode, fd); - if (inode->i_sb->s_root == file->f_path.dentry) { + if (inode->i_sb->s_root == file_dentry(file)) { LUSTRE_FPRIVATE(file) = NULL; ll_file_data_put(fd); RETURN(0); @@ -369,7 +355,7 @@ int ll_file_release(struct inode *inode, struct file *file) static int ll_intent_file_open(struct file *file, void *lmm, int lmmsize, struct lookup_intent *itp) { - struct dentry *de = file->f_path.dentry; + struct dentry *de = file_dentry(file); struct ll_sb_info *sbi = ll_i2sbi(de->d_inode); struct dentry *parent = de->d_parent; const char *name = NULL; @@ -421,26 +407,35 @@ static int ll_intent_file_open(struct file *file, void *lmm, int lmmsize, } rc = 
ll_prep_inode(&de->d_inode, req, NULL, itp); - if (!rc && itp->d.lustre.it_lock_mode) + if (!rc && itp->it_lock_mode) ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, NULL); out: ptlrpc_req_finished(req); ll_intent_drop_lock(itp); + /* We did open by fid, but by the time we got to the server, + * the object disappeared. If this is a create, we cannot really + * tell the userspace that the file it was trying to create + * does not exist. Instead let's return -ESTALE, and the VFS will + * retry the create with LOOKUP_REVAL that we are going to catch + * in ll_revalidate_dentry() and use lookup then. + */ + if (rc == -ENOENT && itp->it_op & IT_CREAT) + rc = -ESTALE; + RETURN(rc); } static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it, struct obd_client_handle *och) { - struct ptlrpc_request *req = it->d.lustre.it_data; struct mdt_body *body; - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + body = req_capsule_server_get(&it->it_request->rq_pill, &RMF_MDT_BODY); och->och_fh = body->mbo_handle; och->och_fid = body->mbo_fid1; - och->och_lease_handle.cookie = it->d.lustre.it_lock_handle; + och->och_lease_handle.cookie = it->it_lock_handle; och->och_magic = OBD_CLIENT_HANDLE_MAGIC; och->och_flags = it->it_flags; @@ -450,7 +445,7 @@ static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it, static int ll_local_open(struct file *file, struct lookup_intent *it, struct ll_file_data *fd, struct obd_client_handle *och) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); ENTRY; LASSERT(!LUSTRE_FPRIVATE(file)); @@ -514,12 +509,12 @@ int ll_file_open(struct inode *inode, struct file *file) if (S_ISDIR(inode->i_mode)) ll_authorize_statahead(inode, fd); - if (inode->i_sb->s_root == file->f_path.dentry) { + if (inode->i_sb->s_root == file_dentry(file)) { LUSTRE_FPRIVATE(file) = fd; RETURN(0); } - if (!it || !it->d.lustre.it_disposition) { + if (!it || !it->it_disposition) { /* Convert 
f_flags into access mode. We cannot use file->f_mode, * because everything but O_ACCMODE mask was stripped from * there */ @@ -572,7 +567,7 @@ restart: GOTO(out_openerr, rc); } - ll_release_openhandle(file->f_path.dentry, it); + ll_release_openhandle(file_dentry(file), it); } (*och_usecount)++; @@ -584,7 +579,8 @@ restart: } } else { LASSERT(*och_usecount == 0); - if (!it->d.lustre.it_disposition) { + if (!it->it_disposition) { + struct ll_dentry_data *ldd = ll_d2d(file->f_path.dentry); /* We cannot just request lock handle now, new ELC code means that one of other OPEN locks for this file could be cancelled, and since blocking ast handler @@ -598,12 +594,24 @@ restart: * handle to be returned from LOOKUP|OPEN request, * for example if the target entry was a symlink. * - * Always fetch MDS_OPEN_LOCK if this is not setstripe. + * Only fetch MDS_OPEN_LOCK if this is in the NFS path, + * marked by a bit set in ll_iget_for_nfs. Clear the + * bit so that it does not confuse later callers. * + * NB: when ldd is NULL, it must have come via normal + * lookup path only, since ll_iget_for_nfs always calls + * ll_d_init(). + */ + if (ldd && ldd->lld_nfs_dentry) { + ldd->lld_nfs_dentry = 0; + it->it_flags |= MDS_OPEN_LOCK; + } + + /* * Always specify MDS_OPEN_BY_FID because we don't want * to get file with different fid.
*/ - it->it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID; + it->it_flags |= MDS_OPEN_BY_FID; rc = ll_intent_file_open(file, NULL, 0, it); if (rc) GOTO(out_openerr, rc); @@ -627,7 +635,7 @@ restart: LASSERTF(it_disposition(it, DISP_ENQ_OPEN_REF), "inode %p: disposition %x, status %d\n", inode, - it_disposition(it, ~0), it->d.lustre.it_status); + it_disposition(it, ~0), it->it_status); rc = ll_local_open(file, it, fd, *och_p); if (rc) @@ -664,7 +672,7 @@ out_openerr: } if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) { - ptlrpc_req_finished(it->d.lustre.it_data); + ptlrpc_req_finished(it->it_request); it_clear_disposition(it, DISP_ENQ_OPEN_REF); } @@ -695,6 +703,95 @@ static int ll_md_blocking_lease_ast(struct ldlm_lock *lock, } /** + * When setting a lease on a file, we take ownership of the lli_mds_*_och + * and save it as fd->fd_och so as to force client to reopen the file even + * if it has an open lock in cache already. + */ +static int ll_lease_och_acquire(struct inode *inode, struct file *file, + struct lustre_handle *old_handle) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct obd_client_handle **och_p; + __u64 *och_usecount; + int rc = 0; + ENTRY; + + /* Get the openhandle of the file */ + mutex_lock(&lli->lli_och_mutex); + if (fd->fd_lease_och != NULL) + GOTO(out_unlock, rc = -EBUSY); + + if (fd->fd_och == NULL) { + if (file->f_mode & FMODE_WRITE) { + LASSERT(lli->lli_mds_write_och != NULL); + och_p = &lli->lli_mds_write_och; + och_usecount = &lli->lli_open_fd_write_count; + } else { + LASSERT(lli->lli_mds_read_och != NULL); + och_p = &lli->lli_mds_read_och; + och_usecount = &lli->lli_open_fd_read_count; + } + + if (*och_usecount > 1) + GOTO(out_unlock, rc = -EBUSY); + + fd->fd_och = *och_p; + *och_usecount = 0; + *och_p = NULL; + } + + *old_handle = fd->fd_och->och_fh; + + EXIT; +out_unlock: + mutex_unlock(&lli->lli_och_mutex); + return rc; +} + +/** + * Release ownership on lli_mds_*_och when 
putting back a file lease. + */ +static int ll_lease_och_release(struct inode *inode, struct file *file) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct obd_client_handle **och_p; + struct obd_client_handle *old_och = NULL; + __u64 *och_usecount; + int rc = 0; + ENTRY; + + mutex_lock(&lli->lli_och_mutex); + if (file->f_mode & FMODE_WRITE) { + och_p = &lli->lli_mds_write_och; + och_usecount = &lli->lli_open_fd_write_count; + } else { + och_p = &lli->lli_mds_read_och; + och_usecount = &lli->lli_open_fd_read_count; + } + + /* The file may have been open by another process (broken lease) so + * *och_p is not NULL. In this case we should simply increase usecount + * and close fd_och. + */ + if (*och_p != NULL) { + old_och = fd->fd_och; + (*och_usecount)++; + } else { + *och_p = fd->fd_och; + *och_usecount = 1; + } + fd->fd_och = NULL; + mutex_unlock(&lli->lli_och_mutex); + + if (old_och != NULL) + rc = ll_close_inode_openhandle(inode, old_och, 0, NULL); + + RETURN(rc); +} + +/** * Acquire a lease and open the file. 
*/ static struct obd_client_handle * @@ -715,45 +812,12 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode, RETURN(ERR_PTR(-EINVAL)); if (file != NULL) { - struct ll_inode_info *lli = ll_i2info(inode); - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); - struct obd_client_handle **och_p; - __u64 *och_usecount; - if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC)) RETURN(ERR_PTR(-EPERM)); - /* Get the openhandle of the file */ - rc = -EBUSY; - mutex_lock(&lli->lli_och_mutex); - if (fd->fd_lease_och != NULL) { - mutex_unlock(&lli->lli_och_mutex); - RETURN(ERR_PTR(rc)); - } - - if (fd->fd_och == NULL) { - if (file->f_mode & FMODE_WRITE) { - LASSERT(lli->lli_mds_write_och != NULL); - och_p = &lli->lli_mds_write_och; - och_usecount = &lli->lli_open_fd_write_count; - } else { - LASSERT(lli->lli_mds_read_och != NULL); - och_p = &lli->lli_mds_read_och; - och_usecount = &lli->lli_open_fd_read_count; - } - if (*och_usecount == 1) { - fd->fd_och = *och_p; - *och_p = NULL; - *och_usecount = 0; - rc = 0; - } - } - mutex_unlock(&lli->lli_och_mutex); - if (rc < 0) /* more than 1 opener */ + rc = ll_lease_och_acquire(inode, file, &old_handle); + if (rc) RETURN(ERR_PTR(rc)); - - LASSERT(fd->fd_och != NULL); - old_handle = fd->fd_och->och_fh; } OBD_ALLOC_PTR(och); @@ -799,12 +863,12 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode, /* already get lease, handle lease lock */ ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL); - if (it.d.lustre.it_lock_mode == 0 || - it.d.lustre.it_lock_bits != MDS_INODELOCK_OPEN) { + if (it.it_lock_mode == 0 || + it.it_lock_bits != MDS_INODELOCK_OPEN) { /* open lock must return for lease */ - CERROR(DFID "lease granted but no open lock, %d/"LPU64".\n", - PFID(ll_inode2fid(inode)), it.d.lustre.it_lock_mode, - it.d.lustre.it_lock_bits); + CERROR(DFID "lease granted but no open lock, %d/%llu.\n", + PFID(ll_inode2fid(inode)), it.it_lock_mode, + it.it_lock_bits); GOTO(out_close, rc = -EPROTO); } @@ -813,10 
+877,10 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode, out_close: /* Cancel open lock */ - if (it.d.lustre.it_lock_mode != 0) { + if (it.it_lock_mode != 0) { ldlm_lock_decref_and_cancel(&och->och_lease_handle, - it.d.lustre.it_lock_mode); - it.d.lustre.it_lock_mode = 0; + it.it_lock_mode); + it.it_lock_mode = 0; och->och_lease_handle.cookie = 0ULL; } rc2 = ll_close_inode_openhandle(inode, och, 0, NULL); @@ -916,10 +980,11 @@ static int ll_lease_close(struct obd_client_handle *och, struct inode *inode, } CDEBUG(D_INODE, "lease for "DFID" broken? %d\n", - PFID(&ll_i2info(inode)->lli_fid), cancelled); + PFID(&ll_i2info(inode)->lli_fid), cancelled); if (!cancelled) ldlm_cli_cancel(&och->och_lease_handle, 0); + if (lease_broken != NULL) *lease_broken = cancelled; @@ -941,9 +1006,19 @@ int ll_merge_attr(const struct lu_env *env, struct inode *inode) ll_inode_size_lock(inode); - /* merge timestamps the most recently obtained from mds with - timestamps obtained from osts */ - LTIME_S(inode->i_atime) = lli->lli_atime; + /* Merge timestamps the most recently obtained from MDS with + * timestamps obtained from OSTs. + * + * Do not overwrite atime of inode because it may be refreshed + * by file_accessed() function. If the read was served by cache + * data, there is no RPC to be sent so that atime may not be + * transferred to OSTs at all. MDT only updates atime at close time + * if it's at least 'mdd.*.atime_diff' older. + * All in all, the atime in Lustre does not strictly comply with + * POSIX. Solving this problem needs to send an RPC to MDT for each + * read, this will hurt performance. 
*/ + if (LTIME_S(inode->i_atime) < lli->lli_atime) + LTIME_S(inode->i_atime) = lli->lli_atime; LTIME_S(inode->i_mtime) = lli->lli_mtime; LTIME_S(inode->i_ctime) = lli->lli_ctime; @@ -967,7 +1042,7 @@ int ll_merge_attr(const struct lu_env *env, struct inode *inode) if (mtime < attr->cat_mtime) mtime = attr->cat_mtime; - CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n", + CDEBUG(D_VFSTRACE, DFID" updating i_size %llu\n", PFID(&lli->lli_fid), attr->cat_size); i_size_write(inode, attr->cat_size); @@ -986,7 +1061,7 @@ out_size_unlock: static bool file_is_noatime(const struct file *file) { const struct vfsmount *mnt = file->f_path.mnt; - const struct inode *inode = file->f_path.dentry->d_inode; + const struct inode *inode = file_inode((struct file *)file); /* Adapted from file_accessed() and touch_atime().*/ if (file->f_flags & O_NOATIME) @@ -1012,7 +1087,7 @@ static bool file_is_noatime(const struct file *file) static void ll_io_init(struct cl_io *io, const struct file *file, int write) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode((struct file *)file); io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK; if (write) { @@ -1039,7 +1114,7 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args, loff_t *ppos, size_t count) { struct vvp_io *vio = vvp_env_io(env); - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); struct ll_inode_info *lli = ll_i2info(inode); struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct cl_io *io; @@ -1049,8 +1124,8 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args, ENTRY; - CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: "LPU64", count: %zu\n", - file->f_path.dentry->d_name.name, iot, *ppos, count); + CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: %llu, count: %zu\n", + file_dentry(file)->d_name.name, iot, *ppos, count); restart: io = vvp_env_thread_io(env); @@ -1070,9 +1145,6 @@ restart: switch (vio->vui_io_subtype) { 
case IO_NORMAL: vio->vui_iter = args->u.normal.via_iter; -#ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER - vio->vui_tot_nrsegs = vio->vui_iter->nr_segs; -#endif /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */ vio->vui_iocb = args->u.normal.via_iocb; /* Direct IO reads must also take range lock, * or multiple reads will try to work on the same pages @@ -1098,7 +1170,7 @@ restart: LBUG(); } - ll_cl_add(file, env, io); + ll_cl_add(file, env, io, LCC_RW); rc = cl_io_loop(env, io); ll_cl_remove(file, env); @@ -1118,12 +1190,8 @@ restart: *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */ /* prepare IO restart */ - if (count > 0 && args->via_io_subtype == IO_NORMAL) { + if (count > 0 && args->via_io_subtype == IO_NORMAL) args->u.normal.via_iter = vio->vui_iter; -#ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER - args->u.normal.via_iter->nr_segs = vio->vui_tot_nrsegs; -#endif /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */ - } } GOTO(out, rc); out: @@ -1132,7 +1200,7 @@ out: if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) { CDEBUG(D_VFSTRACE, "%s: restart %s from %lld, count:%zu, result: %zd\n", - file->f_path.dentry->d_name.name, + file_dentry(file)->d_name.name, iot == CIT_READ ? "read" : "write", *ppos, count, result); goto restart; @@ -1163,26 +1231,103 @@ out: return result > 0 ? result : rc; } +/** + * The purpose of fast read is to overcome per I/O overhead and improve IOPS + * especially for small I/O. + * + * To serve a read request, CLIO has to create and initialize a cl_io and + * then request DLM lock. This has turned out to have significant overhead + * and affects the performance of small I/O dramatically. + * + * It's not necessary to create a cl_io for each I/O. With the help of read + * ahead, most of the pages being read are already in memory cache and we can + * read those pages directly because if the pages exist, the corresponding DLM + * lock must exist so that page content must be valid.
+ * + * In fast read implementation, the llite speculatively finds and reads pages + * in memory cache. There are three scenarios for fast read: + * - If the page exists and is uptodate, kernel VM will provide the data and + * CLIO won't be involved; + * - If the page was brought into memory by read ahead, it will be exported + * and read ahead parameters will be updated; + * - Otherwise the page is not in memory, we can't do fast read. Therefore, + * it will go back and invoke normal read, i.e., a cl_io will be created + * and DLM lock will be requested. + * + * POSIX compliance: the POSIX standard states that read is intended to be atomic. + * Lustre read implementation is in line with Linux kernel read implementation + * and neither of them complies with POSIX standard in this matter. Fast read + * doesn't make the situation worse on single node but it may interleave write + * results from multiple nodes due to short read handling in ll_file_read_iter(). + * + * \param env - lu_env + * \param iocb - kiocb from kernel + * \param iter - user space buffers where the data will be copied + * + * \retval - number of bytes that have been read, or error code if error occurred. + */ +static ssize_t +ll_do_fast_read(const struct lu_env *env, struct kiocb *iocb, + struct iov_iter *iter) +{ + ssize_t result; + + if (!ll_sbi_has_fast_read(ll_i2sbi(file_inode(iocb->ki_filp)))) + return 0; + + /* NB: we can't do direct IO for fast read because it will need a lock + * to make IO engine happy. */ + if (iocb->ki_filp->f_flags & O_DIRECT) + return 0; + + ll_cl_add(iocb->ki_filp, env, NULL, LCC_RW); + result = generic_file_read_iter(iocb, iter); + ll_cl_remove(iocb->ki_filp, env); + + /* If the first page is not in cache, generic_file_read_iter() will + * return -ENODATA. + * See corresponding code in ll_readpage().
*/ + if (result == -ENODATA) + result = 0; + + if (result > 0) + ll_stats_ops_tally(ll_i2sbi(file_inode(iocb->ki_filp)), + LPROC_LL_READ_BYTES, result); + + return result; +} + /* * Read from a file (through the page cache). */ static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to) { - struct vvp_io_args *args; struct lu_env *env; + struct vvp_io_args *args; ssize_t result; + ssize_t rc2; __u16 refcheck; env = cl_env_get(&refcheck); if (IS_ERR(env)) return PTR_ERR(env); + result = ll_do_fast_read(env, iocb, to); + if (result < 0 || iov_iter_count(to) == 0) + GOTO(out, result); + args = ll_env_args(env, IO_NORMAL); args->u.normal.via_iter = to; args->u.normal.via_iocb = iocb; - result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ, - &iocb->ki_pos, iov_iter_count(to)); + rc2 = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ, + &iocb->ki_pos, iov_iter_count(to)); + if (rc2 > 0) + result += rc2; + else if (result == 0) + result = rc2; + +out: cl_env_put(env, &refcheck); return result; } @@ -1246,54 +1391,22 @@ static int ll_file_get_iov_count(const struct iovec *iov, static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) { - struct iovec *local_iov; - struct iov_iter *to; + struct iov_iter to; size_t iov_count; ssize_t result; - struct lu_env *env = NULL; - __u16 refcheck; ENTRY; result = ll_file_get_iov_count(iov, &nr_segs, &iov_count); if (result) RETURN(result); - if (nr_segs == 1) { - - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); - - local_iov = &ll_env_info(env)->lti_local_iov; - *local_iov = *iov; - - } else { - OBD_ALLOC(local_iov, sizeof(*iov) * nr_segs); - if (local_iov == NULL) - RETURN(-ENOMEM); - - memcpy(local_iov, iov, sizeof(*iov) * nr_segs); - } - - OBD_ALLOC_PTR(to); - if (to == NULL) { - result = -ENOMEM; - goto out; - } # ifdef HAVE_IOV_ITER_INIT_DIRECTION - iov_iter_init(to, READ, local_iov, nr_segs, iov_count); + 
iov_iter_init(&to, READ, iov, nr_segs, iov_count); # else /* !HAVE_IOV_ITER_INIT_DIRECTION */ - iov_iter_init(to, local_iov, nr_segs, iov_count, 0); + iov_iter_init(&to, iov, nr_segs, iov_count, 0); # endif /* HAVE_IOV_ITER_INIT_DIRECTION */ - result = ll_file_read_iter(iocb, to); - - OBD_FREE_PTR(to); -out: - if (nr_segs == 1) - cl_env_put(env, &refcheck); - else - OBD_FREE(local_iov, sizeof(*iov) * nr_segs); + result = ll_file_read_iter(iocb, &to); RETURN(result); } @@ -1301,18 +1414,15 @@ out: static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, loff_t *ppos) { - struct lu_env *env; struct iovec iov = { .iov_base = buf, .iov_len = count }; struct kiocb *kiocb; ssize_t result; - __u16 refcheck; ENTRY; - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); + OBD_ALLOC_PTR(kiocb); + if (kiocb == NULL) + RETURN(-ENOMEM); - kiocb = &ll_env_info(env)->lti_kiocb; init_sync_kiocb(kiocb, file); kiocb->ki_pos = *ppos; #ifdef HAVE_KIOCB_KI_LEFT @@ -1324,7 +1434,7 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, result = ll_file_aio_read(kiocb, &iov, 1, kiocb->ki_pos); *ppos = kiocb->ki_pos; - cl_env_put(env, &refcheck); + OBD_FREE_PTR(kiocb); RETURN(result); } @@ -1335,52 +1445,22 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) { - struct iovec *local_iov; - struct iov_iter *from; + struct iov_iter from; size_t iov_count; ssize_t result; - struct lu_env *env = NULL; - __u16 refcheck; ENTRY; result = ll_file_get_iov_count(iov, &nr_segs, &iov_count); if (result) RETURN(result); - if (nr_segs == 1) { - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); - - local_iov = &ll_env_info(env)->lti_local_iov; - *local_iov = *iov; - } else { - OBD_ALLOC(local_iov, sizeof(*iov) * nr_segs); - if (local_iov == NULL) - RETURN(-ENOMEM); - - 
memcpy(local_iov, iov, sizeof(*iov) * nr_segs); - } - - OBD_ALLOC_PTR(from); - if (from == NULL) { - result = -ENOMEM; - goto out; - } # ifdef HAVE_IOV_ITER_INIT_DIRECTION - iov_iter_init(from, WRITE, local_iov, nr_segs, iov_count); + iov_iter_init(&from, WRITE, iov, nr_segs, iov_count); # else /* !HAVE_IOV_ITER_INIT_DIRECTION */ - iov_iter_init(from, local_iov, nr_segs, iov_count, 0); + iov_iter_init(&from, iov, nr_segs, iov_count, 0); # endif /* HAVE_IOV_ITER_INIT_DIRECTION */ - result = ll_file_write_iter(iocb, from); - - OBD_FREE_PTR(from); -out: - if (nr_segs == 1) - cl_env_put(env, &refcheck); - else - OBD_FREE(local_iov, sizeof(*iov) * nr_segs); + result = ll_file_write_iter(iocb, &from); RETURN(result); } @@ -1459,7 +1539,7 @@ int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file, if (rc < 0) GOTO(out_unlock, rc); - ll_release_openhandle(file->f_path.dentry, &oit); + ll_release_openhandle(file_dentry(file), &oit); out_unlock: ll_inode_size_unlock(inode); @@ -1571,13 +1651,12 @@ static int ll_lov_setea(struct inode *inode, struct file *file, if (lump == NULL) RETURN(-ENOMEM); - if (copy_from_user(lump, (struct lov_user_md __user *)arg, lum_size)) { - OBD_FREE_LARGE(lump, lum_size); - RETURN(-EFAULT); - } + if (copy_from_user(lump, (struct lov_user_md __user *)arg, lum_size)) + GOTO(out_lump, rc = -EFAULT); rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size); +out_lump: OBD_FREE_LARGE(lump, lum_size); RETURN(rc); } @@ -1747,7 +1826,7 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it) out: /* this one is in place of ll_file_open */ if (it_disposition(it, DISP_ENQ_OPEN_REF)) { - ptlrpc_req_finished(it->d.lustre.it_data); + ptlrpc_req_finished(it->it_request); it_clear_disposition(it, DISP_ENQ_OPEN_REF); } RETURN(rc); @@ -1839,6 +1918,10 @@ int ll_fid2path(struct inode *inode, void __user *arg) if (copy_from_user(gfout, arg, sizeof(*gfout))) GOTO(gf_free, rc = -EFAULT); + /* append root FID after gfout to 
let MDT know the root FID so that it + * can lookup the correct path, this is mainly for fileset. + * old server without fileset mount support will ignore this. */ + *gfout->gf_u.gf_root_fid = *ll_inode2fid(inode); /* Call mdc_iocontrol */ rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL); @@ -1912,11 +1995,11 @@ restart: */ int ll_hsm_release(struct inode *inode) { - struct cl_env_nest nest; struct lu_env *env; struct obd_client_handle *och = NULL; __u64 data_version = 0; int rc; + __u16 refcheck; ENTRY; CDEBUG(D_INODE, "%s: Releasing file "DFID".\n", @@ -1932,12 +2015,12 @@ int ll_hsm_release(struct inode *inode) if (rc != 0) GOTO(out, rc); - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) GOTO(out, rc = PTR_ERR(env)); ll_merge_attr(env, inode); - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); /* Release the file. * NB: lease lock handle is released in mdc_hsm_release_pack() because @@ -1977,8 +2060,8 @@ static int ll_swap_layouts(struct file *file1, struct file *file2, if (llss == NULL) RETURN(-ENOMEM); - llss->inode1 = file1->f_path.dentry->d_inode; - llss->inode2 = file2->f_path.dentry->d_inode; + llss->inode1 = file_inode(file1); + llss->inode2 = file_inode(file2); rc = ll_check_swap_layouts_validity(llss->inode1, llss->inode2); if (rc < 0) @@ -2145,13 +2228,13 @@ static int ll_hsm_import(struct inode *inode, struct file *file, ATTR_MTIME | ATTR_MTIME_SET | ATTR_ATIME | ATTR_ATIME_SET; - mutex_lock(&inode->i_mutex); + inode_lock(inode); - rc = ll_setattr_raw(file->f_path.dentry, attr, true); + rc = ll_setattr_raw(file_dentry(file), attr, true); if (rc == -ENODATA) rc = 0; - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); out: if (hss != NULL) @@ -2171,7 +2254,7 @@ static inline long ll_lease_type_from_fmode(fmode_t fmode) static int ll_file_futimes_3(struct file *file, const struct ll_futimes_3 *lfu) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = 
file_inode(file); struct iattr ia = { .ia_valid = ATTR_ATIME | ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET | @@ -2198,17 +2281,66 @@ static int ll_file_futimes_3(struct file *file, const struct ll_futimes_3 *lfu) if (!S_ISREG(inode->i_mode)) RETURN(-EINVAL); - mutex_lock(&inode->i_mutex); - rc = ll_setattr_raw(file->f_path.dentry, &ia, false); - mutex_unlock(&inode->i_mutex); + inode_lock(inode); + rc = ll_setattr_raw(file_dentry(file), &ia, false); + inode_unlock(inode); RETURN(rc); } +/* + * Give file access advices + * + * The ladvise interface is similar to Linux fadvise() system call, except it + * forwards the advices directly from Lustre client to server. The server side + * codes will apply appropriate read-ahead and caching techniques for the + * corresponding files. + * + * A typical workload for ladvise is e.g. a bunch of different clients are + * doing small random reads of a file, so prefetching pages into OSS cache + * with big linear reads before the random IO is a net benefit. Fetching + * all that data into each client cache with fadvise() may not be, due to + * much more data being sent to the client. 
+ */ +static int ll_ladvise(struct inode *inode, struct file *file, __u64 flags, + struct llapi_lu_ladvise *ladvise) +{ + struct lu_env *env; + struct cl_io *io; + struct cl_ladvise_io *lio; + int rc; + __u16 refcheck; + ENTRY; + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + io = vvp_env_thread_io(env); + io->ci_obj = ll_i2info(inode)->lli_clob; + + /* initialize parameters for ladvise */ + lio = &io->u.ci_ladvise; + lio->li_start = ladvise->lla_start; + lio->li_end = ladvise->lla_end; + lio->li_fid = ll_inode2fid(inode); + lio->li_advice = ladvise->lla_advice; + lio->li_flags = flags; + + if (cl_io_init(env, io, CIT_LADVISE, io->ci_obj) == 0) + rc = cl_io_loop(env, io); + else + rc = io->ci_result; + + cl_io_fini(env, io); + cl_env_put(env, &refcheck); + RETURN(rc); +} + static long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); struct ll_file_data *fd = LUSTRE_FPRIVATE(file); int flags, rc; ENTRY; @@ -2287,7 +2419,7 @@ ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) mutex_unlock(&lli->lli_och_mutex); if (och == NULL) GOTO(out, rc = -ENOLCK); - inode2 = file2->f_path.dentry->d_inode; + inode2 = file_inode(file2); rc = ll_swap_layouts_close(och, inode, inode2); } else { rc = ll_swap_layouts(file, file2, &lsl); @@ -2467,6 +2599,10 @@ out: if (rc < 0) RETURN(rc); + rc = ll_lease_och_release(inode, file); + if (rc < 0) + RETURN(rc); + if (lease_broken) fmode = 0; @@ -2546,6 +2682,56 @@ out: RETURN(ll_file_futimes_3(file, &lfu)); } + case LL_IOC_LADVISE: { + struct llapi_ladvise_hdr *ladvise_hdr; + int i; + int num_advise; + int alloc_size = sizeof(*ladvise_hdr); + + rc = 0; + OBD_ALLOC_PTR(ladvise_hdr); + if (ladvise_hdr == NULL) + RETURN(-ENOMEM); + + if (copy_from_user(ladvise_hdr, + (const struct llapi_ladvise_hdr __user *)arg, + alloc_size)) + GOTO(out_ladvise, rc = -EFAULT); + + if 
(ladvise_hdr->lah_magic != LADVISE_MAGIC || + ladvise_hdr->lah_count < 1) + GOTO(out_ladvise, rc = -EINVAL); + + num_advise = ladvise_hdr->lah_count; + if (num_advise >= LAH_COUNT_MAX) + GOTO(out_ladvise, rc = -EFBIG); + + OBD_FREE_PTR(ladvise_hdr); + alloc_size = offsetof(typeof(*ladvise_hdr), + lah_advise[num_advise]); + OBD_ALLOC(ladvise_hdr, alloc_size); + if (ladvise_hdr == NULL) + RETURN(-ENOMEM); + + /* + * TODO: submit multiple advices to one server in a single RPC + */ + if (copy_from_user(ladvise_hdr, + (const struct llapi_ladvise_hdr __user *)arg, + alloc_size)) + GOTO(out_ladvise, rc = -EFAULT); + + for (i = 0; i < num_advise; i++) { + rc = ll_ladvise(inode, file, ladvise_hdr->lah_flags, + &ladvise_hdr->lah_advise[i]); + if (rc) + break; + } + +out_ladvise: + OBD_FREE(ladvise_hdr, alloc_size); + RETURN(rc); + } default: { int err; @@ -2579,7 +2765,7 @@ static loff_t generic_file_llseek_size(struct file *file, loff_t offset, int origin, loff_t maxsize, loff_t eof) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); switch (origin) { case SEEK_END: @@ -2599,9 +2785,9 @@ generic_file_llseek_size(struct file *file, loff_t offset, int origin, * SEEK_CURs. Note that parallel writes and reads behave * like SEEK_SET. 
*/ - mutex_lock(&inode->i_mutex); + inode_lock(inode); offset = llseek_execute(file, file->f_pos + offset, maxsize); - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return offset; case SEEK_DATA: /* @@ -2628,7 +2814,7 @@ generic_file_llseek_size(struct file *file, loff_t offset, int origin, static loff_t ll_file_seek(struct file *file, loff_t offset, int origin) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); loff_t retval, eof = 0; ENTRY; @@ -2653,7 +2839,7 @@ static loff_t ll_file_seek(struct file *file, loff_t offset, int origin) static int ll_flush(struct file *file, fl_owner_t id) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); struct ll_inode_info *lli = ll_i2info(inode); struct ll_file_data *fd = LUSTRE_FPRIVATE(file); int rc, err; @@ -2686,18 +2872,18 @@ static int ll_flush(struct file *file, fl_owner_t id) int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end, enum cl_fsync_mode mode, int ignore_layout) { - struct cl_env_nest nest; struct lu_env *env; struct cl_io *io; struct cl_fsync_io *fio; int result; + __u16 refcheck; ENTRY; if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL && mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL) RETURN(-EINVAL); - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) RETURN(PTR_ERR(env)); @@ -2720,25 +2906,25 @@ int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end, if (result == 0) result = fio->fi_nr_written; cl_io_fini(env, io); - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); RETURN(result); } /* - * When dentry is provided (the 'else' case), *file->f_path.dentry may be + * When dentry is provided (the 'else' case), file_dentry() may be * null and dentry must be used directly rather than pulled from - * *file->f_path.dentry as is done otherwise. + * file_dentry() as is done otherwise. 
*/ #ifdef HAVE_FILE_FSYNC_4ARGS int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync) { - struct dentry *dentry = file->f_path.dentry; + struct dentry *dentry = file_dentry(file); #elif defined(HAVE_FILE_FSYNC_2ARGS) int ll_fsync(struct file *file, int datasync) { - struct dentry *dentry = file->f_path.dentry; + struct dentry *dentry = file_dentry(file); loff_t start = 0; loff_t end = LLONG_MAX; #else @@ -2759,7 +2945,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) #ifdef HAVE_FILE_FSYNC_4ARGS rc = filemap_write_and_wait_range(inode->i_mapping, start, end); - mutex_lock(&inode->i_mutex); + inode_lock(inode); #else /* fsync's caller has already called _fdata{sync,write}, we want * that IO to finish before calling the osc and mdc sync methods */ @@ -2797,7 +2983,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) } #ifdef HAVE_FILE_FSYNC_4ARGS - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); #endif RETURN(rc); } @@ -2805,7 +2991,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) static int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) { - struct inode *inode = file->f_path.dentry->d_inode; + struct inode *inode = file_inode(file); struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK, @@ -2907,8 +3093,8 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - CDEBUG(D_DLMTRACE, "inode="DFID", pid=%u, flags="LPX64", mode=%u, " - "start="LPU64", end="LPU64"\n", PFID(ll_inode2fid(inode)), + CDEBUG(D_DLMTRACE, "inode="DFID", pid=%u, flags=%#llx, mode=%u, " + "start=%llu, end=%llu\n", PFID(ll_inode2fid(inode)), flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end); @@ -2919,6 +3105,11 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) if (!(flags & LDLM_FL_TEST_LOCK)) file_lock->fl_type = 
fl_type; +#ifdef HAVE_LOCKS_LOCK_FILE_WAIT + if ((rc == 0 || file_lock->fl_type == F_UNLCK) && + !(flags & LDLM_FL_TEST_LOCK)) + rc2 = locks_lock_file_wait(file, file_lock); +#else if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0 || file_lock->fl_type == F_UNLCK)) rc2 = flock_lock_file_wait(file, file_lock); @@ -2926,6 +3117,7 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) (rc == 0 || file_lock->fl_type == F_UNLCK) && !(flags & LDLM_FL_TEST_LOCK)) rc2 = posix_lock_file_wait(file, file_lock); +#endif /* HAVE_LOCKS_LOCK_FILE_WAIT */ if (rc2 && file_lock->fl_type != F_UNLCK) { einfo.ei_mode = LCK_NL; @@ -2999,7 +3191,7 @@ int ll_migrate(struct inode *parent, struct file *file, int mdtidx, qstr.hash = full_name_hash(name, namelen); qstr.name = name; qstr.len = namelen; - dchild = d_lookup(file->f_path.dentry, &qstr); + dchild = d_lookup(file_dentry(file), &qstr); if (dchild != NULL) { if (dchild->d_inode != NULL) child_inode = igrab(dchild->d_inode); @@ -3016,7 +3208,15 @@ int ll_migrate(struct inode *parent, struct file *file, int mdtidx, if (child_inode == NULL) GOTO(out_free, rc = -EINVAL); - mutex_lock(&child_inode->i_mutex); + /* + * lfs migrate command needs to be blocked on the client + * by checking the migrate FID against the FID of the + * filesystem root. + */ + if (child_inode == parent->i_sb->s_root->d_inode) + GOTO(out_iput, rc = -EINVAL); + + inode_lock(child_inode); op_data->op_fid3 = *ll_inode2fid(child_inode); if (!fid_is_sane(&op_data->op_fid3)) { CERROR("%s: migrate %s, but FID "DFID" is insane\n", @@ -3094,7 +3294,8 @@ out_close: if (rc == 0) clear_nlink(child_inode); out_unlock: - mutex_unlock(&child_inode->i_mutex); + inode_unlock(child_inode); +out_iput: iput(child_inode); out_free: ll_finish_md_op_data(op_data); @@ -3255,8 +3456,11 @@ static int __ll_inode_revalidate(struct dentry *dentry, __u64 ibits) do_lookup() -> ll_revalidate_it(). We cannot use d_drop here to preserve get_cwd functionality on 2.6. 
Bug 10503 */ - if (!dentry->d_inode->i_nlink) + if (!dentry->d_inode->i_nlink) { + ll_lock_dcache(inode); d_lustre_invalidate(dentry, 0); + ll_unlock_dcache(inode); + } ll_lookup_finish_locks(&oit, dentry); } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) { @@ -3366,6 +3570,8 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat) if (res) return res; + OBD_FAIL_TIMEOUT(OBD_FAIL_GETATTR_DELAY, 30); + stat->dev = inode->i_sb->s_dev; if (ll_need_32bit_api(sbi)) stat->ino = cl_fid_build_ino(&lli->lli_fid, 1); @@ -3540,12 +3746,7 @@ int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd) } ll_stats_ops_tally(sbi, LPROC_LL_INODE_PERM, 1); - - if (sbi->ll_flags & LL_SBI_RMT_CLIENT) - rc = lustre_check_remote_perm(inode, mask); - else - rc = ll_generic_permission(inode, mask, flags, ll_check_acl); - + rc = ll_generic_permission(inode, mask, flags, ll_check_acl); /* restore current process's credentials and FS capability */ if (squash_id) { revert_creds(old_cred); @@ -3752,15 +3953,15 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf) { struct ll_inode_info *lli = ll_i2info(inode); struct cl_object *obj = lli->lli_clob; - struct cl_env_nest nest; struct lu_env *env; int rc; + __u16 refcheck; ENTRY; if (obj == NULL) RETURN(0); - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) RETURN(PTR_ERR(env)); @@ -3795,7 +3996,7 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf) } out: - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); RETURN(rc); } @@ -3859,7 +4060,7 @@ static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock) } unlock_res_and_lock(lock); - if (lvbdata != NULL) + if (lvbdata) OBD_FREE_LARGE(lvbdata, lmmsize); EXIT; @@ -3895,7 +4096,7 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, enum ldlm_mode mode, PFID(&lli->lli_fid), inode); /* in case this is a caching lock and reinstate with 
new inode */ - md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL); + md_set_lock_data(sbi->ll_md_exp, lockh, inode, NULL); lock_res_and_lock(lock); lvb_ready = ldlm_is_lvb_ready(lock); @@ -3997,14 +4198,14 @@ again: PFID(&lli->lli_fid), inode); rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL, &it, op_data, &lockh, 0); - if (it.d.lustre.it_data != NULL) - ptlrpc_req_finished(it.d.lustre.it_data); - it.d.lustre.it_data = NULL; + if (it.it_request != NULL) + ptlrpc_req_finished(it.it_request); + it.it_request = NULL; ll_finish_md_op_data(op_data); - mode = it.d.lustre.it_lock_mode; - it.d.lustre.it_lock_mode = 0; + mode = it.it_lock_mode; + it.it_lock_mode = 0; ll_intent_drop_lock(&it); if (rc == 0) {