Whamcloud - gitweb
LU-12349 llite: console message for disabled flock call
[fs/lustre-release.git] / lustre / llite / file.c
index e340c71..6bbe995 100644 (file)
@@ -61,6 +61,7 @@ struct split_param {
 struct pcc_param {
        __u64   pa_data_version;
        __u32   pa_archive_id;
+       __u32   pa_layout_gen;
 };
 
 static int
@@ -237,6 +238,12 @@ static int ll_close_inode_openhandle(struct inode *inode,
                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                if (!(body->mbo_valid & OBD_MD_CLOSE_INTENT_EXECED))
                        rc = -EBUSY;
+
+               if (bias & MDS_PCC_ATTACH) {
+                       struct pcc_param *param = data;
+
+                       param->pa_layout_gen = body->mbo_layout_gen;
+               }
        }
 
        ll_finish_md_op_data(op_data);
@@ -431,6 +438,7 @@ void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
        struct address_space *mapping = inode->i_mapping;
        struct page *vmpage;
        struct niobuf_remote *rnb;
+       struct mdt_body *body;
        char *data;
        unsigned long index, start;
        struct niobuf_local lnb;
@@ -456,18 +464,19 @@ void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
        if (rnb->rnb_offset % PAGE_SIZE)
                RETURN_EXIT;
 
-       /* Server returns whole file or just file tail if it fills in
-        * reply buffer, in both cases total size should be inode size.
+       /* Server returns whole file or just file tail if it fills in reply
+        * buffer, in both cases total size should be equal to the file size.
         */
-       if (rnb->rnb_offset + rnb->rnb_len < i_size_read(inode)) {
-               CERROR("%s: server returns off/len %llu/%u < i_size %llu\n",
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       if (rnb->rnb_offset + rnb->rnb_len != body->mbo_dom_size) {
+               CERROR("%s: server returns off/len %llu/%u but size %llu\n",
                       ll_i2sbi(inode)->ll_fsname, rnb->rnb_offset,
-                      rnb->rnb_len, i_size_read(inode));
+                      rnb->rnb_len, body->mbo_dom_size);
                RETURN_EXIT;
        }
 
-       CDEBUG(D_INFO, "Get data along with open at %llu len %i, i_size %llu\n",
-              rnb->rnb_offset, rnb->rnb_len, i_size_read(inode));
+       CDEBUG(D_INFO, "Get data along with open at %llu len %i, size %llu\n",
+              rnb->rnb_offset, rnb->rnb_len, body->mbo_dom_size);
 
        data = (char *)rnb + sizeof(*rnb);
 
@@ -832,6 +841,7 @@ restart:
                if (rc)
                        GOTO(out_och_free, rc);
        }
+
        rc = pcc_file_open(inode, file);
        if (rc)
                GOTO(out_och_free, rc);
@@ -1368,7 +1378,7 @@ static bool file_is_noatime(const struct file *file)
        return false;
 }
 
-static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
+void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
 {
        struct inode *inode = file_inode(file);
        struct ll_file_data *fd  = LUSTRE_FPRIVATE(file);
@@ -1391,6 +1401,7 @@ static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
                io->ci_lockreq = CILR_MANDATORY;
        }
        io->ci_noatime = file_is_noatime(file);
+       io->ci_async_readahead = false;
 
        /* FLR: only use non-delay I/O for read as there is only one
         * avaliable mirror for write. */
@@ -1641,7 +1652,10 @@ static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to)
        ssize_t result;
        ssize_t rc2;
        __u16 refcheck;
-       bool cached = false;
+       bool cached;
+
+       if (!iov_iter_count(to))
+               return 0;
 
        /**
         * Currently when PCC read failed, we do not fall back to the
@@ -1752,22 +1766,26 @@ static ssize_t ll_file_write_iter(struct kiocb *iocb, struct iov_iter *from)
        struct lu_env *env;
        ssize_t rc_tiny = 0, rc_normal;
        __u16 refcheck;
-       bool cached = false;
+       bool cached;
        int result;
 
        ENTRY;
 
+       if (!iov_iter_count(from))
+               GOTO(out, rc_normal = 0);
+
        /**
-        * When PCC write failed, we do not fall back to the normal
-        * write path, just return the error. The reason is that:
-        * PCC is actually a HSM device, and HSM does not handle the
-        * failure especially -ENOSPC due to space used out; Moreover,
-        * the fallback to normal I/O path for ENOSPC failure, needs
-        * to restore the file data to OSTs first and redo the write
-        * again, making the logic of PCC very complex.
+        * When PCC write failed, we usually do not fall back to the normal
+        * write path, just return the error. But there is a special case when
+        * returned error code is -ENOSPC due to running out of space on PCC HSM
+        * bakcend. At this time, it will fall back to normal I/O path and
+        * retry the I/O. As the file is in HSM released state, it will restore
+        * the file data to OSTs first and redo the write again. And the
+        * restore process will revoke the layout lock and detach the file
+        * from PCC cache automatically.
         */
        result = pcc_file_write_iter(iocb, from, &cached);
