Whamcloud - gitweb
LU-9749 llite: Reduce overhead for ll_do_fast_read
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/llite/file.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_LLITE
40 #include <lustre_dlm.h>
41 #include <linux/pagemap.h>
42 #include <linux/file.h>
43 #include <linux/sched.h>
44 #include <linux/user_namespace.h>
45 #ifdef HAVE_UIDGID_HEADER
46 # include <linux/uidgid.h>
47 #endif
48 #include <lustre/ll_fiemap.h>
49
50 #include <uapi/linux/lustre_ioctl.h>
51 #include <lustre_swab.h>
52
53 #include "cl_object.h"
54 #include "llite_internal.h"
55 #include "vvp_internal.h"
56
57 static int
58 ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
59
60 static int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
61                           bool *lease_broken);
62
63 static enum llioc_iter
64 ll_iocontrol_call(struct inode *inode, struct file *file,
65                   unsigned int cmd, unsigned long arg, int *rcp);
66
67 static struct ll_file_data *ll_file_data_get(void)
68 {
69         struct ll_file_data *fd;
70
71         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, GFP_NOFS);
72         if (fd == NULL)
73                 return NULL;
74
75         fd->fd_write_failed = false;
76
77         return fd;
78 }
79
80 static void ll_file_data_put(struct ll_file_data *fd)
81 {
82         if (fd != NULL)
83                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
84 }
85
86 /**
87  * Packs all the attributes into @op_data for the CLOSE rpc.
88  */
89 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
90                              struct obd_client_handle *och)
91 {
92         ENTRY;
93
94         ll_prep_md_op_data(op_data, inode, NULL, NULL,
95                            0, 0, LUSTRE_OPC_ANY, NULL);
96
97         op_data->op_attr.ia_mode = inode->i_mode;
98         op_data->op_attr.ia_atime = inode->i_atime;
99         op_data->op_attr.ia_mtime = inode->i_mtime;
100         op_data->op_attr.ia_ctime = inode->i_ctime;
101         op_data->op_attr.ia_size = i_size_read(inode);
102         op_data->op_attr.ia_valid |= ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
103                                      ATTR_MTIME | ATTR_MTIME_SET |
104                                      ATTR_CTIME | ATTR_CTIME_SET;
105         op_data->op_attr_blocks = inode->i_blocks;
106         op_data->op_attr_flags = ll_inode_to_ext_flags(inode->i_flags);
107         op_data->op_handle = och->och_fh;
108
109         if (och->och_flags & FMODE_WRITE &&
110             ll_file_test_and_clear_flag(ll_i2info(inode), LLIF_DATA_MODIFIED))
111                 /* For HSM: if inode data has been modified, pack it so that
112                  * MDT can set data dirty flag in the archive. */
113                 op_data->op_bias |= MDS_DATA_MODIFIED;
114
115         EXIT;
116 }
117
118 /**
119  * Perform a close, possibly with a bias.
120  * The meaning of "data" depends on the value of "bias".
121  *
122  * If \a bias is MDS_HSM_RELEASE then \a data is a pointer to the data version.
123  * If \a bias is MDS_CLOSE_LAYOUT_SWAP then \a data is a pointer to the inode to
124  * swap layouts with.
125  */
126 static int ll_close_inode_openhandle(struct inode *inode,
127                                      struct obd_client_handle *och,
128                                      enum mds_op_bias bias, void *data)
129 {
130         struct obd_export *md_exp = ll_i2mdexp(inode);
131         const struct ll_inode_info *lli = ll_i2info(inode);
132         struct md_op_data *op_data;
133         struct ptlrpc_request *req = NULL;
134         int rc;
135         ENTRY;
136
137         if (class_exp2obd(md_exp) == NULL) {
138                 CERROR("%s: invalid MDC connection handle closing "DFID"\n",
139                        ll_get_fsname(inode->i_sb, NULL, 0),
140                        PFID(&lli->lli_fid));
141                 GOTO(out, rc = 0);
142         }
143
144         OBD_ALLOC_PTR(op_data);
145         /* We leak openhandle and request here on error, but not much to be
146          * done in OOM case since app won't retry close on error either. */
147         if (op_data == NULL)
148                 GOTO(out, rc = -ENOMEM);
149
150         ll_prepare_close(inode, op_data, och);
151         switch (bias) {
152         case MDS_CLOSE_LAYOUT_SWAP:
153                 LASSERT(data != NULL);
154                 op_data->op_bias |= MDS_CLOSE_LAYOUT_SWAP;
155                 op_data->op_data_version = 0;
156                 op_data->op_lease_handle = och->och_lease_handle;
157                 op_data->op_fid2 = *ll_inode2fid(data);
158                 break;
159
160         case MDS_HSM_RELEASE:
161                 LASSERT(data != NULL);
162                 op_data->op_bias |= MDS_HSM_RELEASE;
163                 op_data->op_data_version = *(__u64 *)data;
164                 op_data->op_lease_handle = och->och_lease_handle;
165                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
166                 break;
167
168         default:
169                 LASSERT(data == NULL);
170                 break;
171         }
172
173         rc = md_close(md_exp, op_data, och->och_mod, &req);
174         if (rc != 0 && rc != -EINTR)
175                 CERROR("%s: inode "DFID" mdc close failed: rc = %d\n",
176                        md_exp->exp_obd->obd_name, PFID(&lli->lli_fid), rc);
177
178         if (rc == 0 &&
179             op_data->op_bias & (MDS_HSM_RELEASE | MDS_CLOSE_LAYOUT_SWAP)) {
180                 struct mdt_body *body;
181
182                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
183                 if (!(body->mbo_valid & OBD_MD_CLOSE_INTENT_EXECED))
184                         rc = -EBUSY;
185         }
186
187         ll_finish_md_op_data(op_data);
188         EXIT;
189 out:
190
191         md_clear_open_replay_data(md_exp, och);
192         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
193         OBD_FREE_PTR(och);
194
195         ptlrpc_req_finished(req);       /* This is close request */
196         return rc;
197 }
198
199 int ll_md_real_close(struct inode *inode, fmode_t fmode)
200 {
201         struct ll_inode_info *lli = ll_i2info(inode);
202         struct obd_client_handle **och_p;
203         struct obd_client_handle *och;
204         __u64 *och_usecount;
205         int rc = 0;
206         ENTRY;
207
208         if (fmode & FMODE_WRITE) {
209                 och_p = &lli->lli_mds_write_och;
210                 och_usecount = &lli->lli_open_fd_write_count;
211         } else if (fmode & FMODE_EXEC) {
212                 och_p = &lli->lli_mds_exec_och;
213                 och_usecount = &lli->lli_open_fd_exec_count;
214         } else {
215                 LASSERT(fmode & FMODE_READ);
216                 och_p = &lli->lli_mds_read_och;
217                 och_usecount = &lli->lli_open_fd_read_count;
218         }
219
220         mutex_lock(&lli->lli_och_mutex);
221         if (*och_usecount > 0) {
222                 /* There are still users of this handle, so skip
223                  * freeing it. */
224                 mutex_unlock(&lli->lli_och_mutex);
225                 RETURN(0);
226         }
227
228         och = *och_p;
229         *och_p = NULL;
230         mutex_unlock(&lli->lli_och_mutex);
231
232         if (och != NULL) {
233                 /* There might be a race and this handle may already
234                  * be closed. */
235                 rc = ll_close_inode_openhandle(inode, och, 0, NULL);
236         }
237
238         RETURN(rc);
239 }
240
241 static int ll_md_close(struct inode *inode, struct file *file)
242 {
243         union ldlm_policy_data policy = {
244                 .l_inodebits    = { MDS_INODELOCK_OPEN },
245         };
246         __u64 flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
247         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
248         struct ll_inode_info *lli = ll_i2info(inode);
249         struct lustre_handle lockh;
250         enum ldlm_mode lockmode;
251         int rc = 0;
252         ENTRY;
253
254         /* clear group lock, if present */
255         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
256                 ll_put_grouplock(inode, file, fd->fd_grouplock.lg_gid);
257
258         if (fd->fd_lease_och != NULL) {
259                 bool lease_broken;
260
261                 /* Usually the lease is not released when the
262                  * application crashed, we need to release here. */
263                 rc = ll_lease_close(fd->fd_lease_och, inode, &lease_broken);
264                 CDEBUG(rc ? D_ERROR : D_INODE, "Clean up lease "DFID" %d/%d\n",
265                         PFID(&lli->lli_fid), rc, lease_broken);
266
267                 fd->fd_lease_och = NULL;
268         }
269
270         if (fd->fd_och != NULL) {
271                 rc = ll_close_inode_openhandle(inode, fd->fd_och, 0, NULL);
272                 fd->fd_och = NULL;
273                 GOTO(out, rc);
274         }
275
276         /* Let's see if we have good enough OPEN lock on the file and if
277            we can skip talking to MDS */
278         mutex_lock(&lli->lli_och_mutex);
279         if (fd->fd_omode & FMODE_WRITE) {
280                 lockmode = LCK_CW;
281                 LASSERT(lli->lli_open_fd_write_count);
282                 lli->lli_open_fd_write_count--;
283         } else if (fd->fd_omode & FMODE_EXEC) {
284                 lockmode = LCK_PR;
285                 LASSERT(lli->lli_open_fd_exec_count);
286                 lli->lli_open_fd_exec_count--;
287         } else {
288                 lockmode = LCK_CR;
289                 LASSERT(lli->lli_open_fd_read_count);
290                 lli->lli_open_fd_read_count--;
291         }
292         mutex_unlock(&lli->lli_och_mutex);
293
294         if (!md_lock_match(ll_i2mdexp(inode), flags, ll_inode2fid(inode),
295                            LDLM_IBITS, &policy, lockmode, &lockh))
296                 rc = ll_md_real_close(inode, fd->fd_omode);
297
298 out:
299         LUSTRE_FPRIVATE(file) = NULL;
300         ll_file_data_put(fd);
301
302         RETURN(rc);
303 }
304
305 /* While this returns an error code, fput() the caller does not, so we need
306  * to make every effort to clean up all of our state here.  Also, applications
307  * rarely check close errors and even if an error is returned they will not
308  * re-try the close call.
309  */
310 int ll_file_release(struct inode *inode, struct file *file)
311 {
312         struct ll_file_data *fd;
313         struct ll_sb_info *sbi = ll_i2sbi(inode);
314         struct ll_inode_info *lli = ll_i2info(inode);
315         int rc;
316         ENTRY;
317
318         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
319                PFID(ll_inode2fid(inode)), inode);
320
321         if (inode->i_sb->s_root != file_dentry(file))
322                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
323         fd = LUSTRE_FPRIVATE(file);
324         LASSERT(fd != NULL);
325
326         /* The last ref on @file, maybe not the the owner pid of statahead,
327          * because parent and child process can share the same file handle. */
328         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd)
329                 ll_deauthorize_statahead(inode, fd);
330
331         if (inode->i_sb->s_root == file_dentry(file)) {
332                 LUSTRE_FPRIVATE(file) = NULL;
333                 ll_file_data_put(fd);
334                 RETURN(0);
335         }
336
337         if (!S_ISDIR(inode->i_mode)) {
338                 if (lli->lli_clob != NULL)
339                         lov_read_and_clear_async_rc(lli->lli_clob);
340                 lli->lli_async_rc = 0;
341         }
342
343         rc = ll_md_close(inode, file);
344
345         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
346                 libcfs_debug_dumplog();
347
348         RETURN(rc);
349 }
350
351 static int ll_intent_file_open(struct dentry *de, void *lmm, int lmmsize,
352                                 struct lookup_intent *itp)
353 {
354         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
355         struct dentry *parent = de->d_parent;
356         const char *name = NULL;
357         int len = 0;
358         struct md_op_data *op_data;
359         struct ptlrpc_request *req = NULL;
360         int rc;
361         ENTRY;
362
363         LASSERT(parent != NULL);
364         LASSERT(itp->it_flags & MDS_OPEN_BY_FID);
365
366         /* if server supports open-by-fid, or file name is invalid, don't pack
367          * name in open request */
368         if (!(exp_connect_flags(sbi->ll_md_exp) & OBD_CONNECT_OPEN_BY_FID) &&
369             lu_name_is_valid_2(de->d_name.name, de->d_name.len)) {
370                 name = de->d_name.name;
371                 len = de->d_name.len;
372         }
373
374         op_data = ll_prep_md_op_data(NULL, parent->d_inode, de->d_inode,
375                                      name, len, 0, LUSTRE_OPC_ANY, NULL);
376         if (IS_ERR(op_data))
377                 RETURN(PTR_ERR(op_data));
378         op_data->op_data = lmm;
379         op_data->op_data_size = lmmsize;
380
381         rc = md_intent_lock(sbi->ll_md_exp, op_data, itp, &req,
382                             &ll_md_blocking_ast, 0);
383         ll_finish_md_op_data(op_data);
384         if (rc == -ESTALE) {
385                 /* reason for keep own exit path - don`t flood log
386                  * with messages with -ESTALE errors.
387                  */
388                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
389                      it_open_error(DISP_OPEN_OPEN, itp))
390                         GOTO(out, rc);
391                 ll_release_openhandle(de, itp);
392                 GOTO(out, rc);
393         }
394
395         if (it_disposition(itp, DISP_LOOKUP_NEG))
396                 GOTO(out, rc = -ENOENT);
397
398         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
399                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
400                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
401                 GOTO(out, rc);
402         }
403
404         rc = ll_prep_inode(&de->d_inode, req, NULL, itp);
405         if (!rc && itp->it_lock_mode)
406                 ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, NULL);
407
408 out:
409         ptlrpc_req_finished(req);
410         ll_intent_drop_lock(itp);
411
412         /* We did open by fid, but by the time we got to the server,
413          * the object disappeared. If this is a create, we cannot really
414          * tell the userspace that the file it was trying to create
415          * does not exist. Instead let's return -ESTALE, and the VFS will
416          * retry the create with LOOKUP_REVAL that we are going to catch
417          * in ll_revalidate_dentry() and use lookup then.
418          */
419         if (rc == -ENOENT && itp->it_op & IT_CREAT)
420                 rc = -ESTALE;
421
422         RETURN(rc);
423 }
424
425 static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it,
426                        struct obd_client_handle *och)
427 {
428         struct mdt_body *body;
429
430         body = req_capsule_server_get(&it->it_request->rq_pill, &RMF_MDT_BODY);
431         och->och_fh = body->mbo_handle;
432         och->och_fid = body->mbo_fid1;
433         och->och_lease_handle.cookie = it->it_lock_handle;
434         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
435         och->och_flags = it->it_flags;
436
437         return md_set_open_replay_data(md_exp, och, it);
438 }
439
440 static int ll_local_open(struct file *file, struct lookup_intent *it,
441                          struct ll_file_data *fd, struct obd_client_handle *och)
442 {
443         struct inode *inode = file_inode(file);
444         ENTRY;
445
446         LASSERT(!LUSTRE_FPRIVATE(file));
447
448         LASSERT(fd != NULL);
449
450         if (och) {
451                 int rc;
452
453                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
454                 if (rc != 0)
455                         RETURN(rc);
456         }
457
458         LUSTRE_FPRIVATE(file) = fd;
459         ll_readahead_init(inode, &fd->fd_ras);
460         fd->fd_omode = it->it_flags & (FMODE_READ | FMODE_WRITE | FMODE_EXEC);
461
462         /* ll_cl_context initialize */
463         rwlock_init(&fd->fd_lock);
464         INIT_LIST_HEAD(&fd->fd_lccs);
465
466         RETURN(0);
467 }
468
469 /* Open a file, and (for the very first open) create objects on the OSTs at
470  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
471  * creation or open until ll_lov_setstripe() ioctl is called.
472  *
473  * If we already have the stripe MD locally then we don't request it in
474  * md_open(), by passing a lmm_size = 0.
475  *
476  * It is up to the application to ensure no other processes open this file
477  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
478  * used.  We might be able to avoid races of that sort by getting lli_open_sem
479  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
480  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
481  */
482 int ll_file_open(struct inode *inode, struct file *file)
483 {
484         struct ll_inode_info *lli = ll_i2info(inode);
485         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
486                                           .it_flags = file->f_flags };
487         struct obd_client_handle **och_p = NULL;
488         __u64 *och_usecount = NULL;
489         struct ll_file_data *fd;
490         int rc = 0;
491         ENTRY;
492
493         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), flags %o\n",
494                PFID(ll_inode2fid(inode)), inode, file->f_flags);
495
496         it = file->private_data; /* XXX: compat macro */
497         file->private_data = NULL; /* prevent ll_local_open assertion */
498
499         fd = ll_file_data_get();
500         if (fd == NULL)
501                 GOTO(out_openerr, rc = -ENOMEM);
502
503         fd->fd_file = file;
504         if (S_ISDIR(inode->i_mode))
505                 ll_authorize_statahead(inode, fd);
506
507         if (inode->i_sb->s_root == file_dentry(file)) {
508                 LUSTRE_FPRIVATE(file) = fd;
509                 RETURN(0);
510         }
511
512         if (!it || !it->it_disposition) {
513                 /* Convert f_flags into access mode. We cannot use file->f_mode,
514                  * because everything but O_ACCMODE mask was stripped from
515                  * there */
516                 if ((oit.it_flags + 1) & O_ACCMODE)
517                         oit.it_flags++;
518                 if (file->f_flags & O_TRUNC)
519                         oit.it_flags |= FMODE_WRITE;
520
521                 /* kernel only call f_op->open in dentry_open.  filp_open calls
522                  * dentry_open after call to open_namei that checks permissions.
523                  * Only nfsd_open call dentry_open directly without checking
524                  * permissions and because of that this code below is safe. */
525                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
526                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
527
528                 /* We do not want O_EXCL here, presumably we opened the file
529                  * already? XXX - NFS implications? */
530                 oit.it_flags &= ~O_EXCL;
531
532                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
533                  * created if necessary, then "IT_CREAT" should be set to keep
534                  * consistent with it */
535                 if (oit.it_flags & O_CREAT)
536                         oit.it_op |= IT_CREAT;
537
538                 it = &oit;
539         }
540
541 restart:
542         /* Let's see if we have file open on MDS already. */
543         if (it->it_flags & FMODE_WRITE) {
544                 och_p = &lli->lli_mds_write_och;
545                 och_usecount = &lli->lli_open_fd_write_count;
546         } else if (it->it_flags & FMODE_EXEC) {
547                 och_p = &lli->lli_mds_exec_och;
548                 och_usecount = &lli->lli_open_fd_exec_count;
549          } else {
550                 och_p = &lli->lli_mds_read_och;
551                 och_usecount = &lli->lli_open_fd_read_count;
552         }
553
554         mutex_lock(&lli->lli_och_mutex);
555         if (*och_p) { /* Open handle is present */
556                 if (it_disposition(it, DISP_OPEN_OPEN)) {
557                         /* Well, there's extra open request that we do not need,
558                            let's close it somehow. This will decref request. */
559                         rc = it_open_error(DISP_OPEN_OPEN, it);
560                         if (rc) {
561                                 mutex_unlock(&lli->lli_och_mutex);
562                                 GOTO(out_openerr, rc);
563                         }
564
565                         ll_release_openhandle(file_dentry(file), it);
566                 }
567                 (*och_usecount)++;
568
569                 rc = ll_local_open(file, it, fd, NULL);
570                 if (rc) {
571                         (*och_usecount)--;
572                         mutex_unlock(&lli->lli_och_mutex);
573                         GOTO(out_openerr, rc);
574                 }
575         } else {
576                 LASSERT(*och_usecount == 0);
577                 if (!it->it_disposition) {
578                         struct ll_dentry_data *ldd = ll_d2d(file->f_path.dentry);
579                         /* We cannot just request lock handle now, new ELC code
580                            means that one of other OPEN locks for this file
581                            could be cancelled, and since blocking ast handler
582                            would attempt to grab och_mutex as well, that would
583                            result in a deadlock */
584                         mutex_unlock(&lli->lli_och_mutex);
585                         /*
586                          * Normally called under two situations:
587                          * 1. NFS export.
588                          * 2. A race/condition on MDS resulting in no open
589                          *    handle to be returned from LOOKUP|OPEN request,
590                          *    for example if the target entry was a symlink.
591                          *
592                          *  Only fetch MDS_OPEN_LOCK if this is in NFS path,
593                          *  marked by a bit set in ll_iget_for_nfs. Clear the
594                          *  bit so that it's not confusing later callers.
595                          *
596                          *  NB; when ldd is NULL, it must have come via normal
597                          *  lookup path only, since ll_iget_for_nfs always calls
598                          *  ll_d_init().
599                          */
600                         if (ldd && ldd->lld_nfs_dentry) {
601                                 ldd->lld_nfs_dentry = 0;
602                                 it->it_flags |= MDS_OPEN_LOCK;
603                         }
604
605                          /*
606                          * Always specify MDS_OPEN_BY_FID because we don't want
607                          * to get file with different fid.
608                          */
609                         it->it_flags |= MDS_OPEN_BY_FID;
610                         rc = ll_intent_file_open(file_dentry(file), NULL, 0,
611                                                  it);
612                         if (rc)
613                                 GOTO(out_openerr, rc);
614
615                         goto restart;
616                 }
617                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
618                 if (!*och_p)
619                         GOTO(out_och_free, rc = -ENOMEM);
620
621                 (*och_usecount)++;
622
623                 /* md_intent_lock() didn't get a request ref if there was an
624                  * open error, so don't do cleanup on the request here
625                  * (bug 3430) */
626                 /* XXX (green): Should not we bail out on any error here, not
627                  * just open error? */
628                 rc = it_open_error(DISP_OPEN_OPEN, it);
629                 if (rc != 0)
630                         GOTO(out_och_free, rc);
631
632                 LASSERTF(it_disposition(it, DISP_ENQ_OPEN_REF),
633                          "inode %p: disposition %x, status %d\n", inode,
634                          it_disposition(it, ~0), it->it_status);
635
636                 rc = ll_local_open(file, it, fd, *och_p);
637                 if (rc)
638                         GOTO(out_och_free, rc);
639         }
640         mutex_unlock(&lli->lli_och_mutex);
641         fd = NULL;
642
643         /* Must do this outside lli_och_mutex lock to prevent deadlock where
644            different kind of OPEN lock for this same inode gets cancelled
645            by ldlm_cancel_lru */
646         if (!S_ISREG(inode->i_mode))
647                 GOTO(out_och_free, rc);
648
649         cl_lov_delay_create_clear(&file->f_flags);
650         GOTO(out_och_free, rc);
651
652 out_och_free:
653         if (rc) {
654                 if (och_p && *och_p) {
655                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
656                         *och_p = NULL; /* OBD_FREE writes some magic there */
657                         (*och_usecount)--;
658                 }
659                 mutex_unlock(&lli->lli_och_mutex);
660
661 out_openerr:
662                 if (lli->lli_opendir_key == fd)
663                         ll_deauthorize_statahead(inode, fd);
664                 if (fd != NULL)
665                         ll_file_data_put(fd);
666         } else {
667                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
668         }
669
670         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
671                 ptlrpc_req_finished(it->it_request);
672                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
673         }
674
675         return rc;
676 }
677
678 static int ll_md_blocking_lease_ast(struct ldlm_lock *lock,
679                         struct ldlm_lock_desc *desc, void *data, int flag)
680 {
681         int rc;
682         struct lustre_handle lockh;
683         ENTRY;
684
685         switch (flag) {
686         case LDLM_CB_BLOCKING:
687                 ldlm_lock2handle(lock, &lockh);
688                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
689                 if (rc < 0) {
690                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
691                         RETURN(rc);
692                 }
693                 break;
694         case LDLM_CB_CANCELING:
695                 /* do nothing */
696                 break;
697         }
698         RETURN(0);
699 }
700
701 /**
702  * When setting a lease on a file, we take ownership of the lli_mds_*_och
703  * and save it as fd->fd_och so as to force client to reopen the file even
704  * if it has an open lock in cache already.
705  */
706 static int ll_lease_och_acquire(struct inode *inode, struct file *file,
707                                 struct lustre_handle *old_handle)
708 {
709         struct ll_inode_info *lli = ll_i2info(inode);
710         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
711         struct obd_client_handle **och_p;
712         __u64 *och_usecount;
713         int rc = 0;
714         ENTRY;
715
716         /* Get the openhandle of the file */
717         mutex_lock(&lli->lli_och_mutex);
718         if (fd->fd_lease_och != NULL)
719                 GOTO(out_unlock, rc = -EBUSY);
720
721         if (fd->fd_och == NULL) {
722                 if (file->f_mode & FMODE_WRITE) {
723                         LASSERT(lli->lli_mds_write_och != NULL);
724                         och_p = &lli->lli_mds_write_och;
725                         och_usecount = &lli->lli_open_fd_write_count;
726                 } else {
727                         LASSERT(lli->lli_mds_read_och != NULL);
728                         och_p = &lli->lli_mds_read_och;
729                         och_usecount = &lli->lli_open_fd_read_count;
730                 }
731
732                 if (*och_usecount > 1)
733                         GOTO(out_unlock, rc = -EBUSY);
734
735                 fd->fd_och = *och_p;
736                 *och_usecount = 0;
737                 *och_p = NULL;
738         }
739
740         *old_handle = fd->fd_och->och_fh;
741
742         EXIT;
743 out_unlock:
744         mutex_unlock(&lli->lli_och_mutex);
745         return rc;
746 }
747
748 /**
749  * Release ownership on lli_mds_*_och when putting back a file lease.
750  */
751 static int ll_lease_och_release(struct inode *inode, struct file *file)
752 {
753         struct ll_inode_info *lli = ll_i2info(inode);
754         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
755         struct obd_client_handle **och_p;
756         struct obd_client_handle *old_och = NULL;
757         __u64 *och_usecount;
758         int rc = 0;
759         ENTRY;
760
761         mutex_lock(&lli->lli_och_mutex);
762         if (file->f_mode & FMODE_WRITE) {
763                 och_p = &lli->lli_mds_write_och;
764                 och_usecount = &lli->lli_open_fd_write_count;
765         } else {
766                 och_p = &lli->lli_mds_read_och;
767                 och_usecount = &lli->lli_open_fd_read_count;
768         }
769
770         /* The file may have been open by another process (broken lease) so
771          * *och_p is not NULL. In this case we should simply increase usecount
772          * and close fd_och.
773          */
774         if (*och_p != NULL) {
775                 old_och = fd->fd_och;
776                 (*och_usecount)++;
777         } else {
778                 *och_p = fd->fd_och;
779                 *och_usecount = 1;
780         }
781         fd->fd_och = NULL;
782         mutex_unlock(&lli->lli_och_mutex);
783
784         if (old_och != NULL)
785                 rc = ll_close_inode_openhandle(inode, old_och, 0, NULL);
786
787         RETURN(rc);
788 }
789
790 /**
791  * Acquire a lease and open the file.
792  */
793 static struct obd_client_handle *
794 ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode,
795               __u64 open_flags)
796 {
797         struct lookup_intent it = { .it_op = IT_OPEN };
798         struct ll_sb_info *sbi = ll_i2sbi(inode);
799         struct md_op_data *op_data;
800         struct ptlrpc_request *req = NULL;
801         struct lustre_handle old_handle = { 0 };
802         struct obd_client_handle *och = NULL;
803         int rc;
804         int rc2;
805         ENTRY;
806
807         if (fmode != FMODE_WRITE && fmode != FMODE_READ)
808                 RETURN(ERR_PTR(-EINVAL));
809
810         if (file != NULL) {
811                 if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC))
812                         RETURN(ERR_PTR(-EPERM));
813
814                 rc = ll_lease_och_acquire(inode, file, &old_handle);
815                 if (rc)
816                         RETURN(ERR_PTR(rc));
817         }
818
819         OBD_ALLOC_PTR(och);
820         if (och == NULL)
821                 RETURN(ERR_PTR(-ENOMEM));
822
823         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
824                                         LUSTRE_OPC_ANY, NULL);
825         if (IS_ERR(op_data))
826                 GOTO(out, rc = PTR_ERR(op_data));
827
828         /* To tell the MDT this openhandle is from the same owner */
829         op_data->op_handle = old_handle;
830
831         it.it_flags = fmode | open_flags;
832         it.it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
833         rc = md_intent_lock(sbi->ll_md_exp, op_data, &it, &req,
834                             &ll_md_blocking_lease_ast,
835         /* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise
836          * it can be cancelled which may mislead applications that the lease is
837          * broken;
838          * LDLM_FL_EXCL: Set this flag so that it won't be matched by normal
839          * open in ll_md_blocking_ast(). Otherwise as ll_md_blocking_lease_ast
840          * doesn't deal with openhandle, so normal openhandle will be leaked. */
841                             LDLM_FL_NO_LRU | LDLM_FL_EXCL);
842         ll_finish_md_op_data(op_data);
843         ptlrpc_req_finished(req);
844         if (rc < 0)
845                 GOTO(out_release_it, rc);
846
847         if (it_disposition(&it, DISP_LOOKUP_NEG))
848                 GOTO(out_release_it, rc = -ENOENT);
849
850         rc = it_open_error(DISP_OPEN_OPEN, &it);
851         if (rc)
852                 GOTO(out_release_it, rc);
853
854         LASSERT(it_disposition(&it, DISP_ENQ_OPEN_REF));
855         ll_och_fill(sbi->ll_md_exp, &it, och);
856
857         if (!it_disposition(&it, DISP_OPEN_LEASE)) /* old server? */
858                 GOTO(out_close, rc = -EOPNOTSUPP);
859
860         /* already get lease, handle lease lock */
861         ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
862         if (it.it_lock_mode == 0 ||
863             it.it_lock_bits != MDS_INODELOCK_OPEN) {
864                 /* open lock must return for lease */
865                 CERROR(DFID "lease granted but no open lock, %d/%llu.\n",
866                         PFID(ll_inode2fid(inode)), it.it_lock_mode,
867                         it.it_lock_bits);
868                 GOTO(out_close, rc = -EPROTO);
869         }
870
871         ll_intent_release(&it);
872         RETURN(och);
873
874 out_close:
875         /* Cancel open lock */
876         if (it.it_lock_mode != 0) {
877                 ldlm_lock_decref_and_cancel(&och->och_lease_handle,
878                                             it.it_lock_mode);
879                 it.it_lock_mode = 0;
880                 och->och_lease_handle.cookie = 0ULL;
881         }
882         rc2 = ll_close_inode_openhandle(inode, och, 0, NULL);
883         if (rc2 < 0)
884                 CERROR("%s: error closing file "DFID": %d\n",
885                        ll_get_fsname(inode->i_sb, NULL, 0),
886                        PFID(&ll_i2info(inode)->lli_fid), rc2);
887         och = NULL; /* och has been freed in ll_close_inode_openhandle() */
888 out_release_it:
889         ll_intent_release(&it);
890 out:
891         if (och != NULL)
892                 OBD_FREE_PTR(och);
893         RETURN(ERR_PTR(rc));
894 }
895
896 /**
897  * Check whether a layout swap can be done between two inodes.
898  *
899  * \param[in] inode1  First inode to check
900  * \param[in] inode2  Second inode to check
901  *
902  * \retval 0 on success, layout swap can be performed between both inodes
903  * \retval negative error code if requirements are not met
904  */
905 static int ll_check_swap_layouts_validity(struct inode *inode1,
906                                           struct inode *inode2)
907 {
908         if (!S_ISREG(inode1->i_mode) || !S_ISREG(inode2->i_mode))
909                 return -EINVAL;
910
911         if (inode_permission(inode1, MAY_WRITE) ||
912             inode_permission(inode2, MAY_WRITE))
913                 return -EPERM;
914
915         if (inode1->i_sb != inode2->i_sb)
916                 return -EXDEV;
917
918         return 0;
919 }
920
921 static int ll_swap_layouts_close(struct obd_client_handle *och,
922                                  struct inode *inode, struct inode *inode2)
923 {
924         const struct lu_fid     *fid1 = ll_inode2fid(inode);
925         const struct lu_fid     *fid2;
926         int                      rc;
927         ENTRY;
928
929         CDEBUG(D_INODE, "%s: biased close of file "DFID"\n",
930                ll_get_fsname(inode->i_sb, NULL, 0), PFID(fid1));
931
932         rc = ll_check_swap_layouts_validity(inode, inode2);
933         if (rc < 0)
934                 GOTO(out_free_och, rc);
935
936         /* We now know that inode2 is a lustre inode */
937         fid2 = ll_inode2fid(inode2);
938
939         rc = lu_fid_cmp(fid1, fid2);
940         if (rc == 0)
941                 GOTO(out_free_och, rc = -EINVAL);
942
943         /* Close the file and swap layouts between inode & inode2.
944          * NB: lease lock handle is released in mdc_close_layout_swap_pack()
945          * because we still need it to pack l_remote_handle to MDT. */
946         rc = ll_close_inode_openhandle(inode, och, MDS_CLOSE_LAYOUT_SWAP,
947                                        inode2);
948
949         och = NULL; /* freed in ll_close_inode_openhandle() */
950
951 out_free_och:
952         if (och != NULL)
953                 OBD_FREE_PTR(och);
954
955         RETURN(rc);
956 }
957
958 /**
959  * Release lease and close the file.
960  * It will check if the lease has ever broken.
961  */
962 static int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
963                           bool *lease_broken)
964 {
965         struct ldlm_lock *lock;
966         bool cancelled = true;
967         int rc;
968         ENTRY;
969
970         lock = ldlm_handle2lock(&och->och_lease_handle);
971         if (lock != NULL) {
972                 lock_res_and_lock(lock);
973                 cancelled = ldlm_is_cancel(lock);
974                 unlock_res_and_lock(lock);
975                 LDLM_LOCK_PUT(lock);
976         }
977
978         CDEBUG(D_INODE, "lease for "DFID" broken? %d\n",
979                PFID(&ll_i2info(inode)->lli_fid), cancelled);
980
981         if (!cancelled)
982                 ldlm_cli_cancel(&och->och_lease_handle, 0);
983
984         if (lease_broken != NULL)
985                 *lease_broken = cancelled;
986
987         rc = ll_close_inode_openhandle(inode, och, 0, NULL);
988         RETURN(rc);
989 }
990
991 int ll_merge_attr(const struct lu_env *env, struct inode *inode)
992 {
993         struct ll_inode_info *lli = ll_i2info(inode);
994         struct cl_object *obj = lli->lli_clob;
995         struct cl_attr *attr = vvp_env_thread_attr(env);
996         s64 atime;
997         s64 mtime;
998         s64 ctime;
999         int rc = 0;
1000
1001         ENTRY;
1002
1003         ll_inode_size_lock(inode);
1004
1005         /* Merge timestamps the most recently obtained from MDS with
1006          * timestamps obtained from OSTs.
1007          *
1008          * Do not overwrite atime of inode because it may be refreshed
1009          * by file_accessed() function. If the read was served by cache
1010          * data, there is no RPC to be sent so that atime may not be
1011          * transferred to OSTs at all. MDT only updates atime at close time
1012          * if it's at least 'mdd.*.atime_diff' older.
1013          * All in all, the atime in Lustre does not strictly comply with
1014          * POSIX. Solving this problem needs to send an RPC to MDT for each
1015          * read, this will hurt performance. */
1016         if (LTIME_S(inode->i_atime) < lli->lli_atime || lli->lli_update_atime) {
1017                 LTIME_S(inode->i_atime) = lli->lli_atime;
1018                 lli->lli_update_atime = 0;
1019         }
1020         LTIME_S(inode->i_mtime) = lli->lli_mtime;
1021         LTIME_S(inode->i_ctime) = lli->lli_ctime;
1022
1023         atime = LTIME_S(inode->i_atime);
1024         mtime = LTIME_S(inode->i_mtime);
1025         ctime = LTIME_S(inode->i_ctime);
1026
1027         cl_object_attr_lock(obj);
1028         rc = cl_object_attr_get(env, obj, attr);
1029         cl_object_attr_unlock(obj);
1030
1031         if (rc != 0)
1032                 GOTO(out_size_unlock, rc);
1033
1034         if (atime < attr->cat_atime)
1035                 atime = attr->cat_atime;
1036
1037         if (ctime < attr->cat_ctime)
1038                 ctime = attr->cat_ctime;
1039
1040         if (mtime < attr->cat_mtime)
1041                 mtime = attr->cat_mtime;
1042
1043         CDEBUG(D_VFSTRACE, DFID" updating i_size %llu\n",
1044                PFID(&lli->lli_fid), attr->cat_size);
1045
1046         i_size_write(inode, attr->cat_size);
1047         inode->i_blocks = attr->cat_blocks;
1048
1049         LTIME_S(inode->i_atime) = atime;
1050         LTIME_S(inode->i_mtime) = mtime;
1051         LTIME_S(inode->i_ctime) = ctime;
1052
1053 out_size_unlock:
1054         ll_inode_size_unlock(inode);
1055
1056         RETURN(rc);
1057 }
1058
1059 static bool file_is_noatime(const struct file *file)
1060 {
1061         const struct vfsmount *mnt = file->f_path.mnt;
1062         const struct inode *inode = file_inode((struct file *)file);
1063
1064         /* Adapted from file_accessed() and touch_atime().*/
1065         if (file->f_flags & O_NOATIME)
1066                 return true;
1067
1068         if (inode->i_flags & S_NOATIME)
1069                 return true;
1070
1071         if (IS_NOATIME(inode))
1072                 return true;
1073
1074         if (mnt->mnt_flags & (MNT_NOATIME | MNT_READONLY))
1075                 return true;
1076
1077         if ((mnt->mnt_flags & MNT_NODIRATIME) && S_ISDIR(inode->i_mode))
1078                 return true;
1079
1080         if ((inode->i_sb->s_flags & MS_NODIRATIME) && S_ISDIR(inode->i_mode))
1081                 return true;
1082
1083         return false;
1084 }
1085
1086 static int ll_file_io_ptask(struct cfs_ptask *ptask);
1087
1088 static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
1089 {
1090         struct inode *inode = file_inode(file);
1091
1092         memset(&io->u.ci_rw.rw_iter, 0, sizeof(io->u.ci_rw.rw_iter));
1093         init_sync_kiocb(&io->u.ci_rw.rw_iocb, file);
1094         io->u.ci_rw.rw_file = file;
1095         io->u.ci_rw.rw_ptask = ll_file_io_ptask;
1096         io->u.ci_rw.rw_nonblock = !!(file->f_flags & O_NONBLOCK);
1097         if (iot == CIT_WRITE) {
1098                 io->u.ci_rw.rw_append = !!(file->f_flags & O_APPEND);
1099                 io->u.ci_rw.rw_sync   = !!(file->f_flags & O_SYNC ||
1100                                            file->f_flags & O_DIRECT ||
1101                                            IS_SYNC(inode));
1102         }
1103         io->ci_obj = ll_i2info(inode)->lli_clob;
1104         io->ci_lockreq = CILR_MAYBE;
1105         if (ll_file_nolock(file)) {
1106                 io->ci_lockreq = CILR_NEVER;
1107                 io->ci_no_srvlock = 1;
1108         } else if (file->f_flags & O_APPEND) {
1109                 io->ci_lockreq = CILR_MANDATORY;
1110         }
1111         io->ci_noatime = file_is_noatime(file);
1112         if (ll_i2sbi(inode)->ll_flags & LL_SBI_PIO)
1113                 io->ci_pio = !io->u.ci_rw.rw_append;
1114         else
1115                 io->ci_pio = 0;
1116 }
1117
1118 static int ll_file_io_ptask(struct cfs_ptask *ptask)
1119 {
1120         struct cl_io_pt *pt = ptask->pt_cbdata;
1121         struct file *file = pt->cip_file;
1122         struct lu_env *env;
1123         struct cl_io *io;
1124         loff_t pos = pt->cip_pos;
1125         int rc;
1126         __u16 refcheck;
1127         ENTRY;
1128
1129         env = cl_env_get(&refcheck);
1130         if (IS_ERR(env))
1131                 RETURN(PTR_ERR(env));
1132
1133         CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
1134                 file_dentry(file)->d_name.name,
1135                 pt->cip_iot == CIT_READ ? "read" : "write",
1136                 pos, pos + pt->cip_count);
1137
1138 restart:
1139         io = vvp_env_thread_io(env);
1140         ll_io_init(io, file, pt->cip_iot);
1141         io->u.ci_rw.rw_iter = pt->cip_iter;
1142         io->u.ci_rw.rw_iocb = pt->cip_iocb;
1143         io->ci_pio = 0; /* It's already in parallel task */
1144
1145         rc = cl_io_rw_init(env, io, pt->cip_iot, pos,
1146                            pt->cip_count - pt->cip_result);
1147         if (!rc) {
1148                 struct vvp_io *vio = vvp_env_io(env);
1149
1150                 vio->vui_io_subtype = IO_NORMAL;
1151                 vio->vui_fd = LUSTRE_FPRIVATE(file);
1152
1153                 ll_cl_add(file, env, io, LCC_RW);
1154                 rc = cl_io_loop(env, io);
1155                 ll_cl_remove(file, env);
1156         } else {
1157                 /* cl_io_rw_init() handled IO */
1158                 rc = io->ci_result;
1159         }
1160
1161         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_PTASK_IO_FAIL, 0)) {
1162                 if (io->ci_nob > 0)
1163                         io->ci_nob /= 2;
1164                 rc = -EIO;
1165         }
1166
1167         if (io->ci_nob > 0) {
1168                 pt->cip_result += io->ci_nob;
1169                 iov_iter_advance(&pt->cip_iter, io->ci_nob);
1170                 pos += io->ci_nob;
1171                 pt->cip_iocb.ki_pos = pos;
1172 #ifdef HAVE_KIOCB_KI_LEFT
1173                 pt->cip_iocb.ki_left = pt->cip_count - pt->cip_result;
1174 #elif defined(HAVE_KI_NBYTES)
1175                 pt->cip_iocb.ki_nbytes = pt->cip_count - pt->cip_result;
1176 #endif
1177         }
1178
1179         cl_io_fini(env, io);
1180
1181         if ((rc == 0 || rc == -ENODATA) &&
1182             pt->cip_result < pt->cip_count &&
1183             io->ci_need_restart) {
1184                 CDEBUG(D_VFSTRACE,
1185                         "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
1186                         file_dentry(file)->d_name.name,
1187                         pt->cip_iot == CIT_READ ? "read" : "write",
1188                         pos, pos + pt->cip_count - pt->cip_result,
1189                         pt->cip_result, rc);
1190                 goto restart;
1191         }
1192
1193         CDEBUG(D_VFSTRACE, "%s: %s ret: %zd, rc: %d\n",
1194                 file_dentry(file)->d_name.name,
1195                 pt->cip_iot == CIT_READ ? "read" : "write",
1196                 pt->cip_result, rc);
1197
1198         cl_env_put(env, &refcheck);
1199         RETURN(pt->cip_result > 0 ? 0 : rc);
1200 }
1201
1202 static ssize_t
1203 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
1204                    struct file *file, enum cl_io_type iot,
1205                    loff_t *ppos, size_t count)
1206 {
1207         struct range_lock       range;
1208         struct vvp_io           *vio = vvp_env_io(env);
1209         struct inode            *inode = file_inode(file);
1210         struct ll_inode_info    *lli = ll_i2info(inode);
1211         struct ll_file_data     *fd  = LUSTRE_FPRIVATE(file);
1212         struct cl_io            *io;
1213         loff_t                  pos = *ppos;
1214         ssize_t                 result = 0;
1215         int                     rc = 0;
1216
1217         ENTRY;
1218
1219         CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
1220                 file_dentry(file)->d_name.name,
1221                 iot == CIT_READ ? "read" : "write", pos, pos + count);
1222
1223 restart:
1224         io = vvp_env_thread_io(env);
1225         ll_io_init(io, file, iot);
1226         if (args->via_io_subtype == IO_NORMAL) {
1227                 io->u.ci_rw.rw_iter = *args->u.normal.via_iter;
1228                 io->u.ci_rw.rw_iocb = *args->u.normal.via_iocb;
1229         } else {
1230                 io->ci_pio = 0;
1231         }
1232
1233         if (cl_io_rw_init(env, io, iot, pos, count) == 0) {
1234                 bool range_locked = false;
1235
1236                 if (file->f_flags & O_APPEND)
1237                         range_lock_init(&range, 0, LUSTRE_EOF);
1238                 else
1239                         range_lock_init(&range, pos, pos + count - 1);
1240
1241                 vio->vui_fd  = LUSTRE_FPRIVATE(file);
1242                 vio->vui_io_subtype = args->via_io_subtype;
1243
1244                 switch (vio->vui_io_subtype) {
1245                 case IO_NORMAL:
1246                         /* Direct IO reads must also take range lock,
1247                          * or multiple reads will try to work on the same pages
1248                          * See LU-6227 for details. */
1249                         if (((iot == CIT_WRITE) ||
1250                             (iot == CIT_READ && (file->f_flags & O_DIRECT))) &&
1251                             !(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1252                                 CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
1253                                        RL_PARA(&range));
1254                                 rc = range_lock(&lli->lli_write_tree, &range);
1255                                 if (rc < 0)
1256                                         GOTO(out, rc);
1257
1258                                 range_locked = true;
1259                         }
1260                         break;
1261                 case IO_SPLICE:
1262                         vio->u.splice.vui_pipe = args->u.splice.via_pipe;
1263                         vio->u.splice.vui_flags = args->u.splice.via_flags;
1264                         break;
1265                 default:
1266                         CERROR("unknown IO subtype %u\n", vio->vui_io_subtype);
1267                         LBUG();
1268                 }
1269
1270                 ll_cl_add(file, env, io, LCC_RW);
1271                 if (io->ci_pio && iot == CIT_WRITE && !IS_NOSEC(inode) &&
1272                     !lli->lli_inode_locked) {
1273                         inode_lock(inode);
1274                         lli->lli_inode_locked = 1;
1275                 }
1276                 rc = cl_io_loop(env, io);
1277                 if (lli->lli_inode_locked) {
1278                         lli->lli_inode_locked = 0;
1279                         inode_unlock(inode);
1280                 }
1281                 ll_cl_remove(file, env);
1282
1283                 if (range_locked) {
1284                         CDEBUG(D_VFSTRACE, "Range unlock "RL_FMT"\n",
1285                                RL_PARA(&range));
1286                         range_unlock(&lli->lli_write_tree, &range);
1287                 }
1288         } else {
1289                 /* cl_io_rw_init() handled IO */
1290                 rc = io->ci_result;
1291         }
1292
1293         if (io->ci_nob > 0) {
1294                 result += io->ci_nob;
1295                 count  -= io->ci_nob;
1296
1297                 if (args->via_io_subtype == IO_NORMAL) {
1298                         iov_iter_advance(args->u.normal.via_iter, io->ci_nob);
1299                         pos += io->ci_nob;
1300                         args->u.normal.via_iocb->ki_pos = pos;
1301 #ifdef HAVE_KIOCB_KI_LEFT
1302                         args->u.normal.via_iocb->ki_left = count;
1303 #elif defined(HAVE_KI_NBYTES)
1304                         args->u.normal.via_iocb->ki_nbytes = count;
1305 #endif
1306                 } else {
1307                         /* for splice */
1308                         pos = io->u.ci_rw.rw_range.cir_pos;
1309                 }
1310         }
1311 out:
1312         cl_io_fini(env, io);
1313
1314         if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) {
1315                 CDEBUG(D_VFSTRACE,
1316                         "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
1317                         file_dentry(file)->d_name.name,
1318                         iot == CIT_READ ? "read" : "write",
1319                         pos, pos + count, result, rc);
1320                 goto restart;
1321         }
1322
1323         if (iot == CIT_READ) {
1324                 if (result > 0)
1325                         ll_stats_ops_tally(ll_i2sbi(inode),
1326                                            LPROC_LL_READ_BYTES, result);
1327         } else if (iot == CIT_WRITE) {
1328                 if (result > 0) {
1329                         ll_stats_ops_tally(ll_i2sbi(inode),
1330                                            LPROC_LL_WRITE_BYTES, result);
1331                         fd->fd_write_failed = false;
1332                 } else if (result == 0 && rc == 0) {
1333                         rc = io->ci_result;
1334                         if (rc < 0)
1335                                 fd->fd_write_failed = true;
1336                         else
1337                                 fd->fd_write_failed = false;
1338                 } else if (rc != -ERESTARTSYS) {
1339                         fd->fd_write_failed = true;
1340                 }
1341         }
1342
1343         CDEBUG(D_VFSTRACE, "%s: %s *ppos: %llu, pos: %llu, ret: %zd, rc: %d\n",
1344                 file_dentry(file)->d_name.name,
1345                 iot == CIT_READ ? "read" : "write", *ppos, pos, result, rc);
1346
1347         *ppos = pos;
1348
1349         RETURN(result > 0 ? result : rc);
1350 }
1351
1352 /**
1353  * The purpose of fast read is to overcome per I/O overhead and improve IOPS
1354  * especially for small I/O.
1355  *
1356  * To serve a read request, CLIO has to create and initialize a cl_io and
1357  * then request DLM lock. This has turned out to have siginificant overhead
1358  * and affects the performance of small I/O dramatically.
1359  *
1360  * It's not necessary to create a cl_io for each I/O. Under the help of read
1361  * ahead, most of the pages being read are already in memory cache and we can
1362  * read those pages directly because if the pages exist, the corresponding DLM
1363  * lock must exist so that page content must be valid.
1364  *
1365  * In fast read implementation, the llite speculatively finds and reads pages
1366  * in memory cache. There are three scenarios for fast read:
1367  *   - If the page exists and is uptodate, kernel VM will provide the data and
1368  *     CLIO won't be intervened;
1369  *   - If the page was brought into memory by read ahead, it will be exported
1370  *     and read ahead parameters will be updated;
1371  *   - Otherwise the page is not in memory, we can't do fast read. Therefore,
1372  *     it will go back and invoke normal read, i.e., a cl_io will be created
1373  *     and DLM lock will be requested.
1374  *
1375  * POSIX compliance: posix standard states that read is intended to be atomic.
1376  * Lustre read implementation is in line with Linux kernel read implementation
1377  * and neither of them complies with POSIX standard in this matter. Fast read
1378  * doesn't make the situation worse on single node but it may interleave write
1379  * results from multiple nodes due to short read handling in ll_file_aio_read().
1380  *
1381  * \param env - lu_env
1382  * \param iocb - kiocb from kernel
1383  * \param iter - user space buffers where the data will be copied
1384  *
1385  * \retval - number of bytes have been read, or error code if error occurred.
1386  */
1387 static ssize_t
1388 ll_do_fast_read(struct kiocb *iocb, struct iov_iter *iter)
1389 {
1390         ssize_t result;
1391
1392         if (!ll_sbi_has_fast_read(ll_i2sbi(file_inode(iocb->ki_filp))))
1393                 return 0;
1394
1395         /* NB: we can't do direct IO for fast read because it will need a lock
1396          * to make IO engine happy. */
1397         if (iocb->ki_filp->f_flags & O_DIRECT)
1398                 return 0;
1399
1400         result = generic_file_read_iter(iocb, iter);
1401
1402         /* If the first page is not in cache, generic_file_aio_read() will be
1403          * returned with -ENODATA.
1404          * See corresponding code in ll_readpage(). */
1405         if (result == -ENODATA)
1406                 result = 0;
1407
1408         if (result > 0)
1409                 ll_stats_ops_tally(ll_i2sbi(file_inode(iocb->ki_filp)),
1410                                 LPROC_LL_READ_BYTES, result);
1411
1412         return result;
1413 }
1414
1415 /*
1416  * Read from a file (through the page cache).
1417  */
1418 static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to)
1419 {
1420         struct lu_env *env;
1421         struct vvp_io_args *args;
1422         ssize_t result;
1423         ssize_t rc2;
1424         __u16 refcheck;
1425
1426         result = ll_do_fast_read(iocb, to);
1427         if (result < 0 || iov_iter_count(to) == 0)
1428                 GOTO(out, result);
1429
1430         env = cl_env_get(&refcheck);
1431         if (IS_ERR(env))
1432                 return PTR_ERR(env);
1433
1434         args = ll_env_args(env, IO_NORMAL);
1435         args->u.normal.via_iter = to;
1436         args->u.normal.via_iocb = iocb;
1437
1438         rc2 = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1439                                  &iocb->ki_pos, iov_iter_count(to));
1440         if (rc2 > 0)
1441                 result += rc2;
1442         else if (result == 0)
1443                 result = rc2;
1444
1445         cl_env_put(env, &refcheck);
1446 out:
1447         return result;
1448 }
1449
1450 /*
1451  * Write to a file (through the page cache).
1452  */
1453 static ssize_t ll_file_write_iter(struct kiocb *iocb, struct iov_iter *from)
1454 {
1455         struct vvp_io_args *args;
1456         struct lu_env *env;
1457         ssize_t result;
1458         __u16 refcheck;
1459
1460         env = cl_env_get(&refcheck);
1461         if (IS_ERR(env))
1462                 return PTR_ERR(env);
1463
1464         args = ll_env_args(env, IO_NORMAL);
1465         args->u.normal.via_iter = from;
1466         args->u.normal.via_iocb = iocb;
1467
1468         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1469                                     &iocb->ki_pos, iov_iter_count(from));
1470         cl_env_put(env, &refcheck);
1471         return result;
1472 }
1473
1474 #ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1475 /*
1476  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
1477  */
1478 static int ll_file_get_iov_count(const struct iovec *iov,
1479                                  unsigned long *nr_segs, size_t *count)
1480 {
1481         size_t cnt = 0;
1482         unsigned long seg;
1483
1484         for (seg = 0; seg < *nr_segs; seg++) {
1485                 const struct iovec *iv = &iov[seg];
1486
1487                 /*
1488                  * If any segment has a negative length, or the cumulative
1489                  * length ever wraps negative then return -EINVAL.
1490                  */
1491                 cnt += iv->iov_len;
1492                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
1493                         return -EINVAL;
1494                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
1495                         continue;
1496                 if (seg == 0)
1497                         return -EFAULT;
1498                 *nr_segs = seg;
1499                 cnt -= iv->iov_len;     /* This segment is no good */
1500                 break;
1501         }
1502         *count = cnt;
1503         return 0;
1504 }
1505
1506 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1507                                 unsigned long nr_segs, loff_t pos)
1508 {
1509         struct iov_iter to;
1510         size_t iov_count;
1511         ssize_t result;
1512         ENTRY;
1513
1514         result = ll_file_get_iov_count(iov, &nr_segs, &iov_count);
1515         if (result)
1516                 RETURN(result);
1517
1518 # ifdef HAVE_IOV_ITER_INIT_DIRECTION
1519         iov_iter_init(&to, READ, iov, nr_segs, iov_count);
1520 # else /* !HAVE_IOV_ITER_INIT_DIRECTION */
1521         iov_iter_init(&to, iov, nr_segs, iov_count, 0);
1522 # endif /* HAVE_IOV_ITER_INIT_DIRECTION */
1523
1524         result = ll_file_read_iter(iocb, &to);
1525
1526         RETURN(result);
1527 }
1528
1529 static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
1530                             loff_t *ppos)
1531 {
1532         struct iovec   iov = { .iov_base = buf, .iov_len = count };
1533         struct kiocb   kiocb;
1534         ssize_t        result;
1535         ENTRY;
1536
1537         init_sync_kiocb(&kiocb, file);
1538         kiocb.ki_pos = *ppos;
1539 #ifdef HAVE_KIOCB_KI_LEFT
1540         kiocb.ki_left = count;
1541 #elif defined(HAVE_KI_NBYTES)
1542         kiocb.i_nbytes = count;
1543 #endif
1544
1545         result = ll_file_aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
1546         *ppos = kiocb.ki_pos;
1547
1548         RETURN(result);
1549 }
1550
1551 /*
1552  * Write to a file (through the page cache).
1553  * AIO stuff
1554  */
1555 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1556                                  unsigned long nr_segs, loff_t pos)
1557 {
1558         struct iov_iter from;
1559         size_t iov_count;
1560         ssize_t result;
1561         ENTRY;
1562
1563         result = ll_file_get_iov_count(iov, &nr_segs, &iov_count);
1564         if (result)
1565                 RETURN(result);
1566
1567 # ifdef HAVE_IOV_ITER_INIT_DIRECTION
1568         iov_iter_init(&from, WRITE, iov, nr_segs, iov_count);
1569 # else /* !HAVE_IOV_ITER_INIT_DIRECTION */
1570         iov_iter_init(&from, iov, nr_segs, iov_count, 0);
1571 # endif /* HAVE_IOV_ITER_INIT_DIRECTION */
1572
1573         result = ll_file_write_iter(iocb, &from);
1574
1575         RETURN(result);
1576 }
1577
1578 static ssize_t ll_file_write(struct file *file, const char __user *buf,
1579                              size_t count, loff_t *ppos)
1580 {
1581         struct lu_env *env;
1582         struct iovec   iov = { .iov_base = (void __user *)buf,
1583                                .iov_len = count };
1584         struct kiocb  *kiocb;
1585         ssize_t        result;
1586         __u16          refcheck;
1587         ENTRY;
1588
1589         env = cl_env_get(&refcheck);
1590         if (IS_ERR(env))
1591                 RETURN(PTR_ERR(env));
1592
1593         kiocb = &ll_env_info(env)->lti_kiocb;
1594         init_sync_kiocb(kiocb, file);
1595         kiocb->ki_pos = *ppos;
1596 #ifdef HAVE_KIOCB_KI_LEFT
1597         kiocb->ki_left = count;
1598 #elif defined(HAVE_KI_NBYTES)
1599         kiocb->ki_nbytes = count;
1600 #endif
1601
1602         result = ll_file_aio_write(kiocb, &iov, 1, kiocb->ki_pos);
1603         *ppos = kiocb->ki_pos;
1604
1605         cl_env_put(env, &refcheck);
1606         RETURN(result);
1607 }
1608 #endif /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
1609
1610 /*
1611  * Send file content (through pagecache) somewhere with helper
1612  */
1613 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1614                                    struct pipe_inode_info *pipe, size_t count,
1615                                    unsigned int flags)
1616 {
1617         struct lu_env      *env;
1618         struct vvp_io_args *args;
1619         ssize_t             result;
1620         __u16               refcheck;
1621         ENTRY;
1622
1623         env = cl_env_get(&refcheck);
1624         if (IS_ERR(env))
1625                 RETURN(PTR_ERR(env));
1626
1627         args = ll_env_args(env, IO_SPLICE);
1628         args->u.splice.via_pipe = pipe;
1629         args->u.splice.via_flags = flags;
1630
1631         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1632         cl_env_put(env, &refcheck);
1633         RETURN(result);
1634 }
1635
1636 int ll_lov_setstripe_ea_info(struct inode *inode, struct dentry *dentry,
1637                              __u64 flags, struct lov_user_md *lum, int lum_size)
1638 {
1639         struct lookup_intent oit = {
1640                 .it_op = IT_OPEN,
1641                 .it_flags = flags | MDS_OPEN_BY_FID,
1642         };
1643         int rc;
1644         ENTRY;
1645
1646         ll_inode_size_lock(inode);
1647         rc = ll_intent_file_open(dentry, lum, lum_size, &oit);
1648         if (rc < 0)
1649                 GOTO(out_unlock, rc);
1650
1651         ll_release_openhandle(dentry, &oit);
1652
1653 out_unlock:
1654         ll_inode_size_unlock(inode);
1655         ll_intent_release(&oit);
1656
1657         RETURN(rc);
1658 }
1659
1660 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1661                              struct lov_mds_md **lmmp, int *lmm_size,
1662                              struct ptlrpc_request **request)
1663 {
1664         struct ll_sb_info *sbi = ll_i2sbi(inode);
1665         struct mdt_body  *body;
1666         struct lov_mds_md *lmm = NULL;
1667         struct ptlrpc_request *req = NULL;
1668         struct md_op_data *op_data;
1669         int rc, lmmsize;
1670
1671         rc = ll_get_default_mdsize(sbi, &lmmsize);
1672         if (rc)
1673                 RETURN(rc);
1674
1675         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1676                                      strlen(filename), lmmsize,
1677                                      LUSTRE_OPC_ANY, NULL);
1678         if (IS_ERR(op_data))
1679                 RETURN(PTR_ERR(op_data));
1680
1681         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1682         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1683         ll_finish_md_op_data(op_data);
1684         if (rc < 0) {
1685                 CDEBUG(D_INFO, "md_getattr_name failed "
1686                        "on %s: rc %d\n", filename, rc);
1687                 GOTO(out, rc);
1688         }
1689
1690         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1691         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1692
1693         lmmsize = body->mbo_eadatasize;
1694
1695         if (!(body->mbo_valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1696                         lmmsize == 0) {
1697                 GOTO(out, rc = -ENODATA);
1698         }
1699
1700         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1701         LASSERT(lmm != NULL);
1702
1703         if (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1) &&
1704             lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3) &&
1705             lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_COMP_V1))
1706                 GOTO(out, rc = -EPROTO);
1707
1708         /*
1709          * This is coming from the MDS, so is probably in
1710          * little endian.  We convert it to host endian before
1711          * passing it to userspace.
1712          */
1713         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1714                 int stripe_count;
1715
1716                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1) ||
1717                     lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1718                         stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
1719                         if (le32_to_cpu(lmm->lmm_pattern) &
1720                             LOV_PATTERN_F_RELEASED)
1721                                 stripe_count = 0;
1722                 }
1723
1724                 /* if function called for directory - we should
1725                  * avoid swab not existent lsm objects */
1726                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1727                         lustre_swab_lov_user_md_v1(
1728                                         (struct lov_user_md_v1 *)lmm);
1729                         if (S_ISREG(body->mbo_mode))
1730                                 lustre_swab_lov_user_md_objects(
1731                                     ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1732                                     stripe_count);
1733                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1734                         lustre_swab_lov_user_md_v3(
1735                                         (struct lov_user_md_v3 *)lmm);
1736                         if (S_ISREG(body->mbo_mode))
1737                                 lustre_swab_lov_user_md_objects(
1738                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1739                                  stripe_count);
1740                 } else if (lmm->lmm_magic ==
1741                            cpu_to_le32(LOV_MAGIC_COMP_V1)) {
1742                         lustre_swab_lov_comp_md_v1(
1743                                         (struct lov_comp_md_v1 *)lmm);
1744                 }
1745         }
1746
1747 out:
1748         *lmmp = lmm;
1749         *lmm_size = lmmsize;
1750         *request = req;
1751         return rc;
1752 }
1753
1754 static int ll_lov_setea(struct inode *inode, struct file *file,
1755                         void __user *arg)
1756 {
1757         __u64                    flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1758         struct lov_user_md      *lump;
1759         int                      lum_size = sizeof(struct lov_user_md) +
1760                                             sizeof(struct lov_user_ost_data);
1761         int                      rc;
1762         ENTRY;
1763
1764         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1765                 RETURN(-EPERM);
1766
1767         OBD_ALLOC_LARGE(lump, lum_size);
1768         if (lump == NULL)
1769                 RETURN(-ENOMEM);
1770
1771         if (copy_from_user(lump, arg, lum_size))
1772                 GOTO(out_lump, rc = -EFAULT);
1773
1774         rc = ll_lov_setstripe_ea_info(inode, file_dentry(file), flags, lump,
1775                                       lum_size);
1776         cl_lov_delay_create_clear(&file->f_flags);
1777
1778 out_lump:
1779         OBD_FREE_LARGE(lump, lum_size);
1780         RETURN(rc);
1781 }
1782
1783 static int ll_file_getstripe(struct inode *inode, void __user *lum, size_t size)
1784 {
1785         struct lu_env   *env;
1786         __u16           refcheck;
1787         int             rc;
1788         ENTRY;
1789
1790         env = cl_env_get(&refcheck);
1791         if (IS_ERR(env))
1792                 RETURN(PTR_ERR(env));
1793
1794         rc = cl_object_getstripe(env, ll_i2info(inode)->lli_clob, lum, size);
1795         cl_env_put(env, &refcheck);
1796         RETURN(rc);
1797 }
1798
1799 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1800                             void __user *arg)
1801 {
1802         struct lov_user_md __user *lum = (struct lov_user_md __user *)arg;
1803         struct lov_user_md        *klum;
1804         int                        lum_size, rc;
1805         __u64                      flags = FMODE_WRITE;
1806         ENTRY;
1807
1808         rc = ll_copy_user_md(lum, &klum);
1809         if (rc < 0)
1810                 RETURN(rc);
1811
1812         lum_size = rc;
1813         rc = ll_lov_setstripe_ea_info(inode, file_dentry(file), flags, klum,
1814                                       lum_size);
1815         if (!rc) {
1816                 __u32 gen;
1817
1818                 rc = put_user(0, &lum->lmm_stripe_count);
1819                 if (rc)
1820                         GOTO(out, rc);
1821
1822                 rc = ll_layout_refresh(inode, &gen);
1823                 if (rc)
1824                         GOTO(out, rc);
1825
1826                 rc = ll_file_getstripe(inode, arg, lum_size);
1827         }
1828         cl_lov_delay_create_clear(&file->f_flags);
1829
1830 out:
1831         OBD_FREE(klum, lum_size);
1832         RETURN(rc);
1833 }
1834
1835 static int
1836 ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1837 {
1838         struct ll_inode_info *lli = ll_i2info(inode);
1839         struct cl_object *obj = lli->lli_clob;
1840         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1841         struct ll_grouplock grouplock;
1842         int rc;
1843         ENTRY;
1844
1845         if (arg == 0) {
1846                 CWARN("group id for group lock must not be 0\n");
1847                 RETURN(-EINVAL);
1848         }
1849
1850         if (ll_file_nolock(file))
1851                 RETURN(-EOPNOTSUPP);
1852
1853         spin_lock(&lli->lli_lock);
1854         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1855                 CWARN("group lock already existed with gid %lu\n",
1856                       fd->fd_grouplock.lg_gid);
1857                 spin_unlock(&lli->lli_lock);
1858                 RETURN(-EINVAL);
1859         }
1860         LASSERT(fd->fd_grouplock.lg_lock == NULL);
1861         spin_unlock(&lli->lli_lock);
1862
1863         /**
1864          * XXX: group lock needs to protect all OST objects while PFL
1865          * can add new OST objects during the IO, so we'd instantiate
1866          * all OST objects before getting its group lock.
1867          */
1868         if (obj) {
1869                 struct lu_env *env;
1870                 __u16 refcheck;
1871                 struct cl_layout cl = {
1872                         .cl_is_composite = false,
1873                 };
1874
1875                 env = cl_env_get(&refcheck);
1876                 if (IS_ERR(env))
1877                         RETURN(PTR_ERR(env));
1878
1879                 rc = cl_object_layout_get(env, obj, &cl);
1880                 if (!rc && cl.cl_is_composite)
1881                         rc = ll_layout_write_intent(inode, 0, OBD_OBJECT_EOF);
1882
1883                 cl_env_put(env, &refcheck);
1884                 if (rc)
1885                         RETURN(rc);
1886         }
1887
1888         rc = cl_get_grouplock(ll_i2info(inode)->lli_clob,
1889                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1890         if (rc)
1891                 RETURN(rc);
1892
1893         spin_lock(&lli->lli_lock);
1894         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1895                 spin_unlock(&lli->lli_lock);
1896                 CERROR("another thread just won the race\n");
1897                 cl_put_grouplock(&grouplock);
1898                 RETURN(-EINVAL);
1899         }
1900
1901         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1902         fd->fd_grouplock = grouplock;
1903         spin_unlock(&lli->lli_lock);
1904
1905         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1906         RETURN(0);
1907 }
1908
1909 static int ll_put_grouplock(struct inode *inode, struct file *file,
1910                             unsigned long arg)
1911 {
1912         struct ll_inode_info   *lli = ll_i2info(inode);
1913         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1914         struct ll_grouplock     grouplock;
1915         ENTRY;
1916
1917         spin_lock(&lli->lli_lock);
1918         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1919                 spin_unlock(&lli->lli_lock);
1920                 CWARN("no group lock held\n");
1921                 RETURN(-EINVAL);
1922         }
1923
1924         LASSERT(fd->fd_grouplock.lg_lock != NULL);
1925
1926         if (fd->fd_grouplock.lg_gid != arg) {
1927                 CWARN("group lock %lu doesn't match current id %lu\n",
1928                       arg, fd->fd_grouplock.lg_gid);
1929                 spin_unlock(&lli->lli_lock);
1930                 RETURN(-EINVAL);
1931         }
1932
1933         grouplock = fd->fd_grouplock;
1934         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1935         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1936         spin_unlock(&lli->lli_lock);
1937
1938         cl_put_grouplock(&grouplock);
1939         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1940         RETURN(0);
1941 }
1942
1943 /**
1944  * Close inode open handle
1945  *
1946  * \param dentry [in]     dentry which contains the inode
1947  * \param it     [in,out] intent which contains open info and result
1948  *
1949  * \retval 0     success
1950  * \retval <0    failure
1951  */
1952 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1953 {
1954         struct inode *inode = dentry->d_inode;
1955         struct obd_client_handle *och;
1956         int rc;
1957         ENTRY;
1958
1959         LASSERT(inode);
1960
1961         /* Root ? Do nothing. */
1962         if (dentry->d_inode->i_sb->s_root == dentry)
1963                 RETURN(0);
1964
1965         /* No open handle to close? Move away */
1966         if (!it_disposition(it, DISP_OPEN_OPEN))
1967                 RETURN(0);
1968
1969         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1970
1971         OBD_ALLOC(och, sizeof(*och));
1972         if (!och)
1973                 GOTO(out, rc = -ENOMEM);
1974
1975         ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
1976
1977         rc = ll_close_inode_openhandle(inode, och, 0, NULL);
1978 out:
1979         /* this one is in place of ll_file_open */
1980         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1981                 ptlrpc_req_finished(it->it_request);
1982                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1983         }
1984         RETURN(rc);
1985 }
1986
1987 /**
1988  * Get size for inode for which FIEMAP mapping is requested.
1989  * Make the FIEMAP get_info call and returns the result.
1990  * \param fiemap        kernel buffer to hold extens
1991  * \param num_bytes     kernel buffer size
1992  */
1993 static int ll_do_fiemap(struct inode *inode, struct fiemap *fiemap,
1994                         size_t num_bytes)
1995 {
1996         struct lu_env                   *env;
1997         __u16                           refcheck;
1998         int                             rc = 0;
1999         struct ll_fiemap_info_key       fmkey = { .lfik_name = KEY_FIEMAP, };
2000         ENTRY;
2001
2002         /* Checks for fiemap flags */
2003         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2004                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2005                 return -EBADR;
2006         }
2007
2008         /* Check for FIEMAP_FLAG_SYNC */
2009         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
2010                 rc = filemap_fdatawrite(inode->i_mapping);
2011                 if (rc)
2012                         return rc;
2013         }
2014
2015         env = cl_env_get(&refcheck);
2016         if (IS_ERR(env))
2017                 RETURN(PTR_ERR(env));
2018
2019         if (i_size_read(inode) == 0) {
2020                 rc = ll_glimpse_size(inode);
2021                 if (rc)
2022                         GOTO(out, rc);
2023         }
2024
2025         fmkey.lfik_oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2026         obdo_from_inode(&fmkey.lfik_oa, inode, OBD_MD_FLSIZE);
2027         obdo_set_parent_fid(&fmkey.lfik_oa, &ll_i2info(inode)->lli_fid);
2028
2029         /* If filesize is 0, then there would be no objects for mapping */
2030         if (fmkey.lfik_oa.o_size == 0) {
2031                 fiemap->fm_mapped_extents = 0;
2032                 GOTO(out, rc = 0);
2033         }
2034
2035         fmkey.lfik_fiemap = *fiemap;
2036
2037         rc = cl_object_fiemap(env, ll_i2info(inode)->lli_clob,
2038                               &fmkey, fiemap, &num_bytes);
2039 out:
2040         cl_env_put(env, &refcheck);
2041         RETURN(rc);
2042 }
2043
2044 int ll_fid2path(struct inode *inode, void __user *arg)
2045 {
2046         struct obd_export       *exp = ll_i2mdexp(inode);
2047         const struct getinfo_fid2path __user *gfin = arg;
2048         __u32                    pathlen;
2049         struct getinfo_fid2path *gfout;
2050         size_t                   outsize;
2051         int                      rc;
2052
2053         ENTRY;
2054
2055         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
2056             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
2057                 RETURN(-EPERM);
2058
2059         /* Only need to get the buflen */
2060         if (get_user(pathlen, &gfin->gf_pathlen))
2061                 RETURN(-EFAULT);
2062
2063         if (pathlen > PATH_MAX)
2064                 RETURN(-EINVAL);
2065
2066         outsize = sizeof(*gfout) + pathlen;
2067         OBD_ALLOC(gfout, outsize);
2068         if (gfout == NULL)
2069                 RETURN(-ENOMEM);
2070
2071         if (copy_from_user(gfout, arg, sizeof(*gfout)))
2072                 GOTO(gf_free, rc = -EFAULT);
2073         /* append root FID after gfout to let MDT know the root FID so that it
2074          * can lookup the correct path, this is mainly for fileset.
2075          * old server without fileset mount support will ignore this. */
2076         *gfout->gf_u.gf_root_fid = *ll_inode2fid(inode);
2077
2078         /* Call mdc_iocontrol */
2079         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
2080         if (rc != 0)
2081                 GOTO(gf_free, rc);
2082
2083         if (copy_to_user(arg, gfout, outsize))
2084                 rc = -EFAULT;
2085
2086 gf_free:
2087         OBD_FREE(gfout, outsize);
2088         RETURN(rc);
2089 }
2090
2091 /*
2092  * Read the data_version for inode.
2093  *
2094  * This value is computed using stripe object version on OST.
2095  * Version is computed using server side locking.
2096  *
2097  * @param flags if do sync on the OST side;
2098  *              0: no sync
2099  *              LL_DV_RD_FLUSH: flush dirty pages, LCK_PR on OSTs
2100  *              LL_DV_WR_FLUSH: drop all caching pages, LCK_PW on OSTs
2101  */
2102 int ll_data_version(struct inode *inode, __u64 *data_version, int flags)
2103 {
2104         struct cl_object *obj = ll_i2info(inode)->lli_clob;
2105         struct lu_env *env;
2106         struct cl_io *io;
2107         __u16  refcheck;
2108         int result;
2109
2110         ENTRY;
2111
2112         /* If no file object initialized, we consider its version is 0. */
2113         if (obj == NULL) {
2114                 *data_version = 0;
2115                 RETURN(0);
2116         }
2117
2118         env = cl_env_get(&refcheck);
2119         if (IS_ERR(env))
2120                 RETURN(PTR_ERR(env));
2121
2122         io = vvp_env_thread_io(env);
2123         io->ci_obj = obj;
2124         io->u.ci_data_version.dv_data_version = 0;
2125         io->u.ci_data_version.dv_flags = flags;
2126
2127 restart:
2128         if (cl_io_init(env, io, CIT_DATA_VERSION, io->ci_obj) == 0)
2129                 result = cl_io_loop(env, io);
2130         else
2131                 result = io->ci_result;
2132
2133         *data_version = io->u.ci_data_version.dv_data_version;
2134
2135         cl_io_fini(env, io);
2136
2137         if (unlikely(io->ci_need_restart))
2138                 goto restart;
2139
2140         cl_env_put(env, &refcheck);
2141
2142         RETURN(result);
2143 }
2144
2145 /*
2146  * Trigger a HSM release request for the provided inode.
2147  */
2148 int ll_hsm_release(struct inode *inode)
2149 {
2150         struct lu_env *env;
2151         struct obd_client_handle *och = NULL;
2152         __u64 data_version = 0;
2153         int rc;
2154         __u16 refcheck;
2155         ENTRY;
2156
2157         CDEBUG(D_INODE, "%s: Releasing file "DFID".\n",
2158                ll_get_fsname(inode->i_sb, NULL, 0),
2159                PFID(&ll_i2info(inode)->lli_fid));
2160
2161         och = ll_lease_open(inode, NULL, FMODE_WRITE, MDS_OPEN_RELEASE);
2162         if (IS_ERR(och))
2163                 GOTO(out, rc = PTR_ERR(och));
2164
2165         /* Grab latest data_version and [am]time values */
2166         rc = ll_data_version(inode, &data_version, LL_DV_WR_FLUSH);
2167         if (rc != 0)
2168                 GOTO(out, rc);
2169
2170         env = cl_env_get(&refcheck);
2171         if (IS_ERR(env))
2172                 GOTO(out, rc = PTR_ERR(env));
2173
2174         ll_merge_attr(env, inode);
2175         cl_env_put(env, &refcheck);
2176
2177         /* Release the file.
2178          * NB: lease lock handle is released in mdc_hsm_release_pack() because
2179          * we still need it to pack l_remote_handle to MDT. */
2180         rc = ll_close_inode_openhandle(inode, och, MDS_HSM_RELEASE,
2181                                        &data_version);
2182         och = NULL;
2183
2184         EXIT;
2185 out:
2186         if (och != NULL && !IS_ERR(och)) /* close the file */
2187                 ll_lease_close(och, inode, NULL);
2188
2189         return rc;
2190 }
2191
2192 struct ll_swap_stack {
2193         __u64                    dv1;
2194         __u64                    dv2;
2195         struct inode            *inode1;
2196         struct inode            *inode2;
2197         bool                     check_dv1;
2198         bool                     check_dv2;
2199 };
2200
2201 static int ll_swap_layouts(struct file *file1, struct file *file2,
2202                            struct lustre_swap_layouts *lsl)
2203 {
2204         struct mdc_swap_layouts  msl;
2205         struct md_op_data       *op_data;
2206         __u32                    gid;
2207         __u64                    dv;
2208         struct ll_swap_stack    *llss = NULL;
2209         int                      rc;
2210
2211         OBD_ALLOC_PTR(llss);
2212         if (llss == NULL)
2213                 RETURN(-ENOMEM);
2214
2215         llss->inode1 = file_inode(file1);
2216         llss->inode2 = file_inode(file2);
2217
2218         rc = ll_check_swap_layouts_validity(llss->inode1, llss->inode2);
2219         if (rc < 0)
2220                 GOTO(free, rc);
2221
2222         /* we use 2 bool because it is easier to swap than 2 bits */
2223         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
2224                 llss->check_dv1 = true;
2225
2226         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
2227                 llss->check_dv2 = true;
2228
2229         /* we cannot use lsl->sl_dvX directly because we may swap them */
2230         llss->dv1 = lsl->sl_dv1;
2231         llss->dv2 = lsl->sl_dv2;
2232
2233         rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
2234         if (rc == 0) /* same file, done! */
2235                 GOTO(free, rc);
2236
2237         if (rc < 0) { /* sequentialize it */
2238                 swap(llss->inode1, llss->inode2);
2239                 swap(file1, file2);
2240                 swap(llss->dv1, llss->dv2);
2241                 swap(llss->check_dv1, llss->check_dv2);
2242         }
2243
2244         gid = lsl->sl_gid;
2245         if (gid != 0) { /* application asks to flush dirty cache */
2246                 rc = ll_get_grouplock(llss->inode1, file1, gid);
2247                 if (rc < 0)
2248                         GOTO(free, rc);
2249
2250                 rc = ll_get_grouplock(llss->inode2, file2, gid);
2251                 if (rc < 0) {
2252                         ll_put_grouplock(llss->inode1, file1, gid);
2253                         GOTO(free, rc);
2254                 }
2255         }
2256
2257         /* ultimate check, before swaping the layouts we check if
2258          * dataversion has changed (if requested) */
2259         if (llss->check_dv1) {
2260                 rc = ll_data_version(llss->inode1, &dv, 0);
2261                 if (rc)
2262                         GOTO(putgl, rc);
2263                 if (dv != llss->dv1)
2264                         GOTO(putgl, rc = -EAGAIN);
2265         }
2266
2267         if (llss->check_dv2) {
2268                 rc = ll_data_version(llss->inode2, &dv, 0);
2269                 if (rc)
2270                         GOTO(putgl, rc);
2271                 if (dv != llss->dv2)
2272                         GOTO(putgl, rc = -EAGAIN);
2273         }
2274
2275         /* struct md_op_data is used to send the swap args to the mdt
2276          * only flags is missing, so we use struct mdc_swap_layouts
2277          * through the md_op_data->op_data */
2278         /* flags from user space have to be converted before they are send to
2279          * server, no flag is sent today, they are only used on the client */
2280         msl.msl_flags = 0;
2281         rc = -ENOMEM;
2282         op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
2283                                      0, LUSTRE_OPC_ANY, &msl);
2284         if (IS_ERR(op_data))
2285                 GOTO(free, rc = PTR_ERR(op_data));
2286
2287         rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
2288                            sizeof(*op_data), op_data, NULL);
2289         ll_finish_md_op_data(op_data);
2290
2291         if (rc < 0)
2292                 GOTO(putgl, rc);
2293
2294 putgl:
2295         if (gid != 0) {
2296                 ll_put_grouplock(llss->inode2, file2, gid);
2297                 ll_put_grouplock(llss->inode1, file1, gid);
2298         }
2299
2300 free:
2301         if (llss != NULL)
2302                 OBD_FREE_PTR(llss);
2303
2304         RETURN(rc);
2305 }
2306
2307 int ll_hsm_state_set(struct inode *inode, struct hsm_state_set *hss)
2308 {
2309         struct md_op_data       *op_data;
2310         int                      rc;
2311         ENTRY;
2312
2313         /* Detect out-of range masks */
2314         if ((hss->hss_setmask | hss->hss_clearmask) & ~HSM_FLAGS_MASK)
2315                 RETURN(-EINVAL);
2316
2317         /* Non-root users are forbidden to set or clear flags which are
2318          * NOT defined in HSM_USER_MASK. */
2319         if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK) &&
2320             !cfs_capable(CFS_CAP_SYS_ADMIN))
2321                 RETURN(-EPERM);
2322
2323         /* Detect out-of range archive id */
2324         if ((hss->hss_valid & HSS_ARCHIVE_ID) &&
2325             (hss->hss_archive_id > LL_HSM_MAX_ARCHIVE))
2326                 RETURN(-EINVAL);
2327
2328         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2329                                      LUSTRE_OPC_ANY, hss);
2330         if (IS_ERR(op_data))
2331                 RETURN(PTR_ERR(op_data));
2332
2333         rc = obd_iocontrol(LL_IOC_HSM_STATE_SET, ll_i2mdexp(inode),
2334                            sizeof(*op_data), op_data, NULL);
2335
2336         ll_finish_md_op_data(op_data);
2337
2338         RETURN(rc);
2339 }
2340
2341 static int ll_hsm_import(struct inode *inode, struct file *file,
2342                          struct hsm_user_import *hui)
2343 {
2344         struct hsm_state_set    *hss = NULL;
2345         struct iattr            *attr = NULL;
2346         int                      rc;
2347         ENTRY;
2348
2349         if (!S_ISREG(inode->i_mode))
2350                 RETURN(-EINVAL);
2351
2352         /* set HSM flags */
2353         OBD_ALLOC_PTR(hss);
2354         if (hss == NULL)
2355                 GOTO(out, rc = -ENOMEM);
2356
2357         hss->hss_valid = HSS_SETMASK | HSS_ARCHIVE_ID;
2358         hss->hss_archive_id = hui->hui_archive_id;
2359         hss->hss_setmask = HS_ARCHIVED | HS_EXISTS | HS_RELEASED;
2360         rc = ll_hsm_state_set(inode, hss);
2361         if (rc != 0)
2362                 GOTO(out, rc);
2363
2364         OBD_ALLOC_PTR(attr);
2365         if (attr == NULL)
2366                 GOTO(out, rc = -ENOMEM);
2367
2368         attr->ia_mode = hui->hui_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
2369         attr->ia_mode |= S_IFREG;
2370         attr->ia_uid = make_kuid(&init_user_ns, hui->hui_uid);
2371         attr->ia_gid = make_kgid(&init_user_ns, hui->hui_gid);
2372         attr->ia_size = hui->hui_size;
2373         attr->ia_mtime.tv_sec = hui->hui_mtime;
2374         attr->ia_mtime.tv_nsec = hui->hui_mtime_ns;
2375         attr->ia_atime.tv_sec = hui->hui_atime;
2376         attr->ia_atime.tv_nsec = hui->hui_atime_ns;
2377
2378         attr->ia_valid = ATTR_SIZE | ATTR_MODE | ATTR_FORCE |
2379                          ATTR_UID | ATTR_GID |
2380                          ATTR_MTIME | ATTR_MTIME_SET |
2381                          ATTR_ATIME | ATTR_ATIME_SET;
2382
2383         inode_lock(inode);
2384
2385         rc = ll_setattr_raw(file_dentry(file), attr, true);
2386         if (rc == -ENODATA)
2387                 rc = 0;
2388
2389         inode_unlock(inode);
2390
2391 out:
2392         if (hss != NULL)
2393                 OBD_FREE_PTR(hss);
2394
2395         if (attr != NULL)
2396                 OBD_FREE_PTR(attr);
2397
2398         RETURN(rc);
2399 }
2400
2401 static inline long ll_lease_type_from_fmode(fmode_t fmode)
2402 {
2403         return ((fmode & FMODE_READ) ? LL_LEASE_RDLCK : 0) |
2404                ((fmode & FMODE_WRITE) ? LL_LEASE_WRLCK : 0);
2405 }
2406
2407 static int ll_file_futimes_3(struct file *file, const struct ll_futimes_3 *lfu)
2408 {
2409         struct inode *inode = file_inode(file);
2410         struct iattr ia = {
2411                 .ia_valid = ATTR_ATIME | ATTR_ATIME_SET |
2412                             ATTR_MTIME | ATTR_MTIME_SET |
2413                             ATTR_CTIME | ATTR_CTIME_SET,
2414                 .ia_atime = {
2415                         .tv_sec = lfu->lfu_atime_sec,
2416                         .tv_nsec = lfu->lfu_atime_nsec,
2417                 },
2418                 .ia_mtime = {
2419                         .tv_sec = lfu->lfu_mtime_sec,
2420                         .tv_nsec = lfu->lfu_mtime_nsec,
2421                 },
2422                 .ia_ctime = {
2423                         .tv_sec = lfu->lfu_ctime_sec,
2424                         .tv_nsec = lfu->lfu_ctime_nsec,
2425                 },
2426         };
2427         int rc;
2428         ENTRY;
2429
2430         if (!capable(CAP_SYS_ADMIN))
2431                 RETURN(-EPERM);
2432
2433         if (!S_ISREG(inode->i_mode))
2434                 RETURN(-EINVAL);
2435
2436         inode_lock(inode);
2437         rc = ll_setattr_raw(file_dentry(file), &ia, false);
2438         inode_unlock(inode);
2439
2440         RETURN(rc);
2441 }
2442
2443 /*
2444  * Give file access advices
2445  *
2446  * The ladvise interface is similar to Linux fadvise() system call, except it
2447  * forwards the advices directly from Lustre client to server. The server side
2448  * codes will apply appropriate read-ahead and caching techniques for the
2449  * corresponding files.
2450  *
2451  * A typical workload for ladvise is e.g. a bunch of different clients are
2452  * doing small random reads of a file, so prefetching pages into OSS cache
2453  * with big linear reads before the random IO is a net benefit. Fetching
2454  * all that data into each client cache with fadvise() may not be, due to
2455  * much more data being sent to the client.
2456  */
2457 static int ll_ladvise(struct inode *inode, struct file *file, __u64 flags,
2458                       struct llapi_lu_ladvise *ladvise)
2459 {
2460         struct lu_env *env;
2461         struct cl_io *io;
2462         struct cl_ladvise_io *lio;
2463         int rc;
2464         __u16 refcheck;
2465         ENTRY;
2466
2467         env = cl_env_get(&refcheck);
2468         if (IS_ERR(env))
2469                 RETURN(PTR_ERR(env));
2470
2471         io = vvp_env_thread_io(env);
2472         io->ci_obj = ll_i2info(inode)->lli_clob;
2473
2474         /* initialize parameters for ladvise */
2475         lio = &io->u.ci_ladvise;
2476         lio->li_start = ladvise->lla_start;
2477         lio->li_end = ladvise->lla_end;
2478         lio->li_fid = ll_inode2fid(inode);
2479         lio->li_advice = ladvise->lla_advice;
2480         lio->li_flags = flags;
2481
2482         if (cl_io_init(env, io, CIT_LADVISE, io->ci_obj) == 0)
2483                 rc = cl_io_loop(env, io);
2484         else
2485                 rc = io->ci_result;
2486
2487         cl_io_fini(env, io);
2488         cl_env_put(env, &refcheck);
2489         RETURN(rc);
2490 }
2491
2492 int ll_ioctl_fsgetxattr(struct inode *inode, unsigned int cmd,
2493                         unsigned long arg)
2494 {
2495         struct fsxattr fsxattr;
2496
2497         if (copy_from_user(&fsxattr,
2498                            (const struct fsxattr __user *)arg,
2499                            sizeof(fsxattr)))
2500                 RETURN(-EFAULT);
2501
2502         fsxattr.fsx_projid = ll_i2info(inode)->lli_projid;
2503         if (copy_to_user((struct fsxattr __user *)arg,
2504                          &fsxattr, sizeof(fsxattr)))
2505                 RETURN(-EFAULT);
2506
2507         RETURN(0);
2508 }
2509
2510 int ll_ioctl_fssetxattr(struct inode *inode, unsigned int cmd,
2511                         unsigned long arg)
2512 {
2513
2514         struct md_op_data *op_data;
2515         struct ptlrpc_request *req = NULL;
2516         int rc = 0;
2517         struct fsxattr fsxattr;
2518
2519         /* only root could change project ID */
2520         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2521                 RETURN(-EPERM);
2522
2523         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2524                                      LUSTRE_OPC_ANY, NULL);
2525         if (IS_ERR(op_data))
2526                 RETURN(PTR_ERR(op_data));
2527
2528         if (copy_from_user(&fsxattr,
2529                            (const struct fsxattr __user *)arg,
2530                            sizeof(fsxattr)))
2531                 GOTO(out_fsxattr1, rc = -EFAULT);
2532
2533         op_data->op_projid = fsxattr.fsx_projid;
2534         op_data->op_attr.ia_valid |= MDS_ATTR_PROJID;
2535         rc = md_setattr(ll_i2sbi(inode)->ll_md_exp, op_data, NULL,
2536                         0, &req);
2537         ptlrpc_req_finished(req);
2538
2539 out_fsxattr1:
2540         ll_finish_md_op_data(op_data);
2541         RETURN(rc);
2542
2543
2544 }
2545
2546 static long
2547 ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
2548 {
2549         struct inode            *inode = file_inode(file);
2550         struct ll_file_data     *fd = LUSTRE_FPRIVATE(file);
2551         int                      flags, rc;
2552         ENTRY;
2553
2554         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), cmd=%x\n",
2555                PFID(ll_inode2fid(inode)), inode, cmd);
2556         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2557
2558         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2559         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2560                 RETURN(-ENOTTY);
2561
2562         switch(cmd) {
2563         case LL_IOC_GETFLAGS:
2564                 /* Get the current value of the file flags */
2565                 return put_user(fd->fd_flags, (int __user *)arg);
2566         case LL_IOC_SETFLAGS:
2567         case LL_IOC_CLRFLAGS:
2568                 /* Set or clear specific file flags */
2569                 /* XXX This probably needs checks to ensure the flags are
2570                  *     not abused, and to handle any flag side effects.
2571                  */
2572                 if (get_user(flags, (int __user *) arg))
2573                         RETURN(-EFAULT);
2574
2575                 if (cmd == LL_IOC_SETFLAGS) {
2576                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2577                             !(file->f_flags & O_DIRECT)) {
2578                                 CERROR("%s: unable to disable locking on "
2579                                        "non-O_DIRECT file\n", current->comm);
2580                                 RETURN(-EINVAL);
2581                         }
2582
2583                         fd->fd_flags |= flags;
2584                 } else {
2585                         fd->fd_flags &= ~flags;
2586                 }
2587                 RETURN(0);
2588         case LL_IOC_LOV_SETSTRIPE:
2589         case LL_IOC_LOV_SETSTRIPE_NEW:
2590                 RETURN(ll_lov_setstripe(inode, file, (void __user *)arg));
2591         case LL_IOC_LOV_SETEA:
2592                 RETURN(ll_lov_setea(inode, file, (void __user *)arg));
2593         case LL_IOC_LOV_SWAP_LAYOUTS: {
2594                 struct file *file2;
2595                 struct lustre_swap_layouts lsl;
2596
2597                 if (copy_from_user(&lsl, (char __user *)arg,
2598                                    sizeof(struct lustre_swap_layouts)))
2599                         RETURN(-EFAULT);
2600
2601                 if ((file->f_flags & O_ACCMODE) == O_RDONLY)
2602                         RETURN(-EPERM);
2603
2604                 file2 = fget(lsl.sl_fd);
2605                 if (file2 == NULL)
2606                         RETURN(-EBADF);
2607
2608                 /* O_WRONLY or O_RDWR */
2609                 if ((file2->f_flags & O_ACCMODE) == O_RDONLY)
2610                         GOTO(out, rc = -EPERM);
2611
2612                 if (lsl.sl_flags & SWAP_LAYOUTS_CLOSE) {
2613                         struct inode                    *inode2;
2614                         struct ll_inode_info            *lli;
2615                         struct obd_client_handle        *och = NULL;
2616
2617                         if (lsl.sl_flags != SWAP_LAYOUTS_CLOSE)
2618                                 GOTO(out, rc = -EINVAL);
2619
2620                         lli = ll_i2info(inode);
2621                         mutex_lock(&lli->lli_och_mutex);
2622                         if (fd->fd_lease_och != NULL) {
2623                                 och = fd->fd_lease_och;
2624                                 fd->fd_lease_och = NULL;
2625                         }
2626                         mutex_unlock(&lli->lli_och_mutex);
2627                         if (och == NULL)
2628                                 GOTO(out, rc = -ENOLCK);
2629                         inode2 = file_inode(file2);
2630                         rc = ll_swap_layouts_close(och, inode, inode2);
2631                 } else {
2632                         rc = ll_swap_layouts(file, file2, &lsl);
2633                 }
2634 out:
2635                 fput(file2);
2636                 RETURN(rc);
2637         }
2638         case LL_IOC_LOV_GETSTRIPE:
2639         case LL_IOC_LOV_GETSTRIPE_NEW:
2640                 RETURN(ll_file_getstripe(inode, (void __user *)arg, 0));
2641         case FSFILT_IOC_GETFLAGS:
2642         case FSFILT_IOC_SETFLAGS:
2643                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2644         case FSFILT_IOC_GETVERSION_OLD:
2645         case FSFILT_IOC_GETVERSION:
2646                 RETURN(put_user(inode->i_generation, (int __user *)arg));
2647         case LL_IOC_GROUP_LOCK:
2648                 RETURN(ll_get_grouplock(inode, file, arg));
2649         case LL_IOC_GROUP_UNLOCK:
2650                 RETURN(ll_put_grouplock(inode, file, arg));
2651         case IOC_OBD_STATFS:
2652                 RETURN(ll_obd_statfs(inode, (void __user *)arg));
2653
2654         /* We need to special case any other ioctls we want to handle,
2655          * to send them to the MDS/OST as appropriate and to properly
2656          * network encode the arg field.
2657         case FSFILT_IOC_SETVERSION_OLD:
2658         case FSFILT_IOC_SETVERSION:
2659         */
2660         case LL_IOC_FLUSHCTX:
2661                 RETURN(ll_flush_ctx(inode));
2662         case LL_IOC_PATH2FID: {
2663                 if (copy_to_user((void __user *)arg, ll_inode2fid(inode),
2664                                  sizeof(struct lu_fid)))
2665                         RETURN(-EFAULT);
2666
2667                 RETURN(0);
2668         }
2669         case LL_IOC_GETPARENT:
2670                 RETURN(ll_getparent(file, (struct getparent __user *)arg));
2671
2672         case OBD_IOC_FID2PATH:
2673                 RETURN(ll_fid2path(inode, (void __user *)arg));
2674         case LL_IOC_DATA_VERSION: {
2675                 struct ioc_data_version idv;
2676                 int rc;
2677
2678                 if (copy_from_user(&idv, (char __user *)arg, sizeof(idv)))
2679                         RETURN(-EFAULT);
2680
2681                 idv.idv_flags &= LL_DV_RD_FLUSH | LL_DV_WR_FLUSH;
2682                 rc = ll_data_version(inode, &idv.idv_version, idv.idv_flags);
2683
2684                 if (rc == 0 &&
2685                     copy_to_user((char __user *)arg, &idv, sizeof(idv)))
2686                         RETURN(-EFAULT);
2687
2688                 RETURN(rc);
2689         }
2690
2691         case LL_IOC_GET_MDTIDX: {
2692                 int mdtidx;
2693
2694                 mdtidx = ll_get_mdt_idx(inode);
2695                 if (mdtidx < 0)
2696                         RETURN(mdtidx);
2697
2698                 if (put_user((int)mdtidx, (int __user *)arg))
2699                         RETURN(-EFAULT);
2700
2701                 RETURN(0);
2702         }
2703         case OBD_IOC_GETDTNAME:
2704         case OBD_IOC_GETMDNAME:
2705                 RETURN(ll_get_obd_name(inode, cmd, arg));
2706         case LL_IOC_HSM_STATE_GET: {
2707                 struct md_op_data       *op_data;
2708                 struct hsm_user_state   *hus;
2709                 int                      rc;
2710
2711                 OBD_ALLOC_PTR(hus);
2712                 if (hus == NULL)
2713                         RETURN(-ENOMEM);
2714
2715                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2716                                              LUSTRE_OPC_ANY, hus);
2717                 if (IS_ERR(op_data)) {
2718                         OBD_FREE_PTR(hus);
2719                         RETURN(PTR_ERR(op_data));
2720                 }
2721
2722                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2723                                    op_data, NULL);
2724
2725                 if (copy_to_user((void __user *)arg, hus, sizeof(*hus)))
2726                         rc = -EFAULT;
2727
2728                 ll_finish_md_op_data(op_data);
2729                 OBD_FREE_PTR(hus);
2730                 RETURN(rc);
2731         }
2732         case LL_IOC_HSM_STATE_SET: {
2733                 struct hsm_state_set    *hss;
2734                 int                      rc;
2735
2736                 OBD_ALLOC_PTR(hss);
2737                 if (hss == NULL)
2738                         RETURN(-ENOMEM);
2739
2740                 if (copy_from_user(hss, (char __user *)arg, sizeof(*hss))) {
2741                         OBD_FREE_PTR(hss);
2742                         RETURN(-EFAULT);
2743                 }
2744
2745                 rc = ll_hsm_state_set(inode, hss);
2746
2747                 OBD_FREE_PTR(hss);
2748                 RETURN(rc);
2749         }
2750         case LL_IOC_HSM_ACTION: {
2751                 struct md_op_data               *op_data;
2752                 struct hsm_current_action       *hca;
2753                 int                              rc;
2754
2755                 OBD_ALLOC_PTR(hca);
2756                 if (hca == NULL)
2757                         RETURN(-ENOMEM);
2758
2759                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2760                                              LUSTRE_OPC_ANY, hca);
2761                 if (IS_ERR(op_data)) {
2762                         OBD_FREE_PTR(hca);
2763                         RETURN(PTR_ERR(op_data));
2764                 }
2765
2766                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2767                                    op_data, NULL);
2768
2769                 if (copy_to_user((char __user *)arg, hca, sizeof(*hca)))
2770                         rc = -EFAULT;
2771
2772                 ll_finish_md_op_data(op_data);
2773                 OBD_FREE_PTR(hca);
2774                 RETURN(rc);
2775         }
2776         case LL_IOC_SET_LEASE: {
2777                 struct ll_inode_info *lli = ll_i2info(inode);
2778                 struct obd_client_handle *och = NULL;
2779                 bool lease_broken;
2780                 fmode_t fmode;
2781
2782                 switch (arg) {
2783                 case LL_LEASE_WRLCK:
2784                         if (!(file->f_mode & FMODE_WRITE))
2785                                 RETURN(-EPERM);
2786                         fmode = FMODE_WRITE;
2787                         break;
2788                 case LL_LEASE_RDLCK:
2789                         if (!(file->f_mode & FMODE_READ))
2790                                 RETURN(-EPERM);
2791                         fmode = FMODE_READ;
2792                         break;
2793                 case LL_LEASE_UNLCK:
2794                         mutex_lock(&lli->lli_och_mutex);
2795                         if (fd->fd_lease_och != NULL) {
2796                                 och = fd->fd_lease_och;
2797                                 fd->fd_lease_och = NULL;
2798                         }
2799                         mutex_unlock(&lli->lli_och_mutex);
2800
2801                         if (och == NULL)
2802                                 RETURN(-ENOLCK);
2803
2804                         fmode = och->och_flags;
2805                         rc = ll_lease_close(och, inode, &lease_broken);
2806                         if (rc < 0)
2807                                 RETURN(rc);
2808
2809                         rc = ll_lease_och_release(inode, file);
2810                         if (rc < 0)
2811                                 RETURN(rc);
2812
2813                         if (lease_broken)
2814                                 fmode = 0;
2815
2816                         RETURN(ll_lease_type_from_fmode(fmode));
2817                 default:
2818                         RETURN(-EINVAL);
2819                 }
2820
2821                 CDEBUG(D_INODE, "Set lease with mode %u\n", fmode);
2822
2823                 /* apply for lease */
2824                 och = ll_lease_open(inode, file, fmode, 0);
2825                 if (IS_ERR(och))
2826                         RETURN(PTR_ERR(och));
2827
2828                 rc = 0;
2829                 mutex_lock(&lli->lli_och_mutex);
2830                 if (fd->fd_lease_och == NULL) {
2831                         fd->fd_lease_och = och;
2832                         och = NULL;
2833                 }
2834                 mutex_unlock(&lli->lli_och_mutex);
2835                 if (och != NULL) {
2836                         /* impossible now that only excl is supported for now */
2837                         ll_lease_close(och, inode, &lease_broken);
2838                         rc = -EBUSY;
2839                 }
2840                 RETURN(rc);
2841         }
2842         case LL_IOC_GET_LEASE: {
2843                 struct ll_inode_info *lli = ll_i2info(inode);
2844                 struct ldlm_lock *lock = NULL;
2845                 fmode_t fmode = 0;
2846
2847                 mutex_lock(&lli->lli_och_mutex);
2848                 if (fd->fd_lease_och != NULL) {
2849                         struct obd_client_handle *och = fd->fd_lease_och;
2850
2851                         lock = ldlm_handle2lock(&och->och_lease_handle);
2852                         if (lock != NULL) {
2853                                 lock_res_and_lock(lock);
2854                                 if (!ldlm_is_cancel(lock))
2855                                         fmode = och->och_flags;
2856
2857                                 unlock_res_and_lock(lock);
2858                                 LDLM_LOCK_PUT(lock);
2859                         }
2860                 }
2861                 mutex_unlock(&lli->lli_och_mutex);
2862
2863                 RETURN(ll_lease_type_from_fmode(fmode));
2864         }
2865         case LL_IOC_HSM_IMPORT: {
2866                 struct hsm_user_import *hui;
2867
2868                 OBD_ALLOC_PTR(hui);
2869                 if (hui == NULL)
2870                         RETURN(-ENOMEM);
2871
2872                 if (copy_from_user(hui, (void __user *)arg, sizeof(*hui))) {
2873                         OBD_FREE_PTR(hui);
2874                         RETURN(-EFAULT);
2875                 }
2876
2877                 rc = ll_hsm_import(inode, file, hui);
2878
2879                 OBD_FREE_PTR(hui);
2880                 RETURN(rc);
2881         }
2882         case LL_IOC_FUTIMES_3: {
2883                 struct ll_futimes_3 lfu;
2884
2885                 if (copy_from_user(&lfu,
2886                                    (const struct ll_futimes_3 __user *)arg,
2887                                    sizeof(lfu)))
2888                         RETURN(-EFAULT);
2889
2890                 RETURN(ll_file_futimes_3(file, &lfu));
2891         }
2892         case LL_IOC_LADVISE: {
2893                 struct llapi_ladvise_hdr *ladvise_hdr;
2894                 int i;
2895                 int num_advise;
2896                 int alloc_size = sizeof(*ladvise_hdr);
2897
2898                 rc = 0;
2899                 OBD_ALLOC_PTR(ladvise_hdr);
2900                 if (ladvise_hdr == NULL)
2901                         RETURN(-ENOMEM);
2902
2903                 if (copy_from_user(ladvise_hdr,
2904                                    (const struct llapi_ladvise_hdr __user *)arg,
2905                                    alloc_size))
2906                         GOTO(out_ladvise, rc = -EFAULT);
2907
2908                 if (ladvise_hdr->lah_magic != LADVISE_MAGIC ||
2909                     ladvise_hdr->lah_count < 1)
2910                         GOTO(out_ladvise, rc = -EINVAL);
2911
2912                 num_advise = ladvise_hdr->lah_count;
2913                 if (num_advise >= LAH_COUNT_MAX)
2914                         GOTO(out_ladvise, rc = -EFBIG);
2915
2916                 OBD_FREE_PTR(ladvise_hdr);
2917                 alloc_size = offsetof(typeof(*ladvise_hdr),
2918                                       lah_advise[num_advise]);
2919                 OBD_ALLOC(ladvise_hdr, alloc_size);
2920                 if (ladvise_hdr == NULL)
2921                         RETURN(-ENOMEM);
2922
2923                 /*
2924                  * TODO: submit multiple advices to one server in a single RPC
2925                  */
2926                 if (copy_from_user(ladvise_hdr,
2927                                    (const struct llapi_ladvise_hdr __user *)arg,
2928                                    alloc_size))
2929                         GOTO(out_ladvise, rc = -EFAULT);
2930
2931                 for (i = 0; i < num_advise; i++) {
2932                         rc = ll_ladvise(inode, file, ladvise_hdr->lah_flags,
2933                                         &ladvise_hdr->lah_advise[i]);
2934                         if (rc)
2935                                 break;
2936                 }
2937
2938 out_ladvise:
2939                 OBD_FREE(ladvise_hdr, alloc_size);
2940                 RETURN(rc);
2941         }
2942         case LL_IOC_FSGETXATTR:
2943                 RETURN(ll_ioctl_fsgetxattr(inode, cmd, arg));
2944         case LL_IOC_FSSETXATTR:
2945                 RETURN(ll_ioctl_fssetxattr(inode, cmd, arg));
2946         default: {
2947                 int err;
2948
2949                 if (LLIOC_STOP ==
2950                      ll_iocontrol_call(inode, file, cmd, arg, &err))
2951                         RETURN(err);
2952
2953                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2954                                      (void __user *)arg));
2955         }
2956         }
2957 }
2958
2959 #ifndef HAVE_FILE_LLSEEK_SIZE
2960 static inline loff_t
2961 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
2962 {
2963         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
2964                 return -EINVAL;
2965         if (offset > maxsize)
2966                 return -EINVAL;
2967
2968         if (offset != file->f_pos) {
2969                 file->f_pos = offset;
2970                 file->f_version = 0;
2971         }
2972         return offset;
2973 }
2974
2975 static loff_t
2976 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
2977                 loff_t maxsize, loff_t eof)
2978 {
2979         struct inode *inode = file_inode(file);
2980
2981         switch (origin) {
2982         case SEEK_END:
2983                 offset += eof;
2984                 break;
2985         case SEEK_CUR:
2986                 /*
2987                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
2988                  * position-querying operation.  Avoid rewriting the "same"
2989                  * f_pos value back to the file because a concurrent read(),
2990                  * write() or lseek() might have altered it
2991                  */
2992                 if (offset == 0)
2993                         return file->f_pos;
2994                 /*
2995                  * f_lock protects against read/modify/write race with other
2996                  * SEEK_CURs. Note that parallel writes and reads behave
2997                  * like SEEK_SET.
2998                  */
2999                 inode_lock(inode);
3000                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
3001                 inode_unlock(inode);
3002                 return offset;
3003         case SEEK_DATA:
3004                 /*
3005                  * In the generic case the entire file is data, so as long as
3006                  * offset isn't at the end of the file then the offset is data.
3007                  */
3008                 if (offset >= eof)
3009                         return -ENXIO;
3010                 break;
3011         case SEEK_HOLE:
3012                 /*
3013                  * There is a virtual hole at the end of the file, so as long as
3014                  * offset isn't i_size or larger, return i_size.
3015                  */
3016                 if (offset >= eof)
3017                         return -ENXIO;
3018                 offset = eof;
3019                 break;
3020         }
3021
3022         return llseek_execute(file, offset, maxsize);
3023 }
3024 #endif
3025
3026 static loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
3027 {
3028         struct inode *inode = file_inode(file);
3029         loff_t retval, eof = 0;
3030
3031         ENTRY;
3032         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
3033                            (origin == SEEK_CUR) ? file->f_pos : 0);
3034         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), to=%llu=%#llx(%d)\n",
3035                PFID(ll_inode2fid(inode)), inode, retval, retval,
3036                origin);
3037         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
3038
3039         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
3040                 retval = ll_glimpse_size(inode);
3041                 if (retval != 0)
3042                         RETURN(retval);
3043                 eof = i_size_read(inode);
3044         }
3045
3046         retval = ll_generic_file_llseek_size(file, offset, origin,
3047                                           ll_file_maxbytes(inode), eof);
3048         RETURN(retval);
3049 }
3050
3051 static int ll_flush(struct file *file, fl_owner_t id)
3052 {
3053         struct inode *inode = file_inode(file);
3054         struct ll_inode_info *lli = ll_i2info(inode);
3055         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
3056         int rc, err;
3057
3058         LASSERT(!S_ISDIR(inode->i_mode));
3059
3060         /* catch async errors that were recorded back when async writeback
3061          * failed for pages in this mapping. */
3062         rc = lli->lli_async_rc;
3063         lli->lli_async_rc = 0;
3064         if (lli->lli_clob != NULL) {
3065                 err = lov_read_and_clear_async_rc(lli->lli_clob);
3066                 if (rc == 0)
3067                         rc = err;
3068         }
3069
3070         /* The application has been told write failure already.
3071          * Do not report failure again. */
3072         if (fd->fd_write_failed)
3073                 return 0;
3074         return rc ? -EIO : 0;
3075 }
3076
3077 /**
3078  * Called to make sure a portion of file has been written out.
3079  * if @mode is not CL_FSYNC_LOCAL, it will send OST_SYNC RPCs to OST.
3080  *
3081  * Return how many pages have been written.
3082  */
3083 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
3084                        enum cl_fsync_mode mode, int ignore_layout)
3085 {
3086         struct lu_env *env;
3087         struct cl_io *io;
3088         struct cl_fsync_io *fio;
3089         int result;
3090         __u16 refcheck;
3091         ENTRY;
3092
3093         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
3094             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
3095                 RETURN(-EINVAL);
3096
3097         env = cl_env_get(&refcheck);
3098         if (IS_ERR(env))
3099                 RETURN(PTR_ERR(env));
3100
3101         io = vvp_env_thread_io(env);
3102         io->ci_obj = ll_i2info(inode)->lli_clob;
3103         io->ci_ignore_layout = ignore_layout;
3104
3105         /* initialize parameters for sync */
3106         fio = &io->u.ci_fsync;
3107         fio->fi_start = start;
3108         fio->fi_end = end;
3109         fio->fi_fid = ll_inode2fid(inode);
3110         fio->fi_mode = mode;
3111         fio->fi_nr_written = 0;
3112
3113         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
3114                 result = cl_io_loop(env, io);
3115         else
3116                 result = io->ci_result;
3117         if (result == 0)
3118                 result = fio->fi_nr_written;
3119         cl_io_fini(env, io);
3120         cl_env_put(env, &refcheck);
3121
3122         RETURN(result);
3123 }
3124
3125 /*
3126  * When dentry is provided (the 'else' case), file_dentry() may be
3127  * null and dentry must be used directly rather than pulled from
3128  * file_dentry() as is done otherwise.
3129  */
3130
3131 #ifdef HAVE_FILE_FSYNC_4ARGS
3132 int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
3133 {
3134         struct dentry *dentry = file_dentry(file);
3135         bool lock_inode;
3136 #elif defined(HAVE_FILE_FSYNC_2ARGS)
3137 int ll_fsync(struct file *file, int datasync)
3138 {
3139         struct dentry *dentry = file_dentry(file);
3140         loff_t start = 0;
3141         loff_t end = LLONG_MAX;
3142 #else
3143 int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
3144 {
3145         loff_t start = 0;
3146         loff_t end = LLONG_MAX;
3147 #endif
3148         struct inode *inode = dentry->d_inode;
3149         struct ll_inode_info *lli = ll_i2info(inode);
3150         struct ptlrpc_request *req;
3151         int rc, err;
3152         ENTRY;
3153
3154         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
3155                PFID(ll_inode2fid(inode)), inode);
3156         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
3157
3158 #ifdef HAVE_FILE_FSYNC_4ARGS
3159         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
3160         lock_inode = !lli->lli_inode_locked;
3161         if (lock_inode)
3162                 inode_lock(inode);
3163 #else
3164         /* fsync's caller has already called _fdata{sync,write}, we want
3165          * that IO to finish before calling the osc and mdc sync methods */
3166         rc = filemap_fdatawait(inode->i_mapping);
3167 #endif
3168
3169         /* catch async errors that were recorded back when async writeback
3170          * failed for pages in this mapping. */
3171         if (!S_ISDIR(inode->i_mode)) {
3172                 err = lli->lli_async_rc;
3173                 lli->lli_async_rc = 0;
3174                 if (rc == 0)
3175                         rc = err;
3176                 if (lli->lli_clob != NULL) {
3177                         err = lov_read_and_clear_async_rc(lli->lli_clob);
3178                         if (rc == 0)
3179                                 rc = err;
3180                 }
3181         }
3182
3183         err = md_fsync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), &req);
3184         if (!rc)
3185                 rc = err;
3186         if (!err)
3187                 ptlrpc_req_finished(req);
3188
3189         if (S_ISREG(inode->i_mode)) {
3190                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
3191
3192                 err = cl_sync_file_range(inode, start, end, CL_FSYNC_ALL, 0);
3193                 if (rc == 0 && err < 0)
3194                         rc = err;
3195                 if (rc < 0)
3196                         fd->fd_write_failed = true;
3197                 else
3198                         fd->fd_write_failed = false;
3199         }
3200
3201 #ifdef HAVE_FILE_FSYNC_4ARGS
3202         if (lock_inode)
3203                 inode_unlock(inode);
3204 #endif
3205         RETURN(rc);
3206 }
3207
3208 static int
3209 ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
3210 {
3211         struct inode *inode = file_inode(file);
3212         struct ll_sb_info *sbi = ll_i2sbi(inode);
3213         struct ldlm_enqueue_info einfo = {
3214                 .ei_type        = LDLM_FLOCK,
3215                 .ei_cb_cp       = ldlm_flock_completion_ast,
3216                 .ei_cbdata      = file_lock,
3217         };
3218         struct md_op_data *op_data;
3219         struct lustre_handle lockh = { 0 };
3220         union ldlm_policy_data flock = { { 0 } };
3221         int fl_type = file_lock->fl_type;
3222         __u64 flags = 0;
3223         int rc;
3224         int rc2 = 0;
3225         ENTRY;
3226
3227         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
3228                PFID(ll_inode2fid(inode)), file_lock);
3229
3230         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
3231
3232         if (file_lock->fl_flags & FL_FLOCK) {
3233                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
3234                 /* flocks are whole-file locks */
3235                 flock.l_flock.end = OFFSET_MAX;
3236                 /* For flocks owner is determined by the local file desctiptor*/
3237                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
3238         } else if (file_lock->fl_flags & FL_POSIX) {
3239                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
3240                 flock.l_flock.start = file_lock->fl_start;
3241                 flock.l_flock.end = file_lock->fl_end;
3242         } else {
3243                 RETURN(-EINVAL);
3244         }
3245         flock.l_flock.pid = file_lock->fl_pid;
3246
3247         /* Somewhat ugly workaround for svc lockd.
3248          * lockd installs custom fl_lmops->lm_compare_owner that checks
3249          * for the fl_owner to be the same (which it always is on local node
3250          * I guess between lockd processes) and then compares pid.
3251          * As such we assign pid to the owner field to make it all work,
3252          * conflict with normal locks is unlikely since pid space and
3253          * pointer space for current->files are not intersecting */
3254         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
3255                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
3256
3257         switch (fl_type) {
3258         case F_RDLCK:
3259                 einfo.ei_mode = LCK_PR;
3260                 break;
3261         case F_UNLCK:
3262                 /* An unlock request may or may not have any relation to
3263                  * existing locks so we may not be able to pass a lock handle
3264                  * via a normal ldlm_lock_cancel() request. The request may even
3265                  * unlock a byte range in the middle of an existing lock. In
3266                  * order to process an unlock request we need all of the same
3267                  * information that is given with a normal read or write record
3268                  * lock request. To avoid creating another ldlm unlock (cancel)
3269                  * message we'll treat a LCK_NL flock request as an unlock. */
3270                 einfo.ei_mode = LCK_NL;
3271                 break;
3272         case F_WRLCK:
3273                 einfo.ei_mode = LCK_PW;
3274                 break;
3275         default:
3276                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n", fl_type);
3277                 RETURN (-ENOTSUPP);
3278         }
3279
3280         switch (cmd) {
3281         case F_SETLKW:
3282 #ifdef F_SETLKW64
3283         case F_SETLKW64:
3284 #endif
3285                 flags = 0;
3286                 break;
3287         case F_SETLK:
3288 #ifdef F_SETLK64
3289         case F_SETLK64:
3290 #endif
3291                 flags = LDLM_FL_BLOCK_NOWAIT;
3292                 break;
3293         case F_GETLK:
3294 #ifdef F_GETLK64
3295         case F_GETLK64:
3296 #endif
3297                 flags = LDLM_FL_TEST_LOCK;
3298                 break;
3299         default:
3300                 CERROR("unknown fcntl lock command: %d\n", cmd);
3301                 RETURN (-EINVAL);
3302         }
3303
3304         /* Save the old mode so that if the mode in the lock changes we
3305          * can decrement the appropriate reader or writer refcount. */
3306         file_lock->fl_type = einfo.ei_mode;
3307
3308         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
3309                                      LUSTRE_OPC_ANY, NULL);
3310         if (IS_ERR(op_data))
3311                 RETURN(PTR_ERR(op_data));
3312
3313         CDEBUG(D_DLMTRACE, "inode="DFID", pid=%u, flags=%#llx, mode=%u, "
3314                "start=%llu, end=%llu\n", PFID(ll_inode2fid(inode)),
3315                flock.l_flock.pid, flags, einfo.ei_mode,
3316                flock.l_flock.start, flock.l_flock.end);
3317
3318         rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data, &lockh,
3319                         flags);
3320
3321         /* Restore the file lock type if not TEST lock. */
3322         if (!(flags & LDLM_FL_TEST_LOCK))
3323                 file_lock->fl_type = fl_type;
3324
3325 #ifdef HAVE_LOCKS_LOCK_FILE_WAIT
3326         if ((rc == 0 || file_lock->fl_type == F_UNLCK) &&
3327             !(flags & LDLM_FL_TEST_LOCK))
3328                 rc2  = locks_lock_file_wait(file, file_lock);
3329 #else
3330         if ((file_lock->fl_flags & FL_FLOCK) &&
3331             (rc == 0 || file_lock->fl_type == F_UNLCK))
3332                 rc2  = flock_lock_file_wait(file, file_lock);
3333         if ((file_lock->fl_flags & FL_POSIX) &&