X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Ffile.c;h=cb325704f1f990fb74ed6eb475f9d8df88312699;hp=cfa7f032ec83c39a30e1627cd6bd34546ec04b7d;hb=4502a37ce21a25da5b1f0cf717ee94352b9def1b;hpb=b82cd91778d84914f63cbaf501f1a637ed8f3d86 diff --git a/lustre/llite/file.c b/lustre/llite/file.c index cfa7f03..cb32570 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -55,7 +55,7 @@ struct ll_file_data *ll_file_data_get(void) { struct ll_file_data *fd; - OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab); + OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO); return fd; } @@ -77,10 +77,15 @@ void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data, op_data->op_attr_blocks = inode->i_blocks; ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags; op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch; - memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle)); + if (fh) + op_data->op_handle = *fh; op_data->op_capa1 = ll_mdscapa_get(inode); } +/** + * Closes the IO epoch and packs all the attributes into @op_data for + * the CLOSE rpc. + */ static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data, struct obd_client_handle *och) { @@ -92,14 +97,15 @@ static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data, if (!(och->och_flags & FMODE_WRITE)) goto out; - if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) || - !S_ISREG(inode->i_mode)) + if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode)) op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS; else - ll_epoch_close(inode, op_data, &och, 0); + ll_ioepoch_close(inode, op_data, &och, 0); out: ll_pack_inode2opdata(inode, op_data, &och->och_fh); + ll_prep_md_op_data(op_data, inode, NULL, NULL, + 0, 0, LUSTRE_OPC_ANY, NULL); EXIT; } @@ -112,7 +118,7 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp, struct ptlrpc_request *req = NULL; struct obd_device *obd = class_exp2obd(exp); int epoch_close = 1; - int seq_end = 0, rc; + int rc; ENTRY; if (obd == NULL) { @@ -125,14 +131,6 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp, GOTO(out, rc = 0); } - /* - * here we check if this is forced umount. If so this is called on - * canceling "open lock" and we do not call md_close() in this case, as - * it will not be successful, as import is already deactivated. - */ - if (obd->obd_force) - GOTO(out, rc = 0); - OBD_ALLOC_PTR(op_data); if (op_data == NULL) GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here. @@ -140,17 +138,12 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp, ll_prepare_close(inode, op_data, och); epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE); rc = md_close(md_exp, op_data, och->och_mod, &req); - if (rc != -EAGAIN) - seq_end = 1; - if (rc == -EAGAIN) { /* This close must have the epoch closed. */ - LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM); LASSERT(epoch_close); /* MDS has instructed us to obtain Size-on-MDS attribute from * OSTs and send setattr to back to MDS. */ - rc = ll_sizeonmds_update(inode, och->och_mod, - &och->och_fh, op_data->op_ioepoch); + rc = ll_som_update(inode, op_data); if (rc) { CERROR("inode %lu mdc Size-on-MDS update failed: " "rc = %d\n", inode->i_ino, rc); @@ -172,12 +165,10 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp, EXIT; out: - if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close && + if (exp_connect_som(exp) && !epoch_close && S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) { ll_queue_done_writing(inode, LLIF_DONE_WRITING); } else { - if (seq_end) - ptlrpc_close_replay_seq(req); md_clear_open_replay_data(md_exp, och); /* Free @och if it is not waiting for DONE_WRITING. */ och->och_fh.cookie = DEAD_HANDLE_MAGIC; @@ -237,14 +228,8 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode, ENTRY; /* clear group lock, if present */ - if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) { -#if 0 /* XXX */ - struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK); - rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, - &fd->fd_cwlockh); -#endif - } + if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) + ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid); /* Let's see if we have good enough OPEN lock on the file and if we can skip talking to MDS */ @@ -344,6 +329,10 @@ int ll_file_release(struct inode *inode, struct file *file) lli->lli_async_rc = 0; rc = ll_md_close(sbi->ll_md_exp, inode, file); + + if (OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, obd_fail_val)) + libcfs_debug_dumplog(); + RETURN(rc); } @@ -400,12 +389,12 @@ static int ll_intent_file_open(struct file *file, void *lmm, GOTO(out, rc); } - if (itp->d.lustre.it_lock_mode) + rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL); + if (!rc && itp->d.lustre.it_lock_mode) md_set_lock_data(sbi->ll_md_exp, &itp->d.lustre.it_lock_handle, - file->f_dentry->d_inode); + file->f_dentry->d_inode, NULL); - rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL); out: ptlrpc_req_finished(itp->d.lustre.it_data); it_clear_disposition(itp, DISP_ENQ_COMPLETE); @@ -414,6 +403,20 @@ out: RETURN(rc); } +/** + * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does + * not believe attributes if a few ioepoch holders exist. Attributes for + * previous ioepoch if new one is opened are also skipped by MDS. + */ +void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch) +{ + if (ioepoch && lli->lli_ioepoch != ioepoch) { + lli->lli_ioepoch = ioepoch; + CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n", + ioepoch, PFID(&lli->lli_fid)); + } +} + static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli, struct lookup_intent *it, struct obd_client_handle *och) { @@ -429,7 +432,7 @@ static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli, och->och_magic = OBD_CLIENT_HANDLE_MAGIC; och->och_fid = lli->lli_fid; och->och_flags = it->it_flags; - lli->lli_ioepoch = body->ioepoch; + ll_ioepoch_open(lli, body->ioepoch); return md_set_open_replay_data(md_exp, och, req); } @@ -511,29 +514,12 @@ int ll_file_open(struct inode *inode, struct file *file) fd->fd_file = file; if (S_ISDIR(inode->i_mode)) { -again: spin_lock(&lli->lli_lock); if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) { LASSERT(lli->lli_sai == NULL); lli->lli_opendir_key = fd; lli->lli_opendir_pid = cfs_curproc_pid(); opendir_set = 1; - } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid() && - lli->lli_opendir_key != NULL)) { - /* Two cases for this: - * (1) The same process open such directory many times. - * (2) The old process opened the directory, and exited - * before its children processes. Then new process - * with the same pid opens such directory before the - * old process's children processes exit. - * reset stat ahead for such cases. */ - spin_unlock(&lli->lli_lock); - CDEBUG(D_INFO, "Conflict statahead for %.*s "DFID - " reset it.\n", file->f_dentry->d_name.len, - file->f_dentry->d_name.name, - PFID(&lli->lli_fid)); - ll_stop_statahead(inode, lli->lli_opendir_key); - goto again; } spin_unlock(&lli->lli_lock); } @@ -563,6 +549,12 @@ again: * already? XXX - NFS implications? */ oit.it_flags &= ~O_EXCL; + /* bug20584, if "it_flags" contains O_CREAT, the file will be + * created if necessary, then "IT_CREAT" should be set to keep + * consistent with it */ + if (oit.it_flags & O_CREAT) + oit.it_op |= IT_CREAT; + it = &oit; } @@ -612,9 +604,9 @@ restart: would attempt to grab och_sem as well, that would result in a deadlock */ up(&lli->lli_och_sem); - it->it_flags |= O_CHECK_STALE; + it->it_create_mode |= M_CHECK_STALE; rc = ll_intent_file_open(file, NULL, 0, it); - it->it_flags &= ~O_CHECK_STALE; + it->it_create_mode &= ~M_CHECK_STALE; if (rc) { ll_file_data_put(fd); GOTO(out_openerr, rc); @@ -625,9 +617,6 @@ restart: req = it->d.lustre.it_data; ptlrpc_req_finished(req); } - md_set_lock_data(ll_i2sbi(inode)->ll_md_exp, - &it->d.lustre.it_lock_handle, - file->f_dentry->d_inode); goto restart; } OBD_ALLOC(*och_p, sizeof (struct obd_client_handle)); @@ -698,7 +687,8 @@ out_openerr: /* Fills the obdo with the attributes for the lsm */ static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp, - struct obd_capa *capa, struct obdo *obdo) + struct obd_capa *capa, struct obdo *obdo, + __u64 ioepoch, int sync) { struct ptlrpc_request_set *set; struct obd_info oinfo = { { { 0 } } }; @@ -713,12 +703,17 @@ static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp, oinfo.oi_oa->o_id = lsm->lsm_object_id; oinfo.oi_oa->o_gr = lsm->lsm_object_gr; oinfo.oi_oa->o_mode = S_IFREG; + oinfo.oi_oa->o_ioepoch = ioepoch; oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | - OBD_MD_FLGROUP; + OBD_MD_FLGROUP | OBD_MD_FLEPOCH; oinfo.oi_capa = capa; + if (sync) { + oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS; + oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK; + } set = ptlrpc_prep_set(); if (set == NULL) { @@ -737,15 +732,20 @@ static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp, RETURN(rc); } -/* Fills the obdo with the attributes for the inode defined by lsm */ -int ll_inode_getattr(struct inode *inode, struct obdo *obdo) +/** + * Performs the getattr on the inode and updates its fields. + * If @sync != 0, perform the getattr under the server-side lock. + */ +int ll_inode_getattr(struct inode *inode, struct obdo *obdo, + __u64 ioepoch, int sync) { struct ll_inode_info *lli = ll_i2info(inode); struct obd_capa *capa = ll_mdscapa_get(inode); int rc; ENTRY; - rc = ll_lsm_getattr(lli->lli_smd, ll_i2dtexp(inode), capa, obdo); + rc = ll_lsm_getattr(lli->lli_smd, ll_i2dtexp(inode), + capa, obdo, ioepoch, sync); capa_put(capa); if (rc == 0) { obdo_refresh_inode(inode, obdo, obdo->o_valid); @@ -787,7 +787,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, struct obdo obdo = { 0 }; int rc; - rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo); + rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0); if (rc == 0) { st->st_size = obdo.o_size; st->st_blocks = obdo.o_blocks; @@ -800,25 +800,24 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, void ll_io_init(struct cl_io *io, const struct file *file, int write) { - struct inode *inode = file->f_dentry->d_inode; - struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct inode *inode = file->f_dentry->d_inode; - LASSERT(fd != NULL); memset(io, 0, sizeof *io); io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK; if (write) - io->u.ci_wr.wr_append = file->f_flags & O_APPEND; + io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND); io->ci_obj = ll_i2info(inode)->lli_clob; io->ci_lockreq = CILR_MAYBE; - if (fd->fd_flags & LL_FILE_IGNORE_LOCK || sbi->ll_flags & LL_SBI_NOLCK) + if (ll_file_nolock(file)) { io->ci_lockreq = CILR_NEVER; - else if (file->f_flags & O_APPEND) + io->ci_no_srvlock = 1; + } else if (file->f_flags & O_APPEND) { io->ci_lockreq = CILR_MANDATORY; + } } static ssize_t ll_file_io_generic(const struct lu_env *env, - struct ccc_io_args *args, struct file *file, + struct vvp_io_args *args, struct file *file, enum cl_io_type iot, loff_t *ppos, size_t count) { struct cl_io *io; @@ -828,27 +827,48 @@ static ssize_t ll_file_io_generic(const struct lu_env *env, io = &ccc_env_info(env)->cti_io; ll_io_init(io, file, iot == CIT_WRITE); - if (iot == CIT_READ) - io->u.ci_rd.rd_is_sendfile = args->cia_is_sendfile; - if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) { struct vvp_io *vio = vvp_env_io(env); struct ccc_io *cio = ccc_env_io(env); - if (cl_io_is_sendfile(io)) { - vio->u.read.cui_actor = args->cia_actor; - vio->u.read.cui_target = args->cia_target; - } else { - cio->cui_iov = args->cia_iov; - cio->cui_nrsegs = args->cia_nrsegs; + struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode); + int write_sem_locked = 0; + + cio->cui_fd = LUSTRE_FPRIVATE(file); + vio->cui_io_subtype = args->via_io_subtype; + + switch (vio->cui_io_subtype) { + case IO_NORMAL: + cio->cui_iov = args->u.normal.via_iov; + cio->cui_nrsegs = args->u.normal.via_nrsegs; #ifndef HAVE_FILE_WRITEV - cio->cui_iocb = args->cia_iocb; + cio->cui_iocb = args->u.normal.via_iocb; #endif + if ((iot == CIT_WRITE) && + !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) { + down(&lli->lli_write_sem); + write_sem_locked = 1; + } + break; + case IO_SENDFILE: + vio->u.sendfile.cui_actor = args->u.sendfile.via_actor; + vio->u.sendfile.cui_target = args->u.sendfile.via_target; + break; + case IO_SPLICE: + vio->u.splice.cui_pipe = args->u.splice.via_pipe; + vio->u.splice.cui_flags = args->u.splice.via_flags; + break; + default: + CERROR("Unknow IO type - %u\n", vio->cui_io_subtype); + LBUG(); } - cio->cui_fd = LUSTRE_FPRIVATE(file); result = cl_io_loop(env, io); - } else + if (write_sem_locked) + up(&lli->lli_write_sem); + } else { /* cl_io_rw_init() handled IO */ result = io->ci_result; + } + if (io->ci_nob > 0) { result = io->ci_nob; *ppos = io->u.ci_wr.wr.crw_pos; @@ -894,7 +914,7 @@ static ssize_t ll_file_readv(struct file *file, const struct iovec *iov, unsigned long nr_segs, loff_t *ppos) { struct lu_env *env; - struct ccc_io_args *args; + struct vvp_io_args *args; size_t count; ssize_t result; int refcheck; @@ -908,10 +928,10 @@ static ssize_t ll_file_readv(struct file *file, const struct iovec *iov, if (IS_ERR(env)) RETURN(PTR_ERR(env)); - args = &vvp_env_info(env)->vti_args; - args->cia_is_sendfile = 0; - args->cia_iov = (struct iovec *)iov; - args->cia_nrsegs = nr_segs; + args = vvp_env_args(env, IO_NORMAL); + args->u.normal.via_iov = (struct iovec *)iov; + args->u.normal.via_nrsegs = nr_segs; + result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count); cl_env_put(env, &refcheck); RETURN(result); @@ -943,7 +963,7 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) { struct lu_env *env; - struct ccc_io_args *args; + struct vvp_io_args *args; size_t count; ssize_t result; int refcheck; @@ -957,11 +977,11 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov, if (IS_ERR(env)) RETURN(PTR_ERR(env)); - args = &vvp_env_info(env)->vti_args; - args->cia_is_sendfile = 0; - args->cia_iov = (struct iovec *)iov; - args->cia_nrsegs = nr_segs; - args->cia_iocb = iocb; + args = vvp_env_args(env, IO_NORMAL); + args->u.normal.via_iov = (struct iovec *)iov; + args->u.normal.via_nrsegs = nr_segs; + args->u.normal.via_iocb = iocb; + result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ, &iocb->ki_pos, count); cl_env_put(env, &refcheck); @@ -1006,7 +1026,7 @@ static ssize_t ll_file_writev(struct file *file, const struct iovec *iov, unsigned long nr_segs, loff_t *ppos) { struct lu_env *env; - struct ccc_io_args *args; + struct vvp_io_args *args; size_t count; ssize_t result; int refcheck; @@ -1020,9 +1040,10 @@ static ssize_t ll_file_writev(struct file *file, const struct iovec *iov, if (IS_ERR(env)) RETURN(PTR_ERR(env)); - args = &vvp_env_info(env)->vti_args; - args->cia_iov = (struct iovec *)iov; - args->cia_nrsegs = nr_segs; + args = vvp_env_args(env, IO_NORMAL); + args->u.normal.via_iov = (struct iovec *)iov; + args->u.normal.via_nrsegs = nr_segs; + result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count); cl_env_put(env, &refcheck); RETURN(result); @@ -1055,7 +1076,7 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) { struct lu_env *env; - struct ccc_io_args *args; + struct vvp_io_args *args; size_t count; ssize_t result; int refcheck; @@ -1069,10 +1090,11 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov, if (IS_ERR(env)) RETURN(PTR_ERR(env)); - args = &vvp_env_info(env)->vti_args; - args->cia_iov = (struct iovec *)iov; - args->cia_nrsegs = nr_segs; - args->cia_iocb = iocb; + args = vvp_env_args(env, IO_NORMAL); + args->u.normal.via_iov = (struct iovec *)iov; + args->u.normal.via_nrsegs = nr_segs; + args->u.normal.via_iocb = iocb; + result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE, &iocb->ki_pos, count); cl_env_put(env, &refcheck); @@ -1110,6 +1132,7 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count, #endif +#ifdef HAVE_KERNEL_SENDFILE /* * Send file content (through pagecache) somewhere with helper */ @@ -1117,7 +1140,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count, read_actor_t actor, void *target) { struct lu_env *env; - struct ccc_io_args *args; + struct vvp_io_args *args; ssize_t result; int refcheck; ENTRY; @@ -1126,14 +1149,43 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count, if (IS_ERR(env)) RETURN(PTR_ERR(env)); - args = &vvp_env_info(env)->vti_args; - args->cia_is_sendfile = 1; - args->cia_target = target; - args->cia_actor = actor; + args = vvp_env_args(env, IO_SENDFILE); + args->u.sendfile.via_target = target; + args->u.sendfile.via_actor = actor; + result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count); cl_env_put(env, &refcheck); RETURN(result); } +#endif + +#ifdef HAVE_KERNEL_SPLICE_READ +/* + * Send file content (through pagecache) somewhere with helper + */ +static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos, + struct pipe_inode_info *pipe, size_t count, + unsigned int flags) +{ + struct lu_env *env; + struct vvp_io_args *args; + ssize_t result; + int refcheck; + ENTRY; + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + args = vvp_env_args(env, IO_SPLICE); + args->u.splice.via_pipe = pipe; + args->u.splice.via_flags = flags; + + result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count); + cl_env_put(env, &refcheck); + RETURN(result); +} +#endif static int ll_lov_recreate_obj(struct inode *inode, struct file *file, unsigned long arg) @@ -1266,8 +1318,7 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, LASSERT(lmm != NULL); if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) && - (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) && - (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) { + (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) { GOTO(out, rc = -EPROTO); } @@ -1291,62 +1342,9 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, lustre_swab_lov_user_md_objects( ((struct lov_user_md_v3 *)lmm)->lmm_objects, ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count); - } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) { - lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm); } } - if (lmm->lmm_magic == LOV_MAGIC_JOIN) { - struct lov_stripe_md *lsm; - struct lov_user_md_join *lmj; - int lmj_size, i, aindex = 0; - - rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize); - if (rc < 0) - GOTO(out, rc = -ENOMEM); - rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm); - if (rc) - GOTO(out_free_memmd, rc); - - lmj_size = sizeof(struct lov_user_md_join) + - lsm->lsm_stripe_count * - sizeof(struct lov_user_ost_data_join); - OBD_ALLOC(lmj, lmj_size); - if (!lmj) - GOTO(out_free_memmd, rc = -ENOMEM); - - memcpy(lmj, lmm, sizeof(struct lov_user_md_join)); - for (i = 0; i < lsm->lsm_stripe_count; i++) { - struct lov_extent *lex = - &lsm->lsm_array->lai_ext_array[aindex]; - - if (lex->le_loi_idx + lex->le_stripe_count <= i) - aindex ++; - CDEBUG(D_INFO, "aindex %d i %d l_extent_start " - LPU64" len %d\n", aindex, i, - lex->le_start, (int)lex->le_len); - lmj->lmm_objects[i].l_extent_start = - lex->le_start; - - if ((int)lex->le_len == -1) - lmj->lmm_objects[i].l_extent_end = -1; - else - lmj->lmm_objects[i].l_extent_end = - lex->le_start + lex->le_len; - lmj->lmm_objects[i].l_object_id = - lsm->lsm_oinfo[i]->loi_id; - lmj->lmm_objects[i].l_object_gr = - lsm->lsm_oinfo[i]->loi_gr; - lmj->lmm_objects[i].l_ost_gen = - lsm->lsm_oinfo[i]->loi_ost_gen; - lmj->lmm_objects[i].l_ost_idx = - lsm->lsm_oinfo[i]->loi_ost_idx; - } - lmm = (struct lov_mds_md *)lmj; - lmmsize = lmj_size; -out_free_memmd: - obd_free_memmd(sbi->ll_dt_exp, &lsm); - } out: *lmmp = lmm; *lmm_size = lmmsize; @@ -1426,192 +1424,79 @@ static int ll_lov_getstripe(struct inode *inode, unsigned long arg) (void *)arg); } -static int ll_get_grouplock(struct inode *inode, struct file *file, - unsigned long arg) +int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg) { - /* XXX */ - return -ENOSYS; -} + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ccc_grouplock grouplock; + int rc; + ENTRY; -static int ll_put_grouplock(struct inode *inode, struct file *file, - unsigned long arg) -{ - /* XXX */ - return -ENOSYS; -} + if (ll_file_nolock(file)) + RETURN(-EOPNOTSUPP); -#if LUSTRE_FIX >= 50 -static int join_sanity_check(struct inode *head, struct inode *tail) -{ - ENTRY; - if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) { - CERROR("server do not support join \n"); - RETURN(-EINVAL); - } - if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) { - CERROR("tail ino %lu and ino head %lu must be regular\n", - head->i_ino, tail->i_ino); - RETURN(-EINVAL); - } - if (head->i_ino == tail->i_ino) { - CERROR("file %lu can not be joined to itself \n", head->i_ino); + spin_lock(&lli->lli_lock); + if (fd->fd_flags & LL_FILE_GROUP_LOCKED) { + CWARN("group lock already existed with gid %lu\n", + fd->fd_grouplock.cg_gid); + spin_unlock(&lli->lli_lock); RETURN(-EINVAL); } - if (i_size_read(head) % JOIN_FILE_ALIGN) { - CERROR("hsize %llu must be times of 64K\n", i_size_read(head)); + LASSERT(fd->fd_grouplock.cg_lock == NULL); + spin_unlock(&lli->lli_lock); + + rc = cl_get_grouplock(cl_i2info(inode)->lli_clob, + arg, (file->f_flags & O_NONBLOCK), &grouplock); + if (rc) + RETURN(rc); + + spin_lock(&lli->lli_lock); + if (fd->fd_flags & LL_FILE_GROUP_LOCKED) { + spin_unlock(&lli->lli_lock); + CERROR("another thread just won the race\n"); + cl_put_grouplock(&grouplock); RETURN(-EINVAL); } + + fd->fd_flags |= LL_FILE_GROUP_LOCKED; + fd->fd_grouplock = grouplock; + spin_unlock(&lli->lli_lock); + + CDEBUG(D_INFO, "group lock %lu obtained\n", arg); RETURN(0); } -static int join_file(struct inode *head_inode, struct file *head_filp, - struct file *tail_filp) +int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg) { - struct dentry *tail_dentry = tail_filp->f_dentry; - struct lookup_intent oit = {.it_op = IT_OPEN, - .it_flags = head_filp->f_flags|O_JOIN_FILE}; - struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW, - ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL }; - - struct lustre_handle lockh; - struct md_op_data *op_data; - int rc; - loff_t data; + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ccc_grouplock grouplock; ENTRY; - tail_dentry = tail_filp->f_dentry; - - data = i_size_read(head_inode); - op_data = ll_prep_md_op_data(NULL, head_inode, - tail_dentry->d_parent->d_inode, - tail_dentry->d_name.name, - tail_dentry->d_name.len, 0, - LUSTRE_OPC_ANY, &data); - if (IS_ERR(op_data)) - RETURN(PTR_ERR(op_data)); - - rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, - op_data, &lockh, NULL, 0, NULL, 0); - - ll_finish_md_op_data(op_data); - if (rc < 0) - GOTO(out, rc); - - rc = oit.d.lustre.it_status; - - if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) { - rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit); - ptlrpc_req_finished((struct ptlrpc_request *) - oit.d.lustre.it_data); - GOTO(out, rc); + spin_lock(&lli->lli_lock); + if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) { + spin_unlock(&lli->lli_lock); + CWARN("no group lock held\n"); + RETURN(-EINVAL); } + LASSERT(fd->fd_grouplock.cg_lock != NULL); - if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right - * away */ - ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode); - oit.d.lustre.it_lock_mode = 0; + if (fd->fd_grouplock.cg_gid != arg) { + CWARN("group lock %lu doesn't match current id %lu\n", + arg, fd->fd_grouplock.cg_gid); + spin_unlock(&lli->lli_lock); + RETURN(-EINVAL); } - ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data); - it_clear_disposition(&oit, DISP_ENQ_COMPLETE); - ll_release_openhandle(head_filp->f_dentry, &oit); -out: - ll_intent_release(&oit); - RETURN(rc); -} -static int ll_file_join(struct inode *head, struct file *filp, - char *filename_tail) -{ - struct inode *tail = NULL, *first = NULL, *second = NULL; - struct dentry *tail_dentry; - struct file *tail_filp, *first_filp, *second_filp; - struct ll_lock_tree first_tree, second_tree; - struct ll_lock_tree_node *first_node, *second_node; - struct ll_inode_info *hlli = ll_i2info(head), *tlli; - int rc = 0, cleanup_phase = 0; - ENTRY; - - CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n", - head->i_ino, head->i_generation, head, filename_tail); - - tail_filp = filp_open(filename_tail, O_WRONLY, 0644); - if (IS_ERR(tail_filp)) { - CERROR("Can not open tail file %s", filename_tail); - rc = PTR_ERR(tail_filp); - GOTO(cleanup, rc); - } - tail = igrab(tail_filp->f_dentry->d_inode); - - tlli = ll_i2info(tail); - tail_dentry = tail_filp->f_dentry; - LASSERT(tail_dentry); - cleanup_phase = 1; - - /*reorder the inode for lock sequence*/ - first = head->i_ino > tail->i_ino ? head : tail; - second = head->i_ino > tail->i_ino ? tail : head; - first_filp = head->i_ino > tail->i_ino ? filp : tail_filp; - second_filp = head->i_ino > tail->i_ino ? tail_filp : filp; - - CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n", - head->i_ino, tail->i_ino, first->i_ino, second->i_ino); - first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX); - if (IS_ERR(first_node)){ - rc = PTR_ERR(first_node); - GOTO(cleanup, rc); - } - first_tree.lt_fd = first_filp->private_data; - rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0); - if (rc != 0) - GOTO(cleanup, rc); - cleanup_phase = 2; - - second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX); - if (IS_ERR(second_node)){ - rc = PTR_ERR(second_node); - GOTO(cleanup, rc); - } - second_tree.lt_fd = second_filp->private_data; - rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0); - if (rc != 0) - GOTO(cleanup, rc); - cleanup_phase = 3; - - rc = join_sanity_check(head, tail); - if (rc) - GOTO(cleanup, rc); + grouplock = fd->fd_grouplock; + memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock)); + fd->fd_flags &= ~LL_FILE_GROUP_LOCKED; + spin_unlock(&lli->lli_lock); - rc = join_file(head, filp, tail_filp); - if (rc) - GOTO(cleanup, rc); -cleanup: - switch (cleanup_phase) { - case 3: - ll_tree_unlock(&second_tree); - obd_cancel_unused(ll_i2dtexp(second), - ll_i2info(second)->lli_smd, 0, NULL); - case 2: - ll_tree_unlock(&first_tree); - obd_cancel_unused(ll_i2dtexp(first), - ll_i2info(first)->lli_smd, 0, NULL); - case 1: - filp_close(tail_filp, 0); - if (tail) - iput(tail); - if (head && rc == 0) { - obd_free_memmd(ll_i2sbi(head)->ll_dt_exp, - &hlli->lli_smd); - hlli->lli_smd = NULL; - } - case 0: - break; - default: - CERROR("invalid cleanup_phase %d\n", cleanup_phase); - LBUG(); - } - RETURN(rc); + cl_put_grouplock(&grouplock); + CDEBUG(D_INFO, "group lock %lu released\n", arg); + RETURN(0); } -#endif /* LUSTRE_FIX >= 50 */ /** * Close inode open handle @@ -1662,7 +1547,7 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it) * Get size for inode for which FIEMAP mapping is requested. * Make the FIEMAP get_info call and returns the result. */ -int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap, +int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap, int num_bytes) { struct obd_export *exp = ll_i2dtexp(inode); @@ -1701,6 +1586,42 @@ int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap, RETURN(rc); } +int ll_fid2path(struct obd_export *exp, void *arg) +{ + struct getinfo_fid2path *gfout, *gfin; + int outsize, rc; + ENTRY; + + /* Need to get the buflen */ + OBD_ALLOC_PTR(gfin); + if (gfin == NULL) + RETURN(-ENOMEM); + if (copy_from_user(gfin, arg, sizeof(*gfin))) { + OBD_FREE_PTR(gfin); + RETURN(-EFAULT); + } + + outsize = sizeof(*gfout) + gfin->gf_pathlen; + OBD_ALLOC(gfout, outsize); + if (gfout == NULL) { + OBD_FREE_PTR(gfin); + RETURN(-ENOMEM); + } + memcpy(gfout, gfin, sizeof(*gfout)); + OBD_FREE_PTR(gfin); + + /* Call mdc_iocontrol */ + rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL); + if (rc) + GOTO(gf_free, rc); + if (copy_to_user(arg, gfout, outsize)) + rc = -EFAULT; + +gf_free: + OBD_FREE(gfout, outsize); + RETURN(rc); +} + int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd, unsigned long arg) { @@ -1750,7 +1671,7 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd, RETURN(ll_lov_getstripe(inode, arg)); case LL_IOC_RECREATE_OBJ: RETURN(ll_lov_recreate_obj(inode, file, arg)); - case EXT3_IOC_FIEMAP: { + case FSFILT_IOC_FIEMAP: { struct ll_user_fiemap *fiemap_s; size_t num_bytes, ret_bytes; unsigned int extent_count; @@ -1799,7 +1720,7 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd, GOTO(error, rc); } - rc = ll_fiemap(inode, fiemap_s, num_bytes); + rc = ll_do_fiemap(inode, fiemap_s, num_bytes); if (rc) GOTO(error, rc); @@ -1816,29 +1737,12 @@ error: OBD_VFREE(fiemap_s, num_bytes); RETURN(rc); } - case EXT3_IOC_GETFLAGS: - case EXT3_IOC_SETFLAGS: + case FSFILT_IOC_GETFLAGS: + case FSFILT_IOC_SETFLAGS: RETURN(ll_iocontrol(inode, file, cmd, arg)); - case EXT3_IOC_GETVERSION_OLD: - case EXT3_IOC_GETVERSION: + case FSFILT_IOC_GETVERSION_OLD: + case FSFILT_IOC_GETVERSION: RETURN(put_user(inode->i_generation, (int *)arg)); - case LL_IOC_JOIN: { -#if LUSTRE_FIX >= 50 - /* Allow file join in beta builds to allow debuggging */ - char *ftail; - int rc; - - ftail = getname((const char *)arg); - if (IS_ERR(ftail)) - RETURN(PTR_ERR(ftail)); - rc = ll_file_join(inode, file, ftail); - putname(ftail); - RETURN(rc); -#else - CWARN("file join is not supported in this version of Lustre\n"); - RETURN(-ENOTTY); -#endif - } case LL_IOC_GROUP_LOCK: RETURN(ll_get_grouplock(inode, file, arg)); case LL_IOC_GROUP_UNLOCK: @@ -1849,18 +1753,21 @@ error: /* We need to special case any other ioctls we want to handle, * to send them to the MDS/OST as appropriate and to properly * network encode the arg field. - case EXT3_IOC_SETVERSION_OLD: - case EXT3_IOC_SETVERSION: + case FSFILT_IOC_SETVERSION_OLD: + case FSFILT_IOC_SETVERSION: */ case LL_IOC_FLUSHCTX: RETURN(ll_flush_ctx(inode)); case LL_IOC_PATH2FID: { - if (copy_to_user((void *)arg, &ll_i2info(inode)->lli_fid, + if (copy_to_user((void *)arg, ll_inode2fid(inode), sizeof(struct lu_fid))) RETURN(-EFAULT); RETURN(0); } + case OBD_IOC_FID2PATH: + RETURN(ll_fid2path(ll_i2mdexp(inode), (void *)arg)); + default: { int err; @@ -2152,13 +2059,14 @@ static int ll_inode_revalidate_fini(struct inode *inode, int rc) { return 0; } -int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) +int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it, + __u64 ibits) { struct inode *inode = dentry->d_inode; struct ptlrpc_request *req = NULL; struct ll_sb_info *sbi; struct obd_export *exp; - int rc; + int rc = 0; ENTRY; if (!inode) { @@ -2183,14 +2091,14 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - oit.it_flags |= O_CHECK_STALE; + oit.it_create_mode |= M_CHECK_STALE; rc = md_intent_lock(exp, op_data, NULL, 0, /* we are not interested in name based lookup */ &oit, 0, &req, ll_md_blocking_ast, 0); ll_finish_md_op_data(op_data); - oit.it_flags &= ~O_CHECK_STALE; + oit.it_create_mode &= ~M_CHECK_STALE; if (rc < 0) { rc = ll_inode_revalidate_fini(inode, rc); GOTO (out, rc); @@ -2207,16 +2115,14 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) here to preserve get_cwd functionality on 2.6. Bug 10503 */ if (!dentry->d_inode->i_nlink) { - spin_lock(&ll_lookup_lock); spin_lock(&dcache_lock); ll_drop_dentry(dentry); spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); } - ll_lookup_finish_locks(&oit, dentry); - } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE | - MDS_INODELOCK_LOOKUP)) { + ll_finish_locks(&oit, dentry); + } else if (!ll_have_md_lock(dentry->d_inode, ibits)) { + struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode); obd_valid valid = OBD_MD_FLGETATTR; struct obd_capa *oc; @@ -2241,21 +2147,31 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) } rc = ll_prep_inode(&inode, req, NULL); - if (rc) - GOTO(out, rc); } +out: + ptlrpc_req_finished(req); + return rc; +} + +int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) +{ + int rc; + ENTRY; + + rc = __ll_inode_revalidate_it(dentry, it, MDS_INODELOCK_UPDATE | + MDS_INODELOCK_LOOKUP); /* if object not yet allocated, don't validate size */ - if (ll_i2info(inode)->lli_smd == NULL) - GOTO(out, rc = 0); + if (rc == 0 && ll_i2info(dentry->d_inode)->lli_smd == NULL) + RETURN(0); /* cl_glimpse_size will prefer locally cached writes if they extend * the file */ - rc = cl_glimpse_size(inode); - EXIT; -out: - ptlrpc_req_finished(req); - return rc; + + if (rc == 0) + rc = cl_glimpse_size(dentry->d_inode); + + RETURN(rc); } int ll_getattr_it(struct vfsmount *mnt, struct dentry *de, @@ -2300,6 +2216,26 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat) return ll_getattr_it(mnt, de, &it, stat); } +#ifdef HAVE_LINUX_FIEMAP_H +int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo, + __u64 start, __u64 len) +{ + int rc; + struct ll_user_fiemap *fiemap = (struct ll_user_fiemap*)( + fieinfo->fi_extents_start - sizeof(ll_user_fiemap)); + + rc = ll_do_fiemap(inode, fiemap, sizeof(*fiemap) + + fiemap->fm_extent_count * + sizeof(struct ll_fiemap_extent)); + + fieinfo->fi_flags = fiemap->fm_flags; + fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents; + + return rc; +} +#endif + + static int lustre_check_acl(struct inode *inode, int mask) { @@ -2326,15 +2262,37 @@ int lustre_check_acl(struct inode *inode, int mask) } #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10)) +#ifndef HAVE_INODE_PERMISION_2ARGS int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd) +#else +int ll_inode_permission(struct inode *inode, int mask) +#endif { - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n", - inode->i_ino, inode->i_generation, inode, mask); + int rc = 0; + ENTRY; + + /* as root inode are NOT getting validated in lookup operation, + * need to do it before permission check. */ + + if (inode == inode->i_sb->s_root->d_inode) { + struct lookup_intent it = { .it_op = IT_LOOKUP }; + + rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it, + MDS_INODELOCK_LOOKUP); + if (rc) + RETURN(rc); + } + + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n", + inode->i_ino, inode->i_generation, inode, inode->i_mode, mask); + if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT) return lustre_check_remote_perm(inode, mask); ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1); - return generic_permission(inode, mask, lustre_check_acl); + rc = generic_permission(inode, mask, lustre_check_acl); + + RETURN(rc); } #else int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd) @@ -2411,7 +2369,12 @@ struct file_operations ll_file_operations = { .release = ll_file_release, .mmap = ll_file_mmap, .llseek = ll_file_seek, +#ifdef HAVE_KERNEL_SENDFILE .sendfile = ll_file_sendfile, +#endif +#ifdef HAVE_KERNEL_SPLICE_READ + .splice_read = ll_file_splice_read, +#endif .fsync = ll_fsync, }; @@ -2425,7 +2388,12 @@ struct file_operations ll_file_operations_flock = { .release = ll_file_release, .mmap = ll_file_mmap, .llseek = ll_file_seek, +#ifdef HAVE_KERNEL_SENDFILE .sendfile = ll_file_sendfile, +#endif +#ifdef HAVE_KERNEL_SPLICE_READ + .splice_read = ll_file_splice_read, +#endif .fsync = ll_fsync, #ifdef HAVE_F_OP_FLOCK .flock = ll_file_flock, @@ -2444,7 +2412,12 @@ struct file_operations ll_file_operations_noflock = { .release = ll_file_release, .mmap = ll_file_mmap, .llseek = ll_file_seek, +#ifdef HAVE_KERNEL_SENDFILE .sendfile = ll_file_sendfile, +#endif +#ifdef HAVE_KERNEL_SPLICE_READ + .splice_read = ll_file_splice_read, +#endif .fsync = ll_fsync, #ifdef HAVE_F_OP_FLOCK .flock = ll_file_noflock, @@ -2464,6 +2437,9 @@ struct inode_operations ll_file_inode_operations = { .getxattr = ll_getxattr, .listxattr = ll_listxattr, .removexattr = ll_removexattr, +#ifdef HAVE_LINUX_FIEMAP_H + .fiemap = ll_fiemap, +#endif }; /* dynamic ioctl number support routins */