-       if (cached)
+       if (cached && result != -ENOSPC && result != -EDQUOT)
                return result;
 
        /* NB: we can't do direct IO for tiny writes because they use the page
@@ -1854,6 +1872,9 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
        if (result)
                RETURN(result);
 
+       if (!iov_count)
+               RETURN(0);
+
 # ifdef HAVE_IOV_ITER_INIT_DIRECTION
        iov_iter_init(&to, READ, iov, nr_segs, iov_count);
 # else /* !HAVE_IOV_ITER_INIT_DIRECTION */
@@ -1871,8 +1892,12 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
        struct iovec   iov = { .iov_base = buf, .iov_len = count };
        struct kiocb   kiocb;
        ssize_t        result;
+
        ENTRY;
 
+       if (!count)
+               RETURN(0);
+
        init_sync_kiocb(&kiocb, file);
        kiocb.ki_pos = *ppos;
 #ifdef HAVE_KIOCB_KI_LEFT
@@ -1903,6 +1928,9 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
        if (result)
                RETURN(result);
 
+       if (!iov_count)
+               RETURN(0);
+
 # ifdef HAVE_IOV_ITER_INIT_DIRECTION
        iov_iter_init(&from, WRITE, iov, nr_segs, iov_count);
 # else /* !HAVE_IOV_ITER_INIT_DIRECTION */
@@ -1924,6 +1952,9 @@ static ssize_t ll_file_write(struct file *file, const char __user *buf,
 
        ENTRY;
 
+       if (!count)
+               RETURN(0);
+
        init_sync_kiocb(&kiocb, file);
        kiocb.ki_pos = *ppos;
 #ifdef HAVE_KIOCB_KI_LEFT
@@ -1946,23 +1977,22 @@ static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
                                    struct pipe_inode_info *pipe, size_t count,
                                    unsigned int flags)
 {
-        struct lu_env      *env;
-        struct vvp_io_args *args;
-        ssize_t             result;
-       __u16               refcheck;
-       struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
-       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct lu_env *env;
+       struct vvp_io_args *args;
+       ssize_t result;
+       __u16 refcheck;
+       bool cached;
 
         ENTRY;
 
-       /* pcc cache path */
-       if (pcc_file && file_inode(pcc_file)->i_fop->splice_read)
-               return file_inode(pcc_file)->i_fop->splice_read(pcc_file,
-                                               ppos, pipe, count, flags);
+       result = pcc_file_splice_read(in_file, ppos, pipe,
+                                     count, flags, &cached);
+       if (cached)
+               RETURN(result);
 
        ll_ras_enter(in_file);
 
-        env = cl_env_get(&refcheck);
+       env = cl_env_get(&refcheck);
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
@@ -3310,8 +3340,10 @@ out:
        case LL_LEASE_PCC_ATTACH:
                if (!rc)
                        rc = rc2;
-               rc = pcc_readwrite_attach_fini(file, inode, lease_broken,
-                                              rc, attached);
+               rc = pcc_readwrite_attach_fini(file, inode,
+                                              param.pa_layout_gen,
+                                              lease_broken, rc,
+                                              attached);
                break;
        }
 
@@ -3836,6 +3868,29 @@ out_ladvise:
                rc = ll_heat_set(inode, flags);
                RETURN(rc);
        }
+       case LL_IOC_PCC_DETACH: {
+               struct lu_pcc_detach *detach;
+
+               OBD_ALLOC_PTR(detach);
+               if (detach == NULL)
+                       RETURN(-ENOMEM);
+
+               if (copy_from_user(detach,
+                                  (const struct lu_pcc_detach __user *)arg,
+                                  sizeof(*detach)))
+                       GOTO(out_detach_free, rc = -EFAULT);
+
+               if (!S_ISREG(inode->i_mode))
+                       GOTO(out_detach_free, rc = -EINVAL);
+
+               if (!inode_owner_or_capable(inode))
+                       GOTO(out_detach_free, rc = -EPERM);
+
+               rc = pcc_ioctl_detach(inode, detach->pccd_opt);
+out_detach_free:
+               OBD_FREE_PTR(detach);
+               RETURN(rc);
+       }
        case LL_IOC_PCC_STATE: {
                struct lu_pcc_state __user *ustate =
                        (struct lu_pcc_state __user *)arg;
@@ -3848,7 +3903,7 @@ out_ladvise:
                if (copy_from_user(state, ustate, sizeof(*state)))
                        GOTO(out_state, rc = -EFAULT);
 
-               rc = pcc_ioctl_state(inode, state);
+               rc = pcc_ioctl_state(file, inode, state);
                if (rc)
                        GOTO(out_state, rc);
 
@@ -4055,29 +4110,15 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
 #endif
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
-       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct ptlrpc_request *req;
-       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
        int rc, err;
+
        ENTRY;
 
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
               PFID(ll_inode2fid(inode)), inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
 
-       /* pcc cache path */
-       if (pcc_file)
-#ifdef HAVE_FILE_FSYNC_4ARGS
-               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
-                                       start, end, datasync);
-#elif defined(HAVE_FILE_FSYNC_2ARGS)
-               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
-                                       datasync);
-#else
-               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
-                                       dentry, datasync);
-#endif
-
 #ifdef HAVE_FILE_FSYNC_4ARGS
        rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
        inode_lock(inode);
