X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Ffile.c;h=87b30b4936f957913506a2f408c206cf9ae6ea16;hp=01530a5dfa4be9185d1f544e450f59db1ea35021;hb=2402980a0891e43668f4016e17f2ff872006e0fa;hpb=9a93221475f5fab91ef510d77ca5d88b47140f8e diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 01530a5..87b30b4 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel Corporation. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -54,8 +54,12 @@ struct ll_file_data *ll_file_data_get(void) { struct ll_file_data *fd; - OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO); + OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, __GFP_IO); + if (fd == NULL) + return NULL; + fd->fd_write_failed = false; + return fd; } @@ -95,8 +99,9 @@ static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data, { ENTRY; - op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET | - ATTR_MTIME_SET | ATTR_CTIME_SET; + op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET | + ATTR_MTIME | ATTR_MTIME_SET | + ATTR_CTIME | ATTR_CTIME_SET; if (!(och->och_flags & FMODE_WRITE)) goto out; @@ -246,6 +251,24 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode, if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid); + if (fd->fd_lease_och != NULL) { + bool lease_broken; + + /* Usually the lease is not released when the + * application crashed, we need to release here. */ + rc = ll_lease_close(fd->fd_lease_och, inode, &lease_broken); + CDEBUG(rc ? D_ERROR : D_INODE, "Clean up lease "DFID" %d/%d\n", + PFID(&lli->lli_fid), rc, lease_broken); + + fd->fd_lease_och = NULL; + } + + if (fd->fd_och != NULL) { + rc = ll_close_inode_openhandle(md_exp, inode, fd->fd_och); + fd->fd_och = NULL; + GOTO(out, rc); + } + /* Let's see if we have good enough OPEN lock on the file and if we can skip talking to MDS */ if (file->f_dentry->d_inode) { /* Can this ever be false? */ @@ -282,11 +305,12 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode, file, file->f_dentry, file->f_dentry->d_name.name); } - LUSTRE_FPRIVATE(file) = NULL; - ll_file_data_put(fd); - ll_capa_close(inode); +out: + LUSTRE_FPRIVATE(file) = NULL; + ll_file_data_put(fd); + ll_capa_close(inode); - RETURN(rc); + RETURN(rc); } /* While this returns an error code, fput() the caller does not, so we need @@ -355,8 +379,6 @@ static int ll_intent_file_open(struct file *file, void *lmm, { struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode); struct dentry *parent = file->f_dentry->d_parent; - const char *name = file->f_dentry->d_name.name; - const int len = file->f_dentry->d_name.len; struct md_op_data *op_data; struct ptlrpc_request *req; __u32 opc = LUSTRE_OPC_ANY; @@ -381,9 +403,10 @@ static int ll_intent_file_open(struct file *file, void *lmm, opc = LUSTRE_OPC_CREATE; } - op_data = ll_prep_md_op_data(NULL, parent->d_inode, - file->f_dentry->d_inode, name, len, - O_RDWR, opc, NULL); + op_data = ll_prep_md_op_data(NULL, parent->d_inode, + file->f_dentry->d_inode, NULL, 0, + O_RDWR, opc, NULL); + if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); @@ -438,24 +461,20 @@ void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch) } } -static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli, - struct lookup_intent *it, struct obd_client_handle *och) +static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it, + struct obd_client_handle *och) { - struct ptlrpc_request *req = it->d.lustre.it_data; - struct mdt_body *body; - - LASSERT(och); - - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); /* reply already checked out */ + struct ptlrpc_request *req = it->d.lustre.it_data; + struct mdt_body *body; - memcpy(&och->och_fh, &body->handle, sizeof(body->handle)); - och->och_magic = OBD_CLIENT_HANDLE_MAGIC; - och->och_fid = lli->lli_fid; - och->och_flags = it->it_flags; - ll_ioepoch_open(lli, body->ioepoch); + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + och->och_fh = body->handle; + och->och_fid = body->fid1; + och->och_lease_handle.cookie = it->d.lustre.it_lock_handle; + och->och_magic = OBD_CLIENT_HANDLE_MAGIC; + och->och_flags = it->it_flags; - return md_set_open_replay_data(md_exp, och, req); + return md_set_open_replay_data(md_exp, och, req); } int ll_local_open(struct file *file, struct lookup_intent *it, @@ -474,21 +493,19 @@ int ll_local_open(struct file *file, struct lookup_intent *it, struct mdt_body *body; int rc; - rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och); - if (rc) - RETURN(rc); + rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och); + if (rc != 0) + RETURN(rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - if ((it->it_flags & FMODE_WRITE) && - (body->valid & OBD_MD_FLSIZE)) - CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n", - lli->lli_ioepoch, PFID(&lli->lli_fid)); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + ll_ioepoch_open(lli, body->ioepoch); + } + + LUSTRE_FPRIVATE(file) = fd; + ll_readahead_init(inode, &fd->fd_ras); + fd->fd_omode = it->it_flags & (FMODE_READ | FMODE_WRITE | FMODE_EXEC); - LUSTRE_FPRIVATE(file) = fd; - ll_readahead_init(inode, &fd->fd_ras); - fd->fd_omode = it->it_flags; - RETURN(0); + RETURN(0); } /* Open a file, and (for the very first open) create objects on the OSTs at @@ -521,12 +538,12 @@ int ll_file_open(struct inode *inode, struct file *file) it = file->private_data; /* XXX: compat macro */ file->private_data = NULL; /* prevent ll_local_open assertion */ - fd = ll_file_data_get(); - if (fd == NULL) - GOTO(out_och_free, rc = -ENOMEM); + fd = ll_file_data_get(); + if (fd == NULL) + GOTO(out_openerr, rc = -ENOMEM); - fd->fd_file = file; - if (S_ISDIR(inode->i_mode)) { + fd->fd_file = file; + if (S_ISDIR(inode->i_mode)) { spin_lock(&lli->lli_sa_lock); if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL && lli->lli_opendir_pid == 0) { @@ -535,7 +552,7 @@ int ll_file_open(struct inode *inode, struct file *file) opendir_set = 1; } spin_unlock(&lli->lli_sa_lock); - } + } if (inode->i_sb->s_root == file->f_dentry) { LUSTRE_FPRIVATE(file) = fd; @@ -690,6 +707,199 @@ out_openerr: return rc; } +static int ll_md_blocking_lease_ast(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, void *data, int flag) +{ + int rc; + struct lustre_handle lockh; + ENTRY; + + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh, LCF_ASYNC); + if (rc < 0) { + CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc); + RETURN(rc); + } + break; + case LDLM_CB_CANCELING: + /* do nothing */ + break; + } + RETURN(0); +} + +/** + * Acquire a lease and open the file. + */ +struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file, + fmode_t fmode) +{ + struct lookup_intent it = { .it_op = IT_OPEN }; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct md_op_data *op_data; + struct ptlrpc_request *req; + struct lustre_handle old_handle = { 0 }; + struct obd_client_handle *och = NULL; + int rc; + int rc2; + ENTRY; + + if (fmode != FMODE_WRITE && fmode != FMODE_READ) + RETURN(ERR_PTR(-EINVAL)); + + if (file != NULL) { + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct obd_client_handle **och_p; + __u64 *och_usecount; + + if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC)) + RETURN(ERR_PTR(-EPERM)); + + /* Get the openhandle of the file */ + rc = -EBUSY; + mutex_lock(&lli->lli_och_mutex); + if (fd->fd_lease_och != NULL) { + mutex_unlock(&lli->lli_och_mutex); + RETURN(ERR_PTR(rc)); + } + + if (fd->fd_och == NULL) { + if (file->f_mode & FMODE_WRITE) { + LASSERT(lli->lli_mds_write_och != NULL); + och_p = &lli->lli_mds_write_och; + och_usecount = &lli->lli_open_fd_write_count; + } else { + LASSERT(lli->lli_mds_read_och != NULL); + och_p = &lli->lli_mds_read_och; + och_usecount = &lli->lli_open_fd_read_count; + } + if (*och_usecount == 1) { + fd->fd_och = *och_p; + *och_p = NULL; + *och_usecount = 0; + rc = 0; + } + } + mutex_unlock(&lli->lli_och_mutex); + if (rc < 0) /* more than 1 opener */ + RETURN(ERR_PTR(rc)); + + LASSERT(fd->fd_och != NULL); + old_handle = fd->fd_och->och_fh; + } + + OBD_ALLOC_PTR(och); + if (och == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0, + LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) + GOTO(out, rc = PTR_ERR(op_data)); + + /* To tell the MDT this openhandle is from the same owner */ + op_data->op_handle = old_handle; + + it.it_flags = fmode | MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE; + rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, &req, + ll_md_blocking_lease_ast, + /* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise + * it can be cancelled which may mislead applications that the lease is + * broken; + * LDLM_FL_EXCL: Set this flag so that it won't be matched by normal + * open in ll_md_blocking_ast(). Otherwise as ll_md_blocking_lease_ast + * doesn't deal with openhandle, so normal openhandle will be leaked. */ + LDLM_FL_NO_LRU | LDLM_FL_EXCL); + ll_finish_md_op_data(op_data); + if (req != NULL) { + ptlrpc_req_finished(req); + it_clear_disposition(&it, DISP_ENQ_COMPLETE); + } + if (rc < 0) + GOTO(out_release_it, rc); + + if (it_disposition(&it, DISP_LOOKUP_NEG)) + GOTO(out_release_it, rc = -ENOENT); + + rc = it_open_error(DISP_OPEN_OPEN, &it); + if (rc) + GOTO(out_release_it, rc); + + LASSERT(it_disposition(&it, DISP_ENQ_OPEN_REF)); + ll_och_fill(sbi->ll_md_exp, &it, och); + + if (!it_disposition(&it, DISP_OPEN_LEASE)) /* old server? */ + GOTO(out_close, rc = -EOPNOTSUPP); + + /* already get lease, handle lease lock */ + ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL); + if (it.d.lustre.it_lock_mode == 0 || + it.d.lustre.it_lock_bits != MDS_INODELOCK_OPEN) { + /* open lock must return for lease */ + CERROR(DFID "lease granted but no open lock, %d/%Lu.\n", + PFID(ll_inode2fid(inode)), it.d.lustre.it_lock_mode, + it.d.lustre.it_lock_bits); + GOTO(out_close, rc = -EPROTO); + } + + ll_intent_release(&it); + RETURN(och); + +out_close: + rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, inode, och); + if (rc2) + CERROR("Close openhandle returned %d\n", rc2); + + /* cancel open lock */ + if (it.d.lustre.it_lock_mode != 0) { + ldlm_lock_decref_and_cancel(&och->och_lease_handle, + it.d.lustre.it_lock_mode); + it.d.lustre.it_lock_mode = 0; + } +out_release_it: + ll_intent_release(&it); +out: + OBD_FREE_PTR(och); + RETURN(ERR_PTR(rc)); +} +EXPORT_SYMBOL(ll_lease_open); + +/** + * Release lease and close the file. + * It will check if the lease has ever broken. + */ +int ll_lease_close(struct obd_client_handle *och, struct inode *inode, + bool *lease_broken) +{ + struct ldlm_lock *lock; + bool cancelled = true; + int rc; + ENTRY; + + lock = ldlm_handle2lock(&och->och_lease_handle); + if (lock != NULL) { + lock_res_and_lock(lock); + cancelled = ldlm_is_cancel(lock); + unlock_res_and_lock(lock); + ldlm_lock_put(lock); + } + + CDEBUG(D_INODE, "lease for "DFID" broken? %d\n", + PFID(&ll_i2info(inode)->lli_fid), cancelled); + + if (!cancelled) + ldlm_cli_cancel(&och->och_lease_handle, 0); + if (lease_broken != NULL) + *lease_broken = cancelled; + + rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och); + RETURN(rc); +} +EXPORT_SYMBOL(ll_lease_close); + /* Fills the obdo with the attributes for the lsm */ static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp, struct obd_capa *capa, struct obdo *obdo, @@ -922,7 +1132,7 @@ out: cl_io_fini(env, io); /* If any bit been read/written (result != 0), we just return * short read/write instead of restart io. */ - if (result == 0 && io->ci_need_restart) { + if ((result == 0 || result == -ENODATA) && io->ci_need_restart) { CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n", iot == CIT_READ ? "read" : "write", file->f_dentry->d_name.name, *ppos, count); @@ -1201,35 +1411,6 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count, } #endif - -#ifdef HAVE_KERNEL_SENDFILE -/* - * Send file content (through pagecache) somewhere with helper - */ -static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count, - read_actor_t actor, void *target) -{ - struct lu_env *env; - struct vvp_io_args *args; - ssize_t result; - int refcheck; - ENTRY; - - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); - - args = vvp_env_args(env, IO_SENDFILE); - args->u.sendfile.via_target = target; - args->u.sendfile.via_actor = actor; - - result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count); - cl_env_put(env, &refcheck); - RETURN(result); -} -#endif - -#ifdef HAVE_KERNEL_SPLICE_READ /* * Send file content (through pagecache) somewhere with helper */ @@ -1255,9 +1436,8 @@ static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos, cl_env_put(env, &refcheck); RETURN(result); } -#endif -static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq, +static int ll_lov_recreate(struct inode *inode, struct ost_id *oi, obd_count ost_idx) { struct obd_export *exp = ll_i2dtexp(inode); @@ -1273,7 +1453,7 @@ static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq, RETURN(-ENOMEM); lsm = ccc_inode_lsm_get(inode); - if (lsm == NULL) + if (!lsm_has_objects(lsm)) GOTO(out, rc = -ENOENT); lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) * @@ -1283,8 +1463,7 @@ static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq, if (lsm2 == NULL) GOTO(out, rc = -ENOMEM); - oa->o_id = id; - oa->o_seq = seq; + oa->o_oi = *oi; oa->o_nlink = ost_idx; oa->o_flags |= OBD_FL_RECREATE_OBJS; oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP; @@ -1307,6 +1486,7 @@ out: static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg) { struct ll_recreate_obj ucreat; + struct ost_id oi; ENTRY; if (!cfs_capable(CFS_CAP_SYS_ADMIN)) @@ -1316,14 +1496,15 @@ static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg) sizeof(ucreat))) RETURN(-EFAULT); - RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0, - ucreat.lrc_ost_idx)); + ostid_set_seq_mdt0(&oi); + ostid_set_id(&oi, ucreat.lrc_id); + RETURN(ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx)); } static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg) { struct lu_fid fid; - obd_id id; + struct ost_id oi; obd_count ost_idx; ENTRY; @@ -1333,9 +1514,9 @@ static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg) if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid))) RETURN(-EFAULT); - id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32); + fid_to_ostid(&fid, &oi); ost_idx = (fid_seq(&fid) >> 16) & 0xffff; - RETURN(ll_lov_recreate(inode, id, 0, ost_idx)); + RETURN(ll_lov_recreate(inode, &oi, ost_idx)); } int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file, @@ -1428,6 +1609,12 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, * passing it to userspace. */ if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) { + int stripe_count; + + stripe_count = le16_to_cpu(lmm->lmm_stripe_count); + if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_RELEASED) + stripe_count = 0; + /* if function called for directory - we should * avoid swab not existent lsm objects */ if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) { @@ -1435,13 +1622,13 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, if (S_ISREG(body->mode)) lustre_swab_lov_user_md_objects( ((struct lov_user_md_v1 *)lmm)->lmm_objects, - ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count); + stripe_count); } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) { lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm); if (S_ISREG(body->mode)) lustre_swab_lov_user_md_objects( ((struct lov_user_md_v3 *)lmm)->lmm_objects, - ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count); + stripe_count); } } @@ -1638,8 +1825,7 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it) if (!och) GOTO(out, rc = -ENOMEM); - ll_och_fill(ll_i2sbi(inode)->ll_md_exp, - ll_i2info(inode), it, och); + ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och); rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och); @@ -1827,20 +2013,18 @@ int ll_data_version(struct inode *inode, __u64 *data_version, /* If no stripe, we consider version is 0. */ lsm = ccc_inode_lsm_get(inode); - if (lsm == NULL) { + if (!lsm_has_objects(lsm)) { *data_version = 0; CDEBUG(D_INODE, "No object for inode\n"); - RETURN(0); + GOTO(out, rc = 0); } OBD_ALLOC_PTR(obdo); - if (obdo == NULL) { - ccc_inode_lsm_put(inode, lsm); - RETURN(-ENOMEM); - } + if (obdo == NULL) + GOTO(out, rc = -ENOMEM); rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock); - if (!rc) { + if (rc == 0) { if (!(obdo->o_valid & OBD_MD_FLDATAVERSION)) rc = -EOPNOTSUPP; else @@ -1848,8 +2032,9 @@ int ll_data_version(struct inode *inode, __u64 *data_version, } OBD_FREE_PTR(obdo); + EXIT; +out: ccc_inode_lsm_put(inode, lsm); - RETURN(rc); } @@ -1880,8 +2065,8 @@ static int ll_swap_layouts(struct file *file1, struct file *file2, if (!S_ISREG(llss->inode2->i_mode)) GOTO(free, rc = -EINVAL); - if (ll_permission(llss->inode1, MAY_WRITE, NULL) || - ll_permission(llss->inode2, MAY_WRITE, NULL)) + if (inode_permission(llss->inode1, MAY_WRITE) || + inode_permission(llss->inode2, MAY_WRITE)) GOTO(free, rc = -EPERM); if (llss->inode2->i_sb != llss->inode1->i_sb) @@ -1961,12 +2146,12 @@ static int ll_swap_layouts(struct file *file1, struct file *file2, rc = -ENOMEM; op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0, 0, LUSTRE_OPC_ANY, &msl); - if (op_data != NULL) { - rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, - ll_i2mdexp(llss->inode1), - sizeof(*op_data), op_data, NULL); - ll_finish_md_op_data(op_data); - } + if (IS_ERR(op_data)) + GOTO(free, rc = PTR_ERR(op_data)); + + rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1), + sizeof(*op_data), op_data, NULL); + ll_finish_md_op_data(op_data); putgl: if (gid != 0) { @@ -2063,7 +2248,7 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) struct file *file2; struct lustre_swap_layouts lsl; - if (cfs_copy_from_user(&lsl, (char *)arg, + if (copy_from_user(&lsl, (char *)arg, sizeof(struct lustre_swap_layouts))) RETURN(-EFAULT); @@ -2160,9 +2345,9 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, LUSTRE_OPC_ANY, hus); - if (op_data == NULL) { + if (IS_ERR(op_data)) { OBD_FREE_PTR(hus); - RETURN(-ENOMEM); + RETURN(PTR_ERR(op_data)); } rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data), @@ -2198,9 +2383,9 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, LUSTRE_OPC_ANY, hss); - if (op_data == NULL) { + if (IS_ERR(op_data)) { OBD_FREE_PTR(hss); - RETURN(-ENOMEM); + RETURN(PTR_ERR(op_data)); } rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data), @@ -2222,21 +2407,105 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg) op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, LUSTRE_OPC_ANY, hca); - if (op_data == NULL) { + if (IS_ERR(op_data)) { OBD_FREE_PTR(hca); - RETURN(-ENOMEM); + RETURN(PTR_ERR(op_data)); } rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data), op_data, NULL); - if (cfs_copy_to_user((char *)arg, hca, sizeof(*hca))) + if (copy_to_user((char *)arg, hca, sizeof(*hca))) rc = -EFAULT; ll_finish_md_op_data(op_data); OBD_FREE_PTR(hca); RETURN(rc); } + case LL_IOC_SET_LEASE: { + struct ll_inode_info *lli = ll_i2info(inode); + struct obd_client_handle *och = NULL; + bool lease_broken; + fmode_t mode = 0; + + switch (arg) { + case F_WRLCK: + if (!(file->f_mode & FMODE_WRITE)) + RETURN(-EPERM); + mode = FMODE_WRITE; + break; + case F_RDLCK: + if (!(file->f_mode & FMODE_READ)) + RETURN(-EPERM); + mode = FMODE_READ; + break; + case F_UNLCK: + mutex_lock(&lli->lli_och_mutex); + if (fd->fd_lease_och != NULL) { + och = fd->fd_lease_och; + fd->fd_lease_och = NULL; + } + mutex_unlock(&lli->lli_och_mutex); + + if (och != NULL) { + mode = och->och_flags &(FMODE_READ|FMODE_WRITE); + rc = ll_lease_close(och, inode, &lease_broken); + if (rc == 0 && lease_broken) + mode = 0; + } else { + rc = -ENOLCK; + } + + /* return the type of lease or error */ + RETURN(rc < 0 ? rc : (int)mode); + default: + RETURN(-EINVAL); + } + + CDEBUG(D_INODE, "Set lease with mode %d\n", mode); + + /* apply for lease */ + och = ll_lease_open(inode, file, mode); + if (IS_ERR(och)) + RETURN(PTR_ERR(och)); + + rc = 0; + mutex_lock(&lli->lli_och_mutex); + if (fd->fd_lease_och == NULL) { + fd->fd_lease_och = och; + och = NULL; + } + mutex_unlock(&lli->lli_och_mutex); + if (och != NULL) { + /* impossible now that only excl is supported for now */ + ll_lease_close(och, inode, &lease_broken); + rc = -EBUSY; + } + RETURN(rc); + } + case LL_IOC_GET_LEASE: { + struct ll_inode_info *lli = ll_i2info(inode); + struct ldlm_lock *lock = NULL; + + rc = 0; + mutex_lock(&lli->lli_och_mutex); + if (fd->fd_lease_och != NULL) { + struct obd_client_handle *och = fd->fd_lease_och; + + lock = ldlm_handle2lock(&och->och_lease_handle); + if (lock != NULL) { + lock_res_and_lock(lock); + if (!ldlm_is_cancel(lock)) + rc = och->och_flags & + (FMODE_READ | FMODE_WRITE); + unlock_res_and_lock(lock); + ldlm_lock_put(lock); + } + } + mutex_unlock(&lli->lli_och_mutex); + + RETURN(rc); + } default: { int err; @@ -2373,7 +2642,7 @@ int ll_flush(struct file *file, fl_owner_t id) * Return how many pages have been written. */ int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end, - enum cl_fsync_mode mode) + enum cl_fsync_mode mode, int ignore_layout) { struct cl_env_nest nest; struct lu_env *env; @@ -2395,7 +2664,7 @@ int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end, io = ccc_env_thread_io(env); io->ci_obj = cl_i2info(inode)->lli_clob; - io->ci_ignore_layout = 1; + io->ci_ignore_layout = ignore_layout; /* initialize parameters for sync */ fio = &io->u.ci_fsync; @@ -2483,7 +2752,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) struct ll_file_data *fd = LUSTRE_FPRIVATE(file); err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, - CL_FSYNC_ALL); + CL_FSYNC_ALL, 0); if (rc == 0 && err < 0) rc = err; if (rc < 0) @@ -2500,18 +2769,20 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync) int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) { - struct inode *inode = file->f_dentry->d_inode; - struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK, - .ei_cb_cp =ldlm_flock_completion_ast, - .ei_cbdata = file_lock }; - struct md_op_data *op_data; - struct lustre_handle lockh = {0}; - ldlm_policy_data_t flock = {{0}}; - int flags = 0; - int rc; + struct inode *inode = file->f_dentry->d_inode; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ldlm_enqueue_info einfo = { + .ei_type = LDLM_FLOCK, + .ei_cb_cp = ldlm_flock_completion_ast, + .ei_cbdata = file_lock, + }; + struct md_op_data *op_data; + struct lustre_handle lockh = {0}; + ldlm_policy_data_t flock = {{0}}; + int flags = 0; + int rc; int rc2 = 0; - ENTRY; + ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n", inode->i_ino, file_lock); @@ -2661,11 +2932,11 @@ int ll_have_md_lock(struct inode *inode, __u64 *bits, ldlm_mode_t l_req_mode) CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid), ldlm_lockname[mode]); - flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK; - for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) { - policy.l_inodebits.bits = *bits & (1 << i); - if (policy.l_inodebits.bits == 0) - continue; + flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK; + for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) { + policy.l_inodebits.bits = *bits & (1 << i); + if (policy.l_inodebits.bits == 0) + continue; if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, mode, &lockh)) { @@ -2777,7 +3048,7 @@ int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it, here to preserve get_cwd functionality on 2.6. Bug 10503 */ if (!dentry->d_inode->i_nlink) - d_lustre_invalidate(dentry); + d_lustre_invalidate(dentry, 0); ll_lookup_finish_locks(&oit, dentry); } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) { @@ -2818,13 +3089,13 @@ out: } int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it, - __u64 ibits) + __u64 ibits) { - struct inode *inode = dentry->d_inode; - int rc; - ENTRY; + struct inode *inode = dentry->d_inode; + int rc; + ENTRY; - rc = __ll_inode_revalidate_it(dentry, it, ibits); + rc = __ll_inode_revalidate_it(dentry, it, ibits); if (rc != 0) RETURN(rc); @@ -2834,9 +3105,17 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it, LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime; LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime; } else { - rc = ll_glimpse_size(inode); + /* In case of restore, the MDT has the right size and has + * already send it back without granting the layout lock, + * inode is up-to-date so glimpse is useless. + * Also to glimpse we need the layout, in case of a running + * restore the MDT holds the layout lock so the glimpse will + * block up to the end of restore (getattr will block) + */ + if (!(ll_i2info(inode)->lli_flags & LLIF_FILE_RESTORING)) + rc = ll_glimpse_size(inode); } - RETURN(rc); + RETURN(rc); } int ll_getattr_it(struct vfsmount *mnt, struct dentry *de, @@ -3030,12 +3309,7 @@ struct file_operations ll_file_operations = { .release = ll_file_release, .mmap = ll_file_mmap, .llseek = ll_file_seek, -#ifdef HAVE_KERNEL_SENDFILE - .sendfile = ll_file_sendfile, -#endif -#ifdef HAVE_KERNEL_SPLICE_READ .splice_read = ll_file_splice_read, -#endif .fsync = ll_fsync, .flush = ll_flush }; @@ -3050,12 +3324,7 @@ struct file_operations ll_file_operations_flock = { .release = ll_file_release, .mmap = ll_file_mmap, .llseek = ll_file_seek, -#ifdef HAVE_KERNEL_SENDFILE - .sendfile = ll_file_sendfile, -#endif -#ifdef HAVE_KERNEL_SPLICE_READ .splice_read = ll_file_splice_read, -#endif .fsync = ll_fsync, .flush = ll_flush, .flock = ll_file_flock, @@ -3073,12 +3342,7 @@ struct file_operations ll_file_operations_noflock = { .release = ll_file_release, .mmap = ll_file_mmap, .llseek = ll_file_seek, -#ifdef HAVE_KERNEL_SENDFILE - .sendfile = ll_file_sendfile, -#endif -#ifdef HAVE_KERNEL_SPLICE_READ .splice_read = ll_file_splice_read, -#endif .fsync = ll_fsync, .flush = ll_flush, .flock = ll_file_noflock, @@ -3235,6 +3499,74 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf) RETURN(result); } +/* Fetch layout from MDT with getxattr request, if it's not ready yet */ +static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock) + +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct obd_capa *oc; + struct ptlrpc_request *req; + struct mdt_body *body; + void *lvbdata; + void *lmm; + int lmmsize; + int rc; + ENTRY; + + CDEBUG(D_INODE, DFID" LVB_READY=%d l_lvb_data=%p l_lvb_len=%d\n", + PFID(ll_inode2fid(inode)), !!(lock->l_flags & LDLM_FL_LVB_READY), + lock->l_lvb_data, lock->l_lvb_len); + + if ((lock->l_lvb_data != NULL) && (lock->l_flags & LDLM_FL_LVB_READY)) + RETURN(0); + + /* if layout lock was granted right away, the layout is returned + * within DLM_LVB of dlm reply; otherwise if the lock was ever + * blocked and then granted via completion ast, we have to fetch + * layout here. Please note that we can't use the LVB buffer in + * completion AST because it doesn't have a large enough buffer */ + oc = ll_mdscapa_get(inode); + rc = ll_get_max_mdsize(sbi, &lmmsize); + if (rc == 0) + rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, + OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0, + lmmsize, 0, &req); + capa_put(oc); + if (rc < 0) + RETURN(rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL || body->eadatasize > lmmsize) + GOTO(out, rc = -EPROTO); + + lmmsize = body->eadatasize; + if (lmmsize == 0) /* empty layout */ + GOTO(out, rc = 0); + + lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize); + if (lmm == NULL) + GOTO(out, rc = -EFAULT); + + OBD_ALLOC_LARGE(lvbdata, lmmsize); + if (lvbdata == NULL) + GOTO(out, rc = -ENOMEM); + + memcpy(lvbdata, lmm, lmmsize); + lock_res_and_lock(lock); + if (lock->l_lvb_data != NULL) + OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len); + + lock->l_lvb_data = lvbdata; + lock->l_lvb_len = lmmsize; + unlock_res_and_lock(lock); + + EXIT; + +out: + ptlrpc_req_finished(req); + return rc; +} + /** * Apply the layout to the inode. Layout lock is held and will be released * in this function. @@ -3249,6 +3581,7 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, struct cl_object_conf conf; int rc = 0; bool lvb_ready; + bool wait_layout = false; ENTRY; LASSERT(lustre_handle_is_used(lockh)); @@ -3258,16 +3591,18 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, LASSERT(ldlm_has_layout(lock)); LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n", - inode, PFID(&lli->lli_fid), reconf); + inode, PFID(&lli->lli_fid), reconf); + + /* in case this is a caching lock and reinstate with new inode */ + md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL); lock_res_and_lock(lock); lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY); unlock_res_and_lock(lock); /* checking lvb_ready is racy but this is okay. The worst case is * that multi processes may configure the file on the same time. */ - if (lvb_ready || !reconf) { - LDLM_LOCK_PUT(lock); + if (lvb_ready || !reconf) { rc = -ENODATA; if (lvb_ready) { /* layout_gen must be valid if layout lock is not @@ -3275,10 +3610,13 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, *gen = lli->lli_layout_gen; rc = 0; } - ldlm_lock_decref(lockh, mode); - RETURN(rc); + GOTO(out, rc); } + rc = ll_layout_fetch(inode, lock); + if (rc < 0) + GOTO(out, rc); + /* for layout lock, lmm is returned in lock's lvb. * lvb_data is immutable if the lock is held so it's safe to access it * without res lock. See the description in ldlm_lock_decref_internal() @@ -3297,11 +3635,8 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, PFID(&lli->lli_fid), rc); } } - if (rc < 0) { - LDLM_LOCK_PUT(lock); - ldlm_lock_decref(lockh, mode); - RETURN(rc); - } + if (rc < 0) + GOTO(out, rc); /* set layout to file. Unlikely this will fail as old layout was * surely eliminated */ @@ -3311,15 +3646,20 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, conf.coc_lock = lock; conf.u.coc_md = &md; rc = ll_layout_conf(inode, &conf); - LDLM_LOCK_PUT(lock); - - ldlm_lock_decref(lockh, mode); if (md.lsm != NULL) obd_free_memmd(sbi->ll_dt_exp, &md.lsm); + /* refresh layout failed, need to wait */ + wait_layout = rc == -EBUSY; + EXIT; + +out: + LDLM_LOCK_PUT(lock); + ldlm_lock_decref(lockh, mode); + /* wait for IO to complete if it's still being used. */ - if (rc == -EBUSY) { + if (wait_layout) { CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n", ll_get_fsname(inode->i_sb, NULL, 0), inode, PFID(&lli->lli_fid)); @@ -3334,7 +3674,6 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n", PFID(&lli->lli_fid), rc); } - RETURN(rc); } @@ -3359,15 +3698,16 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) struct lookup_intent it; struct lustre_handle lockh; ldlm_mode_t mode; - struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS, - .ei_mode = LCK_CR, - .ei_cb_bl = ll_md_blocking_ast, - .ei_cb_cp = ldlm_completion_ast, - .ei_cbdata = NULL }; + struct ldlm_enqueue_info einfo = { + .ei_type = LDLM_IBITS, + .ei_mode = LCK_CR, + .ei_cb_bl = ll_md_blocking_ast, + .ei_cb_cp = ldlm_completion_ast, + }; int rc; ENTRY; - *gen = LL_LAYOUT_GEN_NONE; + *gen = lli->lli_layout_gen; if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK)) RETURN(0); @@ -3426,8 +3766,6 @@ again: ll_finish_md_op_data(op_data); - md_set_lock_data(sbi->ll_md_exp, &it.d.lustre.it_lock_handle, inode, NULL); - mode = it.d.lustre.it_lock_mode; it.d.lustre.it_lock_mode = 0; ll_intent_drop_lock(&it); @@ -3443,3 +3781,32 @@ again: RETURN(rc); } + +/** + * This function send a restore request to the MDT + */ +int ll_layout_restore(struct inode *inode) +{ + struct hsm_user_request *hur; + int len, rc; + ENTRY; + + len = sizeof(struct hsm_user_request) + + sizeof(struct hsm_user_item); + OBD_ALLOC(hur, len); + if (hur == NULL) + RETURN(-ENOMEM); + + hur->hur_request.hr_action = HUA_RESTORE; + hur->hur_request.hr_archive_id = 0; + hur->hur_request.hr_flags = 0; + memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid, + sizeof(hur->hur_user_item[0].hui_fid)); + hur->hur_user_item[0].hui_extent.length = -1; + hur->hur_request.hr_itemcount = 1; + rc = obd_iocontrol(LL_IOC_HSM_REQUEST, cl_i2sbi(inode)->ll_md_exp, + len, hur, NULL); + OBD_FREE(hur, len); + RETURN(rc); +} +