@@ -4109,8 +4150,15 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
 
        if (S_ISREG(inode->i_mode)) {
                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+               bool cached;
 
-               err = cl_sync_file_range(inode, start, end, CL_FSYNC_ALL, 0);
+               /* Sync metadata on MDT first, and then sync the cached data
+                * on PCC.
+                */
+               err = pcc_fsync(file, start, end, datasync, &cached);
+               if (!cached)
+                       err = cl_sync_file_range(inode, start, end,
+                                                CL_FSYNC_ALL, 0);
                if (rc == 0 && err < 0)
                        rc = err;
                if (rc < 0)
@@ -4350,7 +4398,7 @@ int ll_migrate(struct inode *parent, struct file *file, struct lmv_user_md *lum,
        if (!(exp_connect_flags2(ll_i2sbi(parent)->ll_md_exp) &
              OBD_CONNECT2_DIR_MIGRATE)) {
                if (le32_to_cpu(lum->lum_stripe_count) > 1 ||
-                   ll_i2info(child_inode)->lli_lsm_md) {
+                   ll_dir_striped(child_inode)) {
                        CERROR("%s: MDT doesn't support stripe directory "
                               "migration!\n", ll_i2sbi(parent)->ll_fsname);
                        GOTO(out_iput, rc = -EOPNOTSUPP);
@@ -4455,9 +4503,20 @@ out_iput:
 static int
 ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
 {
-        ENTRY;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       ENTRY;
 
-        RETURN(-ENOSYS);
+       /*
+        * In order to avoid flood of warning messages, only print one message
+        * for one file. And the entire message rate on the client is limited
+        * by CDEBUG_LIMIT too.
+        */
+       if (!(fd->fd_flags & LL_FILE_FLOCK_WARNING)) {
+               fd->fd_flags |= LL_FILE_FLOCK_WARNING;
+               CDEBUG_LIMIT(D_TTY | D_CONSOLE,
+                            "flock disabled, mount with '-o [local]flock' to enable\r\n");
+       }
+       RETURN(-ENOSYS);
 }
 
 /**
@@ -4537,8 +4596,7 @@ static int ll_inode_revalidate_fini(struct inode *inode, int rc)
                /* If it is striped directory, and there is bad stripe
                 * Let's revalidate the dentry again, instead of returning
                 * error */
-               if (S_ISDIR(inode->i_mode) &&
-                   ll_i2info(inode)->lli_lsm_md != NULL)
+               if (ll_dir_striped(inode))
                        return 0;
 
                /* This path cannot be hit for regular files unless in
@@ -4615,8 +4673,7 @@ static int ll_merge_md_attr(struct inode *inode)
 
        LASSERT(lli->lli_lsm_md != NULL);
 
-       /* foreign dir is not striped dir */
-       if (lli->lli_lsm_md->lsm_md_magic == LMV_MAGIC_FOREIGN)
+       if (!lmv_dir_striped(lli->lli_lsm_md))
                RETURN(0);
 
        down_read(&lli->lli_lsm_sem);
@@ -4651,11 +4708,12 @@ int ll_getattr_dentry(struct dentry *de, struct kstat *stat)
                RETURN(rc);
 
        if (S_ISREG(inode->i_mode)) {
-               bool cached = false;
+               bool cached;
 
                rc = pcc_inode_getattr(inode, &cached);
                if (cached && rc < 0)
                        RETURN(rc);
+
                /* In case of restore, the MDT has the right size and has
                 * already send it back without granting the layout lock,
                 * inode is up-to-date so glimpse is useless.
@@ -4670,8 +4728,7 @@ int ll_getattr_dentry(struct dentry *de, struct kstat *stat)
                }
        } else {
                /* If object isn't regular a file then don't validate size. */
-               if (S_ISDIR(inode->i_mode) &&
-                   lli->lli_lsm_md != NULL) {
+               if (ll_dir_striped(inode)) {
                        rc = ll_merge_md_attr(inode);
                        if (rc < 0)
                                RETURN(rc);
@@ -5334,7 +5391,8 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
                /* mostly layout lock is caching on the local side, so try to
                 * match it before grabbing layout lock mutex. */
                mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
-                                      LCK_CR | LCK_CW | LCK_PR | LCK_PW);
+                                      LCK_CR | LCK_CW | LCK_PR |
+                                      LCK_PW | LCK_EX);
                if (mode != 0) { /* hit cached lock */
                        rc = ll_layout_lock_set(&lockh, mode, inode);
                        if (rc == -EAGAIN)