Whamcloud - gitweb
LU-8653 lod: use stripe_count instead of stripe_nr
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/llite/file.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_LLITE
40 #include <lustre_dlm.h>
41 #include <linux/pagemap.h>
42 #include <linux/file.h>
43 #include <linux/sched.h>
44 #include <linux/user_namespace.h>
45 #ifdef HAVE_UIDGID_HEADER
46 # include <linux/uidgid.h>
47 #endif
48
49 #include <uapi/linux/lustre/lustre_ioctl.h>
50 #include <lustre_swab.h>
51
52 #include "cl_object.h"
53 #include "llite_internal.h"
54 #include "vvp_internal.h"
55
56 static int
57 ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
58
59 static int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
60                           bool *lease_broken);
61
62 static struct ll_file_data *ll_file_data_get(void)
63 {
64         struct ll_file_data *fd;
65
66         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, GFP_NOFS);
67         if (fd == NULL)
68                 return NULL;
69
70         fd->fd_write_failed = false;
71
72         return fd;
73 }
74
75 static void ll_file_data_put(struct ll_file_data *fd)
76 {
77         if (fd != NULL)
78                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
79 }
80
81 /**
82  * Packs all the attributes into @op_data for the CLOSE rpc.
83  */
84 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
85                              struct obd_client_handle *och)
86 {
87         ENTRY;
88
89         ll_prep_md_op_data(op_data, inode, NULL, NULL,
90                            0, 0, LUSTRE_OPC_ANY, NULL);
91
92         op_data->op_attr.ia_mode = inode->i_mode;
93         op_data->op_attr.ia_atime = inode->i_atime;
94         op_data->op_attr.ia_mtime = inode->i_mtime;
95         op_data->op_attr.ia_ctime = inode->i_ctime;
96         op_data->op_attr.ia_size = i_size_read(inode);
97         op_data->op_attr.ia_valid |= ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
98                                      ATTR_MTIME | ATTR_MTIME_SET |
99                                      ATTR_CTIME | ATTR_CTIME_SET;
100         op_data->op_attr_blocks = inode->i_blocks;
101         op_data->op_attr_flags = ll_inode_to_ext_flags(inode->i_flags);
102         op_data->op_handle = och->och_fh;
103
104         if (och->och_flags & FMODE_WRITE &&
105             ll_file_test_and_clear_flag(ll_i2info(inode), LLIF_DATA_MODIFIED))
106                 /* For HSM: if inode data has been modified, pack it so that
107                  * MDT can set data dirty flag in the archive. */
108                 op_data->op_bias |= MDS_DATA_MODIFIED;
109
110         EXIT;
111 }
112
113 /**
114  * Perform a close, possibly with a bias.
115  * The meaning of "data" depends on the value of "bias".
116  *
117  * If \a bias is MDS_HSM_RELEASE then \a data is a pointer to the data version.
118  * If \a bias is MDS_CLOSE_LAYOUT_SWAP then \a data is a pointer to the inode to
119  * swap layouts with.
120  */
121 static int ll_close_inode_openhandle(struct inode *inode,
122                                      struct obd_client_handle *och,
123                                      enum mds_op_bias bias, void *data)
124 {
125         struct obd_export *md_exp = ll_i2mdexp(inode);
126         const struct ll_inode_info *lli = ll_i2info(inode);
127         struct md_op_data *op_data;
128         struct ptlrpc_request *req = NULL;
129         int rc;
130         ENTRY;
131
132         if (class_exp2obd(md_exp) == NULL) {
133                 CERROR("%s: invalid MDC connection handle closing "DFID"\n",
134                        ll_get_fsname(inode->i_sb, NULL, 0),
135                        PFID(&lli->lli_fid));
136                 GOTO(out, rc = 0);
137         }
138
139         OBD_ALLOC_PTR(op_data);
140         /* We leak openhandle and request here on error, but not much to be
141          * done in OOM case since app won't retry close on error either. */
142         if (op_data == NULL)
143                 GOTO(out, rc = -ENOMEM);
144
145         ll_prepare_close(inode, op_data, och);
146         switch (bias) {
147         case MDS_CLOSE_LAYOUT_SWAP:
148                 LASSERT(data != NULL);
149                 op_data->op_bias |= MDS_CLOSE_LAYOUT_SWAP;
150                 op_data->op_data_version = 0;
151                 op_data->op_lease_handle = och->och_lease_handle;
152                 op_data->op_fid2 = *ll_inode2fid(data);
153                 break;
154
155         case MDS_HSM_RELEASE:
156                 LASSERT(data != NULL);
157                 op_data->op_bias |= MDS_HSM_RELEASE;
158                 op_data->op_data_version = *(__u64 *)data;
159                 op_data->op_lease_handle = och->och_lease_handle;
160                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
161                 break;
162
163         default:
164                 LASSERT(data == NULL);
165                 break;
166         }
167
168         rc = md_close(md_exp, op_data, och->och_mod, &req);
169         if (rc != 0 && rc != -EINTR)
170                 CERROR("%s: inode "DFID" mdc close failed: rc = %d\n",
171                        md_exp->exp_obd->obd_name, PFID(&lli->lli_fid), rc);
172
173         if (rc == 0 &&
174             op_data->op_bias & (MDS_HSM_RELEASE | MDS_CLOSE_LAYOUT_SWAP)) {
175                 struct mdt_body *body;
176
177                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
178                 if (!(body->mbo_valid & OBD_MD_CLOSE_INTENT_EXECED))
179                         rc = -EBUSY;
180         }
181
182         ll_finish_md_op_data(op_data);
183         EXIT;
184 out:
185
186         md_clear_open_replay_data(md_exp, och);
187         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
188         OBD_FREE_PTR(och);
189
190         ptlrpc_req_finished(req);       /* This is close request */
191         return rc;
192 }
193
194 int ll_md_real_close(struct inode *inode, fmode_t fmode)
195 {
196         struct ll_inode_info *lli = ll_i2info(inode);
197         struct obd_client_handle **och_p;
198         struct obd_client_handle *och;
199         __u64 *och_usecount;
200         int rc = 0;
201         ENTRY;
202
203         if (fmode & FMODE_WRITE) {
204                 och_p = &lli->lli_mds_write_och;
205                 och_usecount = &lli->lli_open_fd_write_count;
206         } else if (fmode & FMODE_EXEC) {
207                 och_p = &lli->lli_mds_exec_och;
208                 och_usecount = &lli->lli_open_fd_exec_count;
209         } else {
210                 LASSERT(fmode & FMODE_READ);
211                 och_p = &lli->lli_mds_read_och;
212                 och_usecount = &lli->lli_open_fd_read_count;
213         }
214
215         mutex_lock(&lli->lli_och_mutex);
216         if (*och_usecount > 0) {
217                 /* There are still users of this handle, so skip
218                  * freeing it. */
219                 mutex_unlock(&lli->lli_och_mutex);
220                 RETURN(0);
221         }
222
223         och = *och_p;
224         *och_p = NULL;
225         mutex_unlock(&lli->lli_och_mutex);
226
227         if (och != NULL) {
228                 /* There might be a race and this handle may already
229                  * be closed. */
230                 rc = ll_close_inode_openhandle(inode, och, 0, NULL);
231         }
232
233         RETURN(rc);
234 }
235
236 static int ll_md_close(struct inode *inode, struct file *file)
237 {
238         union ldlm_policy_data policy = {
239                 .l_inodebits    = { MDS_INODELOCK_OPEN },
240         };
241         __u64 flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
242         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
243         struct ll_inode_info *lli = ll_i2info(inode);
244         struct lustre_handle lockh;
245         enum ldlm_mode lockmode;
246         int rc = 0;
247         ENTRY;
248
249         /* clear group lock, if present */
250         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
251                 ll_put_grouplock(inode, file, fd->fd_grouplock.lg_gid);
252
253         if (fd->fd_lease_och != NULL) {
254                 bool lease_broken;
255
256                 /* Usually the lease is not released when the
257                  * application crashed, we need to release here. */
258                 rc = ll_lease_close(fd->fd_lease_och, inode, &lease_broken);
259                 CDEBUG(rc ? D_ERROR : D_INODE, "Clean up lease "DFID" %d/%d\n",
260                         PFID(&lli->lli_fid), rc, lease_broken);
261
262                 fd->fd_lease_och = NULL;
263         }
264
265         if (fd->fd_och != NULL) {
266                 rc = ll_close_inode_openhandle(inode, fd->fd_och, 0, NULL);
267                 fd->fd_och = NULL;
268                 GOTO(out, rc);
269         }
270
271         /* Let's see if we have good enough OPEN lock on the file and if
272            we can skip talking to MDS */
273         mutex_lock(&lli->lli_och_mutex);
274         if (fd->fd_omode & FMODE_WRITE) {
275                 lockmode = LCK_CW;
276                 LASSERT(lli->lli_open_fd_write_count);
277                 lli->lli_open_fd_write_count--;
278         } else if (fd->fd_omode & FMODE_EXEC) {
279                 lockmode = LCK_PR;
280                 LASSERT(lli->lli_open_fd_exec_count);
281                 lli->lli_open_fd_exec_count--;
282         } else {
283                 lockmode = LCK_CR;
284                 LASSERT(lli->lli_open_fd_read_count);
285                 lli->lli_open_fd_read_count--;
286         }
287         mutex_unlock(&lli->lli_och_mutex);
288
289         if (!md_lock_match(ll_i2mdexp(inode), flags, ll_inode2fid(inode),
290                            LDLM_IBITS, &policy, lockmode, &lockh))
291                 rc = ll_md_real_close(inode, fd->fd_omode);
292
293 out:
294         LUSTRE_FPRIVATE(file) = NULL;
295         ll_file_data_put(fd);
296
297         RETURN(rc);
298 }
299
300 /* While this returns an error code, fput() the caller does not, so we need
301  * to make every effort to clean up all of our state here.  Also, applications
302  * rarely check close errors and even if an error is returned they will not
303  * re-try the close call.
304  */
305 int ll_file_release(struct inode *inode, struct file *file)
306 {
307         struct ll_file_data *fd;
308         struct ll_sb_info *sbi = ll_i2sbi(inode);
309         struct ll_inode_info *lli = ll_i2info(inode);
310         int rc;
311         ENTRY;
312
313         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
314                PFID(ll_inode2fid(inode)), inode);
315
316         if (inode->i_sb->s_root != file_dentry(file))
317                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
318         fd = LUSTRE_FPRIVATE(file);
319         LASSERT(fd != NULL);
320
321         /* The last ref on @file, maybe not the the owner pid of statahead,
322          * because parent and child process can share the same file handle. */
323         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd)
324                 ll_deauthorize_statahead(inode, fd);
325
326         if (inode->i_sb->s_root == file_dentry(file)) {
327                 LUSTRE_FPRIVATE(file) = NULL;
328                 ll_file_data_put(fd);
329                 RETURN(0);
330         }
331
332         if (!S_ISDIR(inode->i_mode)) {
333                 if (lli->lli_clob != NULL)
334                         lov_read_and_clear_async_rc(lli->lli_clob);
335                 lli->lli_async_rc = 0;
336         }
337
338         rc = ll_md_close(inode, file);
339
340         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
341                 libcfs_debug_dumplog();
342
343         RETURN(rc);
344 }
345
346 static int ll_intent_file_open(struct dentry *de, void *lmm, int lmmsize,
347                                 struct lookup_intent *itp)
348 {
349         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
350         struct dentry *parent = de->d_parent;
351         const char *name = NULL;
352         int len = 0;
353         struct md_op_data *op_data;
354         struct ptlrpc_request *req = NULL;
355         int rc;
356         ENTRY;
357
358         LASSERT(parent != NULL);
359         LASSERT(itp->it_flags & MDS_OPEN_BY_FID);
360
361         /* if server supports open-by-fid, or file name is invalid, don't pack
362          * name in open request */
363         if (!(exp_connect_flags(sbi->ll_md_exp) & OBD_CONNECT_OPEN_BY_FID) &&
364             lu_name_is_valid_2(de->d_name.name, de->d_name.len)) {
365                 name = de->d_name.name;
366                 len = de->d_name.len;
367         }
368
369         op_data = ll_prep_md_op_data(NULL, parent->d_inode, de->d_inode,
370                                      name, len, 0, LUSTRE_OPC_ANY, NULL);
371         if (IS_ERR(op_data))
372                 RETURN(PTR_ERR(op_data));
373         op_data->op_data = lmm;
374         op_data->op_data_size = lmmsize;
375
376         rc = md_intent_lock(sbi->ll_md_exp, op_data, itp, &req,
377                             &ll_md_blocking_ast, 0);
378         ll_finish_md_op_data(op_data);
379         if (rc == -ESTALE) {
380                 /* reason for keep own exit path - don`t flood log
381                  * with messages with -ESTALE errors.
382                  */
383                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
384                      it_open_error(DISP_OPEN_OPEN, itp))
385                         GOTO(out, rc);
386                 ll_release_openhandle(de, itp);
387                 GOTO(out, rc);
388         }
389
390         if (it_disposition(itp, DISP_LOOKUP_NEG))
391                 GOTO(out, rc = -ENOENT);
392
393         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
394                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
395                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
396                 GOTO(out, rc);
397         }
398
399         rc = ll_prep_inode(&de->d_inode, req, NULL, itp);
400         if (!rc && itp->it_lock_mode)
401                 ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, NULL);
402
403 out:
404         ptlrpc_req_finished(req);
405         ll_intent_drop_lock(itp);
406
407         /* We did open by fid, but by the time we got to the server,
408          * the object disappeared. If this is a create, we cannot really
409          * tell the userspace that the file it was trying to create
410          * does not exist. Instead let's return -ESTALE, and the VFS will
411          * retry the create with LOOKUP_REVAL that we are going to catch
412          * in ll_revalidate_dentry() and use lookup then.
413          */
414         if (rc == -ENOENT && itp->it_op & IT_CREAT)
415                 rc = -ESTALE;
416
417         RETURN(rc);
418 }
419
420 static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it,
421                        struct obd_client_handle *och)
422 {
423         struct mdt_body *body;
424
425         body = req_capsule_server_get(&it->it_request->rq_pill, &RMF_MDT_BODY);
426         och->och_fh = body->mbo_handle;
427         och->och_fid = body->mbo_fid1;
428         och->och_lease_handle.cookie = it->it_lock_handle;
429         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
430         och->och_flags = it->it_flags;
431
432         return md_set_open_replay_data(md_exp, och, it);
433 }
434
435 static int ll_local_open(struct file *file, struct lookup_intent *it,
436                          struct ll_file_data *fd, struct obd_client_handle *och)
437 {
438         struct inode *inode = file_inode(file);
439         ENTRY;
440
441         LASSERT(!LUSTRE_FPRIVATE(file));
442
443         LASSERT(fd != NULL);
444
445         if (och) {
446                 int rc;
447
448                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
449                 if (rc != 0)
450                         RETURN(rc);
451         }
452
453         LUSTRE_FPRIVATE(file) = fd;
454         ll_readahead_init(inode, &fd->fd_ras);
455         fd->fd_omode = it->it_flags & (FMODE_READ | FMODE_WRITE | FMODE_EXEC);
456
457         /* ll_cl_context initialize */
458         rwlock_init(&fd->fd_lock);
459         INIT_LIST_HEAD(&fd->fd_lccs);
460
461         RETURN(0);
462 }
463
464 /* Open a file, and (for the very first open) create objects on the OSTs at
465  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
466  * creation or open until ll_lov_setstripe() ioctl is called.
467  *
468  * If we already have the stripe MD locally then we don't request it in
469  * md_open(), by passing a lmm_size = 0.
470  *
471  * It is up to the application to ensure no other processes open this file
472  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
473  * used.  We might be able to avoid races of that sort by getting lli_open_sem
474  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
475  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
476  */
477 int ll_file_open(struct inode *inode, struct file *file)
478 {
479         struct ll_inode_info *lli = ll_i2info(inode);
480         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
481                                           .it_flags = file->f_flags };
482         struct obd_client_handle **och_p = NULL;
483         __u64 *och_usecount = NULL;
484         struct ll_file_data *fd;
485         int rc = 0;
486         ENTRY;
487
488         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), flags %o\n",
489                PFID(ll_inode2fid(inode)), inode, file->f_flags);
490
491         it = file->private_data; /* XXX: compat macro */
492         file->private_data = NULL; /* prevent ll_local_open assertion */
493
494         fd = ll_file_data_get();
495         if (fd == NULL)
496                 GOTO(out_openerr, rc = -ENOMEM);
497
498         fd->fd_file = file;
499         if (S_ISDIR(inode->i_mode))
500                 ll_authorize_statahead(inode, fd);
501
502         if (inode->i_sb->s_root == file_dentry(file)) {
503                 LUSTRE_FPRIVATE(file) = fd;
504                 RETURN(0);
505         }
506
507         if (!it || !it->it_disposition) {
508                 /* Convert f_flags into access mode. We cannot use file->f_mode,
509                  * because everything but O_ACCMODE mask was stripped from
510                  * there */
511                 if ((oit.it_flags + 1) & O_ACCMODE)
512                         oit.it_flags++;
513                 if (file->f_flags & O_TRUNC)
514                         oit.it_flags |= FMODE_WRITE;
515
516                 /* kernel only call f_op->open in dentry_open.  filp_open calls
517                  * dentry_open after call to open_namei that checks permissions.
518                  * Only nfsd_open call dentry_open directly without checking
519                  * permissions and because of that this code below is safe. */
520                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
521                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
522
523                 /* We do not want O_EXCL here, presumably we opened the file
524                  * already? XXX - NFS implications? */
525                 oit.it_flags &= ~O_EXCL;
526
527                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
528                  * created if necessary, then "IT_CREAT" should be set to keep
529                  * consistent with it */
530                 if (oit.it_flags & O_CREAT)
531                         oit.it_op |= IT_CREAT;
532
533                 it = &oit;
534         }
535
536 restart:
537         /* Let's see if we have file open on MDS already. */
538         if (it->it_flags & FMODE_WRITE) {
539                 och_p = &lli->lli_mds_write_och;
540                 och_usecount = &lli->lli_open_fd_write_count;
541         } else if (it->it_flags & FMODE_EXEC) {
542                 och_p = &lli->lli_mds_exec_och;
543                 och_usecount = &lli->lli_open_fd_exec_count;
544          } else {
545                 och_p = &lli->lli_mds_read_och;
546                 och_usecount = &lli->lli_open_fd_read_count;
547         }
548
549         mutex_lock(&lli->lli_och_mutex);
550         if (*och_p) { /* Open handle is present */
551                 if (it_disposition(it, DISP_OPEN_OPEN)) {
552                         /* Well, there's extra open request that we do not need,
553                            let's close it somehow. This will decref request. */
554                         rc = it_open_error(DISP_OPEN_OPEN, it);
555                         if (rc) {
556                                 mutex_unlock(&lli->lli_och_mutex);
557                                 GOTO(out_openerr, rc);
558                         }
559
560                         ll_release_openhandle(file_dentry(file), it);
561                 }
562                 (*och_usecount)++;
563
564                 rc = ll_local_open(file, it, fd, NULL);
565                 if (rc) {
566                         (*och_usecount)--;
567                         mutex_unlock(&lli->lli_och_mutex);
568                         GOTO(out_openerr, rc);
569                 }
570         } else {
571                 LASSERT(*och_usecount == 0);
572                 if (!it->it_disposition) {
573                         struct ll_dentry_data *ldd = ll_d2d(file->f_path.dentry);
574                         /* We cannot just request lock handle now, new ELC code
575                            means that one of other OPEN locks for this file
576                            could be cancelled, and since blocking ast handler
577                            would attempt to grab och_mutex as well, that would
578                            result in a deadlock */
579                         mutex_unlock(&lli->lli_och_mutex);
580                         /*
581                          * Normally called under two situations:
582                          * 1. NFS export.
583                          * 2. A race/condition on MDS resulting in no open
584                          *    handle to be returned from LOOKUP|OPEN request,
585                          *    for example if the target entry was a symlink.
586                          *
587                          *  Only fetch MDS_OPEN_LOCK if this is in NFS path,
588                          *  marked by a bit set in ll_iget_for_nfs. Clear the
589                          *  bit so that it's not confusing later callers.
590                          *
591                          *  NB; when ldd is NULL, it must have come via normal
592                          *  lookup path only, since ll_iget_for_nfs always calls
593                          *  ll_d_init().
594                          */
595                         if (ldd && ldd->lld_nfs_dentry) {
596                                 ldd->lld_nfs_dentry = 0;
597                                 it->it_flags |= MDS_OPEN_LOCK;
598                         }
599
600                          /*
601                          * Always specify MDS_OPEN_BY_FID because we don't want
602                          * to get file with different fid.
603                          */
604                         it->it_flags |= MDS_OPEN_BY_FID;
605                         rc = ll_intent_file_open(file_dentry(file), NULL, 0,
606                                                  it);
607                         if (rc)
608                                 GOTO(out_openerr, rc);
609
610                         goto restart;
611                 }
612                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
613                 if (!*och_p)
614                         GOTO(out_och_free, rc = -ENOMEM);
615
616                 (*och_usecount)++;
617
618                 /* md_intent_lock() didn't get a request ref if there was an
619                  * open error, so don't do cleanup on the request here
620                  * (bug 3430) */
621                 /* XXX (green): Should not we bail out on any error here, not
622                  * just open error? */
623                 rc = it_open_error(DISP_OPEN_OPEN, it);
624                 if (rc != 0)
625                         GOTO(out_och_free, rc);
626
627                 LASSERTF(it_disposition(it, DISP_ENQ_OPEN_REF),
628                          "inode %p: disposition %x, status %d\n", inode,
629                          it_disposition(it, ~0), it->it_status);
630
631                 rc = ll_local_open(file, it, fd, *och_p);
632                 if (rc)
633                         GOTO(out_och_free, rc);
634         }
635         mutex_unlock(&lli->lli_och_mutex);
636         fd = NULL;
637
638         /* Must do this outside lli_och_mutex lock to prevent deadlock where
639            different kind of OPEN lock for this same inode gets cancelled
640            by ldlm_cancel_lru */
641         if (!S_ISREG(inode->i_mode))
642                 GOTO(out_och_free, rc);
643
644         cl_lov_delay_create_clear(&file->f_flags);
645         GOTO(out_och_free, rc);
646
647 out_och_free:
648         if (rc) {
649                 if (och_p && *och_p) {
650                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
651                         *och_p = NULL; /* OBD_FREE writes some magic there */
652                         (*och_usecount)--;
653                 }
654                 mutex_unlock(&lli->lli_och_mutex);
655
656 out_openerr:
657                 if (lli->lli_opendir_key == fd)
658                         ll_deauthorize_statahead(inode, fd);
659                 if (fd != NULL)
660                         ll_file_data_put(fd);
661         } else {
662                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
663         }
664
665         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
666                 ptlrpc_req_finished(it->it_request);
667                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
668         }
669
670         return rc;
671 }
672
673 static int ll_md_blocking_lease_ast(struct ldlm_lock *lock,
674                         struct ldlm_lock_desc *desc, void *data, int flag)
675 {
676         int rc;
677         struct lustre_handle lockh;
678         ENTRY;
679
680         switch (flag) {
681         case LDLM_CB_BLOCKING:
682                 ldlm_lock2handle(lock, &lockh);
683                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
684                 if (rc < 0) {
685                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
686                         RETURN(rc);
687                 }
688                 break;
689         case LDLM_CB_CANCELING:
690                 /* do nothing */
691                 break;
692         }
693         RETURN(0);
694 }
695
696 /**
697  * When setting a lease on a file, we take ownership of the lli_mds_*_och
698  * and save it as fd->fd_och so as to force client to reopen the file even
699  * if it has an open lock in cache already.
700  */
701 static int ll_lease_och_acquire(struct inode *inode, struct file *file,
702                                 struct lustre_handle *old_handle)
703 {
704         struct ll_inode_info *lli = ll_i2info(inode);
705         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
706         struct obd_client_handle **och_p;
707         __u64 *och_usecount;
708         int rc = 0;
709         ENTRY;
710
711         /* Get the openhandle of the file */
712         mutex_lock(&lli->lli_och_mutex);
713         if (fd->fd_lease_och != NULL)
714                 GOTO(out_unlock, rc = -EBUSY);
715
716         if (fd->fd_och == NULL) {
717                 if (file->f_mode & FMODE_WRITE) {
718                         LASSERT(lli->lli_mds_write_och != NULL);
719                         och_p = &lli->lli_mds_write_och;
720                         och_usecount = &lli->lli_open_fd_write_count;
721                 } else {
722                         LASSERT(lli->lli_mds_read_och != NULL);
723                         och_p = &lli->lli_mds_read_och;
724                         och_usecount = &lli->lli_open_fd_read_count;
725                 }
726
727                 if (*och_usecount > 1)
728                         GOTO(out_unlock, rc = -EBUSY);
729
730                 fd->fd_och = *och_p;
731                 *och_usecount = 0;
732                 *och_p = NULL;
733         }
734
735         *old_handle = fd->fd_och->och_fh;
736
737         EXIT;
738 out_unlock:
739         mutex_unlock(&lli->lli_och_mutex);
740         return rc;
741 }
742
743 /**
744  * Release ownership on lli_mds_*_och when putting back a file lease.
745  */
746 static int ll_lease_och_release(struct inode *inode, struct file *file)
747 {
748         struct ll_inode_info *lli = ll_i2info(inode);
749         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
750         struct obd_client_handle **och_p;
751         struct obd_client_handle *old_och = NULL;
752         __u64 *och_usecount;
753         int rc = 0;
754         ENTRY;
755
756         mutex_lock(&lli->lli_och_mutex);
757         if (file->f_mode & FMODE_WRITE) {
758                 och_p = &lli->lli_mds_write_och;
759                 och_usecount = &lli->lli_open_fd_write_count;
760         } else {
761                 och_p = &lli->lli_mds_read_och;
762                 och_usecount = &lli->lli_open_fd_read_count;
763         }
764
765         /* The file may have been open by another process (broken lease) so
766          * *och_p is not NULL. In this case we should simply increase usecount
767          * and close fd_och.
768          */
769         if (*och_p != NULL) {
770                 old_och = fd->fd_och;
771                 (*och_usecount)++;
772         } else {
773                 *och_p = fd->fd_och;
774                 *och_usecount = 1;
775         }
776         fd->fd_och = NULL;
777         mutex_unlock(&lli->lli_och_mutex);
778
779         if (old_och != NULL)
780                 rc = ll_close_inode_openhandle(inode, old_och, 0, NULL);
781
782         RETURN(rc);
783 }
784
785 /**
786  * Acquire a lease and open the file.
787  */
788 static struct obd_client_handle *
789 ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode,
790               __u64 open_flags)
791 {
792         struct lookup_intent it = { .it_op = IT_OPEN };
793         struct ll_sb_info *sbi = ll_i2sbi(inode);
794         struct md_op_data *op_data;
795         struct ptlrpc_request *req = NULL;
796         struct lustre_handle old_handle = { 0 };
797         struct obd_client_handle *och = NULL;
798         int rc;
799         int rc2;
800         ENTRY;
801
802         if (fmode != FMODE_WRITE && fmode != FMODE_READ)
803                 RETURN(ERR_PTR(-EINVAL));
804
805         if (file != NULL) {
806                 if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC))
807                         RETURN(ERR_PTR(-EPERM));
808
809                 rc = ll_lease_och_acquire(inode, file, &old_handle);
810                 if (rc)
811                         RETURN(ERR_PTR(rc));
812         }
813
814         OBD_ALLOC_PTR(och);
815         if (och == NULL)
816                 RETURN(ERR_PTR(-ENOMEM));
817
818         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
819                                         LUSTRE_OPC_ANY, NULL);
820         if (IS_ERR(op_data))
821                 GOTO(out, rc = PTR_ERR(op_data));
822
823         /* To tell the MDT this openhandle is from the same owner */
824         op_data->op_handle = old_handle;
825
826         it.it_flags = fmode | open_flags;
827         it.it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
828         rc = md_intent_lock(sbi->ll_md_exp, op_data, &it, &req,
829                             &ll_md_blocking_lease_ast,
830         /* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise
831          * it can be cancelled which may mislead applications that the lease is
832          * broken;
833          * LDLM_FL_EXCL: Set this flag so that it won't be matched by normal
834          * open in ll_md_blocking_ast(). Otherwise as ll_md_blocking_lease_ast
835          * doesn't deal with openhandle, so normal openhandle will be leaked. */
836                             LDLM_FL_NO_LRU | LDLM_FL_EXCL);
837         ll_finish_md_op_data(op_data);
838         ptlrpc_req_finished(req);
839         if (rc < 0)
840                 GOTO(out_release_it, rc);
841
842         if (it_disposition(&it, DISP_LOOKUP_NEG))
843                 GOTO(out_release_it, rc = -ENOENT);
844
845         rc = it_open_error(DISP_OPEN_OPEN, &it);
846         if (rc)
847                 GOTO(out_release_it, rc);
848
849         LASSERT(it_disposition(&it, DISP_ENQ_OPEN_REF));
850         ll_och_fill(sbi->ll_md_exp, &it, och);
851
852         if (!it_disposition(&it, DISP_OPEN_LEASE)) /* old server? */
853                 GOTO(out_close, rc = -EOPNOTSUPP);
854
855         /* already get lease, handle lease lock */
856         ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
857         if (it.it_lock_mode == 0 ||
858             it.it_lock_bits != MDS_INODELOCK_OPEN) {
859                 /* open lock must return for lease */
860                 CERROR(DFID "lease granted but no open lock, %d/%llu.\n",
861                         PFID(ll_inode2fid(inode)), it.it_lock_mode,
862                         it.it_lock_bits);
863                 GOTO(out_close, rc = -EPROTO);
864         }
865
866         ll_intent_release(&it);
867         RETURN(och);
868
869 out_close:
870         /* Cancel open lock */
871         if (it.it_lock_mode != 0) {
872                 ldlm_lock_decref_and_cancel(&och->och_lease_handle,
873                                             it.it_lock_mode);
874                 it.it_lock_mode = 0;
875                 och->och_lease_handle.cookie = 0ULL;
876         }
877         rc2 = ll_close_inode_openhandle(inode, och, 0, NULL);
878         if (rc2 < 0)
879                 CERROR("%s: error closing file "DFID": %d\n",
880                        ll_get_fsname(inode->i_sb, NULL, 0),
881                        PFID(&ll_i2info(inode)->lli_fid), rc2);
882         och = NULL; /* och has been freed in ll_close_inode_openhandle() */
883 out_release_it:
884         ll_intent_release(&it);
885 out:
886         if (och != NULL)
887                 OBD_FREE_PTR(och);
888         RETURN(ERR_PTR(rc));
889 }
890
891 /**
892  * Check whether a layout swap can be done between two inodes.
893  *
894  * \param[in] inode1  First inode to check
895  * \param[in] inode2  Second inode to check
896  *
897  * \retval 0 on success, layout swap can be performed between both inodes
898  * \retval negative error code if requirements are not met
899  */
900 static int ll_check_swap_layouts_validity(struct inode *inode1,
901                                           struct inode *inode2)
902 {
903         if (!S_ISREG(inode1->i_mode) || !S_ISREG(inode2->i_mode))
904                 return -EINVAL;
905
906         if (inode_permission(inode1, MAY_WRITE) ||
907             inode_permission(inode2, MAY_WRITE))
908                 return -EPERM;
909
910         if (inode1->i_sb != inode2->i_sb)
911                 return -EXDEV;
912
913         return 0;
914 }
915
916 static int ll_swap_layouts_close(struct obd_client_handle *och,
917                                  struct inode *inode, struct inode *inode2)
918 {
919         const struct lu_fid     *fid1 = ll_inode2fid(inode);
920         const struct lu_fid     *fid2;
921         int                      rc;
922         ENTRY;
923
924         CDEBUG(D_INODE, "%s: biased close of file "DFID"\n",
925                ll_get_fsname(inode->i_sb, NULL, 0), PFID(fid1));
926
927         rc = ll_check_swap_layouts_validity(inode, inode2);
928         if (rc < 0)
929                 GOTO(out_free_och, rc);
930
931         /* We now know that inode2 is a lustre inode */
932         fid2 = ll_inode2fid(inode2);
933
934         rc = lu_fid_cmp(fid1, fid2);
935         if (rc == 0)
936                 GOTO(out_free_och, rc = -EINVAL);
937
938         /* Close the file and swap layouts between inode & inode2.
939          * NB: lease lock handle is released in mdc_close_layout_swap_pack()
940          * because we still need it to pack l_remote_handle to MDT. */
941         rc = ll_close_inode_openhandle(inode, och, MDS_CLOSE_LAYOUT_SWAP,
942                                        inode2);
943
944         och = NULL; /* freed in ll_close_inode_openhandle() */
945
946 out_free_och:
947         if (och != NULL)
948                 OBD_FREE_PTR(och);
949
950         RETURN(rc);
951 }
952
953 /**
954  * Release lease and close the file.
955  * It will check if the lease has ever broken.
956  */
957 static int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
958                           bool *lease_broken)
959 {
960         struct ldlm_lock *lock;
961         bool cancelled = true;
962         int rc;
963         ENTRY;
964
965         lock = ldlm_handle2lock(&och->och_lease_handle);
966         if (lock != NULL) {
967                 lock_res_and_lock(lock);
968                 cancelled = ldlm_is_cancel(lock);
969                 unlock_res_and_lock(lock);
970                 LDLM_LOCK_PUT(lock);
971         }
972
973         CDEBUG(D_INODE, "lease for "DFID" broken? %d\n",
974                PFID(&ll_i2info(inode)->lli_fid), cancelled);
975
976         if (!cancelled)
977                 ldlm_cli_cancel(&och->och_lease_handle, 0);
978
979         if (lease_broken != NULL)
980                 *lease_broken = cancelled;
981
982         rc = ll_close_inode_openhandle(inode, och, 0, NULL);
983         RETURN(rc);
984 }
985
986 int ll_merge_attr(const struct lu_env *env, struct inode *inode)
987 {
988         struct ll_inode_info *lli = ll_i2info(inode);
989         struct cl_object *obj = lli->lli_clob;
990         struct cl_attr *attr = vvp_env_thread_attr(env);
991         s64 atime;
992         s64 mtime;
993         s64 ctime;
994         int rc = 0;
995
996         ENTRY;
997
998         ll_inode_size_lock(inode);
999
1000         /* Merge timestamps the most recently obtained from MDS with
1001          * timestamps obtained from OSTs.
1002          *
1003          * Do not overwrite atime of inode because it may be refreshed
1004          * by file_accessed() function. If the read was served by cache
1005          * data, there is no RPC to be sent so that atime may not be
1006          * transferred to OSTs at all. MDT only updates atime at close time
1007          * if it's at least 'mdd.*.atime_diff' older.
1008          * All in all, the atime in Lustre does not strictly comply with
1009          * POSIX. Solving this problem needs to send an RPC to MDT for each
1010          * read, this will hurt performance. */
1011         if (LTIME_S(inode->i_atime) < lli->lli_atime || lli->lli_update_atime) {
1012                 LTIME_S(inode->i_atime) = lli->lli_atime;
1013                 lli->lli_update_atime = 0;
1014         }
1015         LTIME_S(inode->i_mtime) = lli->lli_mtime;
1016         LTIME_S(inode->i_ctime) = lli->lli_ctime;
1017
1018         atime = LTIME_S(inode->i_atime);
1019         mtime = LTIME_S(inode->i_mtime);
1020         ctime = LTIME_S(inode->i_ctime);
1021
1022         cl_object_attr_lock(obj);
1023         rc = cl_object_attr_get(env, obj, attr);
1024         cl_object_attr_unlock(obj);
1025
1026         if (rc != 0)
1027                 GOTO(out_size_unlock, rc);
1028
1029         if (atime < attr->cat_atime)
1030                 atime = attr->cat_atime;
1031
1032         if (ctime < attr->cat_ctime)
1033                 ctime = attr->cat_ctime;
1034
1035         if (mtime < attr->cat_mtime)
1036                 mtime = attr->cat_mtime;
1037
1038         CDEBUG(D_VFSTRACE, DFID" updating i_size %llu\n",
1039                PFID(&lli->lli_fid), attr->cat_size);
1040
1041         i_size_write(inode, attr->cat_size);
1042         inode->i_blocks = attr->cat_blocks;
1043
1044         LTIME_S(inode->i_atime) = atime;
1045         LTIME_S(inode->i_mtime) = mtime;
1046         LTIME_S(inode->i_ctime) = ctime;
1047
1048 out_size_unlock:
1049         ll_inode_size_unlock(inode);
1050
1051         RETURN(rc);
1052 }
1053
1054 static bool file_is_noatime(const struct file *file)
1055 {
1056         const struct vfsmount *mnt = file->f_path.mnt;
1057         const struct inode *inode = file_inode((struct file *)file);
1058
1059         /* Adapted from file_accessed() and touch_atime().*/
1060         if (file->f_flags & O_NOATIME)
1061                 return true;
1062
1063         if (inode->i_flags & S_NOATIME)
1064                 return true;
1065
1066         if (IS_NOATIME(inode))
1067                 return true;
1068
1069         if (mnt->mnt_flags & (MNT_NOATIME | MNT_READONLY))
1070                 return true;
1071
1072         if ((mnt->mnt_flags & MNT_NODIRATIME) && S_ISDIR(inode->i_mode))
1073                 return true;
1074
1075         if ((inode->i_sb->s_flags & MS_NODIRATIME) && S_ISDIR(inode->i_mode))
1076                 return true;
1077
1078         return false;
1079 }
1080
1081 static int ll_file_io_ptask(struct cfs_ptask *ptask);
1082
1083 static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
1084 {
1085         struct inode *inode = file_inode(file);
1086
1087         memset(&io->u.ci_rw.rw_iter, 0, sizeof(io->u.ci_rw.rw_iter));
1088         init_sync_kiocb(&io->u.ci_rw.rw_iocb, file);
1089         io->u.ci_rw.rw_file = file;
1090         io->u.ci_rw.rw_ptask = ll_file_io_ptask;
1091         io->u.ci_rw.rw_nonblock = !!(file->f_flags & O_NONBLOCK);
1092         if (iot == CIT_WRITE) {
1093                 io->u.ci_rw.rw_append = !!(file->f_flags & O_APPEND);
1094                 io->u.ci_rw.rw_sync   = !!(file->f_flags & O_SYNC ||
1095                                            file->f_flags & O_DIRECT ||
1096                                            IS_SYNC(inode));
1097         }
1098         io->ci_obj = ll_i2info(inode)->lli_clob;
1099         io->ci_lockreq = CILR_MAYBE;
1100         if (ll_file_nolock(file)) {
1101                 io->ci_lockreq = CILR_NEVER;
1102                 io->ci_no_srvlock = 1;
1103         } else if (file->f_flags & O_APPEND) {
1104                 io->ci_lockreq = CILR_MANDATORY;
1105         }
1106         io->ci_noatime = file_is_noatime(file);
1107         if (ll_i2sbi(inode)->ll_flags & LL_SBI_PIO)
1108                 io->ci_pio = !io->u.ci_rw.rw_append;
1109         else
1110                 io->ci_pio = 0;
1111 }
1112
1113 static int ll_file_io_ptask(struct cfs_ptask *ptask)
1114 {
1115         struct cl_io_pt *pt = ptask->pt_cbdata;
1116         struct file *file = pt->cip_file;
1117         struct lu_env *env;
1118         struct cl_io *io;
1119         loff_t pos = pt->cip_pos;
1120         int rc;
1121         __u16 refcheck;
1122         ENTRY;
1123
1124         env = cl_env_get(&refcheck);
1125         if (IS_ERR(env))
1126                 RETURN(PTR_ERR(env));
1127
1128         CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
1129                 file_dentry(file)->d_name.name,
1130                 pt->cip_iot == CIT_READ ? "read" : "write",
1131                 pos, pos + pt->cip_count);
1132
1133 restart:
1134         io = vvp_env_thread_io(env);
1135         ll_io_init(io, file, pt->cip_iot);
1136         io->u.ci_rw.rw_iter = pt->cip_iter;
1137         io->u.ci_rw.rw_iocb = pt->cip_iocb;
1138         io->ci_pio = 0; /* It's already in parallel task */
1139
1140         rc = cl_io_rw_init(env, io, pt->cip_iot, pos,
1141                            pt->cip_count - pt->cip_result);
1142         if (!rc) {
1143                 struct vvp_io *vio = vvp_env_io(env);
1144
1145                 vio->vui_io_subtype = IO_NORMAL;
1146                 vio->vui_fd = LUSTRE_FPRIVATE(file);
1147
1148                 ll_cl_add(file, env, io, LCC_RW);
1149                 rc = cl_io_loop(env, io);
1150                 ll_cl_remove(file, env);
1151         } else {
1152                 /* cl_io_rw_init() handled IO */
1153                 rc = io->ci_result;
1154         }
1155
1156         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_PTASK_IO_FAIL, 0)) {
1157                 if (io->ci_nob > 0)
1158                         io->ci_nob /= 2;
1159                 rc = -EIO;
1160         }
1161
1162         if (io->ci_nob > 0) {
1163                 pt->cip_result += io->ci_nob;
1164                 iov_iter_advance(&pt->cip_iter, io->ci_nob);
1165                 pos += io->ci_nob;
1166                 pt->cip_iocb.ki_pos = pos;
1167 #ifdef HAVE_KIOCB_KI_LEFT
1168                 pt->cip_iocb.ki_left = pt->cip_count - pt->cip_result;
1169 #elif defined(HAVE_KI_NBYTES)
1170                 pt->cip_iocb.ki_nbytes = pt->cip_count - pt->cip_result;
1171 #endif
1172         }
1173
1174         cl_io_fini(env, io);
1175
1176         if ((rc == 0 || rc == -ENODATA) &&
1177             pt->cip_result < pt->cip_count &&
1178             io->ci_need_restart) {
1179                 CDEBUG(D_VFSTRACE,
1180                         "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
1181                         file_dentry(file)->d_name.name,
1182                         pt->cip_iot == CIT_READ ? "read" : "write",
1183                         pos, pos + pt->cip_count - pt->cip_result,
1184                         pt->cip_result, rc);
1185                 goto restart;
1186         }
1187
1188         CDEBUG(D_VFSTRACE, "%s: %s ret: %zd, rc: %d\n",
1189                 file_dentry(file)->d_name.name,
1190                 pt->cip_iot == CIT_READ ? "read" : "write",
1191                 pt->cip_result, rc);
1192
1193         cl_env_put(env, &refcheck);
1194         RETURN(pt->cip_result > 0 ? 0 : rc);
1195 }
1196
1197 static ssize_t
1198 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
1199                    struct file *file, enum cl_io_type iot,
1200                    loff_t *ppos, size_t count)
1201 {
1202         struct range_lock       range;
1203         struct vvp_io           *vio = vvp_env_io(env);
1204         struct inode            *inode = file_inode(file);
1205         struct ll_inode_info    *lli = ll_i2info(inode);
1206         struct ll_file_data     *fd  = LUSTRE_FPRIVATE(file);
1207         struct cl_io            *io;
1208         loff_t                  pos = *ppos;
1209         ssize_t                 result = 0;
1210         int                     rc = 0;
1211
1212         ENTRY;
1213
1214         CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
1215                 file_dentry(file)->d_name.name,
1216                 iot == CIT_READ ? "read" : "write", pos, pos + count);
1217
1218 restart:
1219         io = vvp_env_thread_io(env);
1220         ll_io_init(io, file, iot);
1221         if (args->via_io_subtype == IO_NORMAL) {
1222                 io->u.ci_rw.rw_iter = *args->u.normal.via_iter;
1223                 io->u.ci_rw.rw_iocb = *args->u.normal.via_iocb;
1224         } else {
1225                 io->ci_pio = 0;
1226         }
1227
1228         if (cl_io_rw_init(env, io, iot, pos, count) == 0) {
1229                 bool range_locked = false;
1230
1231                 if (file->f_flags & O_APPEND)
1232                         range_lock_init(&range, 0, LUSTRE_EOF);
1233                 else
1234                         range_lock_init(&range, pos, pos + count - 1);
1235
1236                 vio->vui_fd  = LUSTRE_FPRIVATE(file);
1237                 vio->vui_io_subtype = args->via_io_subtype;
1238
1239                 switch (vio->vui_io_subtype) {
1240                 case IO_NORMAL:
1241                         /* Direct IO reads must also take range lock,
1242                          * or multiple reads will try to work on the same pages
1243                          * See LU-6227 for details. */
1244                         if (((iot == CIT_WRITE) ||
1245                             (iot == CIT_READ && (file->f_flags & O_DIRECT))) &&
1246                             !(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1247                                 CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
1248                                        RL_PARA(&range));
1249                                 rc = range_lock(&lli->lli_write_tree, &range);
1250                                 if (rc < 0)
1251                                         GOTO(out, rc);
1252
1253                                 range_locked = true;
1254                         }
1255                         break;
1256                 case IO_SPLICE:
1257                         vio->u.splice.vui_pipe = args->u.splice.via_pipe;
1258                         vio->u.splice.vui_flags = args->u.splice.via_flags;
1259                         break;
1260                 default:
1261                         CERROR("unknown IO subtype %u\n", vio->vui_io_subtype);
1262                         LBUG();
1263                 }
1264
1265                 ll_cl_add(file, env, io, LCC_RW);
1266                 if (io->ci_pio && iot == CIT_WRITE && !IS_NOSEC(inode) &&
1267                     !lli->lli_inode_locked) {
1268                         inode_lock(inode);
1269                         lli->lli_inode_locked = 1;
1270                 }
1271                 rc = cl_io_loop(env, io);
1272                 if (lli->lli_inode_locked) {
1273                         lli->lli_inode_locked = 0;
1274                         inode_unlock(inode);
1275                 }
1276                 ll_cl_remove(file, env);
1277
1278                 if (range_locked) {
1279                         CDEBUG(D_VFSTRACE, "Range unlock "RL_FMT"\n",
1280                                RL_PARA(&range));
1281                         range_unlock(&lli->lli_write_tree, &range);
1282                 }
1283         } else {
1284                 /* cl_io_rw_init() handled IO */
1285                 rc = io->ci_result;
1286         }
1287
1288         if (io->ci_nob > 0) {
1289                 result += io->ci_nob;
1290                 count  -= io->ci_nob;
1291
1292                 if (args->via_io_subtype == IO_NORMAL) {
1293                         iov_iter_advance(args->u.normal.via_iter, io->ci_nob);
1294                         pos += io->ci_nob;
1295                         args->u.normal.via_iocb->ki_pos = pos;
1296 #ifdef HAVE_KIOCB_KI_LEFT
1297                         args->u.normal.via_iocb->ki_left = count;
1298 #elif defined(HAVE_KI_NBYTES)
1299                         args->u.normal.via_iocb->ki_nbytes = count;
1300 #endif
1301                 } else {
1302                         /* for splice */
1303                         pos = io->u.ci_rw.rw_range.cir_pos;
1304                 }
1305         }
1306 out:
1307         cl_io_fini(env, io);
1308
1309         if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) {
1310                 CDEBUG(D_VFSTRACE,
1311                         "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
1312                         file_dentry(file)->d_name.name,
1313                         iot == CIT_READ ? "read" : "write",
1314                         pos, pos + count, result, rc);
1315                 goto restart;
1316         }
1317
1318         if (iot == CIT_READ) {
1319                 if (result > 0)
1320                         ll_stats_ops_tally(ll_i2sbi(inode),
1321                                            LPROC_LL_READ_BYTES, result);
1322         } else if (iot == CIT_WRITE) {
1323                 if (result > 0) {
1324                         ll_stats_ops_tally(ll_i2sbi(inode),
1325                                            LPROC_LL_WRITE_BYTES, result);
1326                         fd->fd_write_failed = false;
1327                 } else if (result == 0 && rc == 0) {
1328                         rc = io->ci_result;
1329                         if (rc < 0)
1330                                 fd->fd_write_failed = true;
1331                         else
1332                                 fd->fd_write_failed = false;
1333                 } else if (rc != -ERESTARTSYS) {
1334                         fd->fd_write_failed = true;
1335                 }
1336         }
1337
1338         CDEBUG(D_VFSTRACE, "%s: %s *ppos: %llu, pos: %llu, ret: %zd, rc: %d\n",
1339                 file_dentry(file)->d_name.name,
1340                 iot == CIT_READ ? "read" : "write", *ppos, pos, result, rc);
1341
1342         *ppos = pos;
1343
1344         RETURN(result > 0 ? result : rc);
1345 }
1346
1347 /**
1348  * The purpose of fast read is to overcome per I/O overhead and improve IOPS
1349  * especially for small I/O.
1350  *
1351  * To serve a read request, CLIO has to create and initialize a cl_io and
1352  * then request DLM lock. This has turned out to have siginificant overhead
1353  * and affects the performance of small I/O dramatically.
1354  *
1355  * It's not necessary to create a cl_io for each I/O. Under the help of read
1356  * ahead, most of the pages being read are already in memory cache and we can
1357  * read those pages directly because if the pages exist, the corresponding DLM
1358  * lock must exist so that page content must be valid.
1359  *
1360  * In fast read implementation, the llite speculatively finds and reads pages
1361  * in memory cache. There are three scenarios for fast read:
1362  *   - If the page exists and is uptodate, kernel VM will provide the data and
1363  *     CLIO won't be intervened;
1364  *   - If the page was brought into memory by read ahead, it will be exported
1365  *     and read ahead parameters will be updated;
1366  *   - Otherwise the page is not in memory, we can't do fast read. Therefore,
1367  *     it will go back and invoke normal read, i.e., a cl_io will be created
1368  *     and DLM lock will be requested.
1369  *
1370  * POSIX compliance: posix standard states that read is intended to be atomic.
1371  * Lustre read implementation is in line with Linux kernel read implementation
1372  * and neither of them complies with POSIX standard in this matter. Fast read
1373  * doesn't make the situation worse on single node but it may interleave write
1374  * results from multiple nodes due to short read handling in ll_file_aio_read().
1375  *
1376  * \param env - lu_env
1377  * \param iocb - kiocb from kernel
1378  * \param iter - user space buffers where the data will be copied
1379  *
1380  * \retval - number of bytes have been read, or error code if error occurred.
1381  */
1382 static ssize_t
1383 ll_do_fast_read(struct kiocb *iocb, struct iov_iter *iter)
1384 {
1385         ssize_t result;
1386
1387         if (!ll_sbi_has_fast_read(ll_i2sbi(file_inode(iocb->ki_filp))))
1388                 return 0;
1389
1390         /* NB: we can't do direct IO for fast read because it will need a lock
1391          * to make IO engine happy. */
1392         if (iocb->ki_filp->f_flags & O_DIRECT)
1393                 return 0;
1394
1395         result = generic_file_read_iter(iocb, iter);
1396
1397         /* If the first page is not in cache, generic_file_aio_read() will be
1398          * returned with -ENODATA.
1399          * See corresponding code in ll_readpage(). */
1400         if (result == -ENODATA)
1401                 result = 0;
1402
1403         if (result > 0)
1404                 ll_stats_ops_tally(ll_i2sbi(file_inode(iocb->ki_filp)),
1405                                 LPROC_LL_READ_BYTES, result);
1406
1407         return result;
1408 }
1409
1410 /*
1411  * Read from a file (through the page cache).
1412  */
1413 static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to)
1414 {
1415         struct lu_env *env;
1416         struct vvp_io_args *args;
1417         ssize_t result;
1418         ssize_t rc2;
1419         __u16 refcheck;
1420
1421         result = ll_do_fast_read(iocb, to);
1422         if (result < 0 || iov_iter_count(to) == 0)
1423                 GOTO(out, result);
1424
1425         env = cl_env_get(&refcheck);
1426         if (IS_ERR(env))
1427                 return PTR_ERR(env);
1428
1429         args = ll_env_args(env, IO_NORMAL);
1430         args->u.normal.via_iter = to;
1431         args->u.normal.via_iocb = iocb;
1432
1433         rc2 = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1434                                  &iocb->ki_pos, iov_iter_count(to));
1435         if (rc2 > 0)
1436                 result += rc2;
1437         else if (result == 0)
1438                 result = rc2;
1439
1440         cl_env_put(env, &refcheck);
1441 out:
1442         return result;
1443 }
1444
1445 /*
1446  * Write to a file (through the page cache).
1447  */
1448 static ssize_t ll_file_write_iter(struct kiocb *iocb, struct iov_iter *from)
1449 {
1450         struct vvp_io_args *args;
1451         struct lu_env *env;
1452         ssize_t result;
1453         __u16 refcheck;
1454
1455         env = cl_env_get(&refcheck);
1456         if (IS_ERR(env))
1457                 return PTR_ERR(env);
1458
1459         args = ll_env_args(env, IO_NORMAL);
1460         args->u.normal.via_iter = from;
1461         args->u.normal.via_iocb = iocb;
1462
1463         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1464                                     &iocb->ki_pos, iov_iter_count(from));
1465         cl_env_put(env, &refcheck);
1466         return result;
1467 }
1468
1469 #ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1470 /*
1471  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
1472  */
1473 static int ll_file_get_iov_count(const struct iovec *iov,
1474                                  unsigned long *nr_segs, size_t *count)
1475 {
1476         size_t cnt = 0;
1477         unsigned long seg;
1478
1479         for (seg = 0; seg < *nr_segs; seg++) {
1480                 const struct iovec *iv = &iov[seg];
1481
1482                 /*
1483                  * If any segment has a negative length, or the cumulative
1484                  * length ever wraps negative then return -EINVAL.
1485                  */
1486                 cnt += iv->iov_len;
1487                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
1488                         return -EINVAL;
1489                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
1490                         continue;
1491                 if (seg == 0)
1492                         return -EFAULT;
1493                 *nr_segs = seg;
1494                 cnt -= iv->iov_len;     /* This segment is no good */
1495                 break;
1496         }
1497         *count = cnt;
1498         return 0;
1499 }
1500
1501 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1502                                 unsigned long nr_segs, loff_t pos)
1503 {
1504         struct iov_iter to;
1505         size_t iov_count;
1506         ssize_t result;
1507         ENTRY;
1508
1509         result = ll_file_get_iov_count(iov, &nr_segs, &iov_count);
1510         if (result)
1511                 RETURN(result);
1512
1513 # ifdef HAVE_IOV_ITER_INIT_DIRECTION
1514         iov_iter_init(&to, READ, iov, nr_segs, iov_count);
1515 # else /* !HAVE_IOV_ITER_INIT_DIRECTION */
1516         iov_iter_init(&to, iov, nr_segs, iov_count, 0);
1517 # endif /* HAVE_IOV_ITER_INIT_DIRECTION */
1518
1519         result = ll_file_read_iter(iocb, &to);
1520
1521         RETURN(result);
1522 }
1523
1524 static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
1525                             loff_t *ppos)
1526 {
1527         struct iovec   iov = { .iov_base = buf, .iov_len = count };
1528         struct kiocb   kiocb;
1529         ssize_t        result;
1530         ENTRY;
1531
1532         init_sync_kiocb(&kiocb, file);
1533         kiocb.ki_pos = *ppos;
1534 #ifdef HAVE_KIOCB_KI_LEFT
1535         kiocb.ki_left = count;
1536 #elif defined(HAVE_KI_NBYTES)
1537         kiocb.i_nbytes = count;
1538 #endif
1539
1540         result = ll_file_aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
1541         *ppos = kiocb.ki_pos;
1542
1543         RETURN(result);
1544 }
1545
1546 /*
1547  * Write to a file (through the page cache).
1548  * AIO stuff
1549  */
1550 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1551                                  unsigned long nr_segs, loff_t pos)
1552 {
1553         struct iov_iter from;
1554         size_t iov_count;
1555         ssize_t result;
1556         ENTRY;
1557
1558         result = ll_file_get_iov_count(iov, &nr_segs, &iov_count);
1559         if (result)
1560                 RETURN(result);
1561
1562 # ifdef HAVE_IOV_ITER_INIT_DIRECTION
1563         iov_iter_init(&from, WRITE, iov, nr_segs, iov_count);
1564 # else /* !HAVE_IOV_ITER_INIT_DIRECTION */
1565         iov_iter_init(&from, iov, nr_segs, iov_count, 0);
1566 # endif /* HAVE_IOV_ITER_INIT_DIRECTION */
1567
1568         result = ll_file_write_iter(iocb, &from);
1569
1570         RETURN(result);
1571 }
1572
1573 static ssize_t ll_file_write(struct file *file, const char __user *buf,
1574                              size_t count, loff_t *ppos)
1575 {
1576         struct lu_env *env;
1577         struct iovec   iov = { .iov_base = (void __user *)buf,
1578                                .iov_len = count };
1579         struct kiocb  *kiocb;
1580         ssize_t        result;
1581         __u16          refcheck;
1582         ENTRY;
1583
1584         env = cl_env_get(&refcheck);
1585         if (IS_ERR(env))
1586                 RETURN(PTR_ERR(env));
1587
1588         kiocb = &ll_env_info(env)->lti_kiocb;
1589         init_sync_kiocb(kiocb, file);
1590         kiocb->ki_pos = *ppos;
1591 #ifdef HAVE_KIOCB_KI_LEFT
1592         kiocb->ki_left = count;
1593 #elif defined(HAVE_KI_NBYTES)
1594         kiocb->ki_nbytes = count;
1595 #endif
1596
1597         result = ll_file_aio_write(kiocb, &iov, 1, kiocb->ki_pos);
1598         *ppos = kiocb->ki_pos;
1599
1600         cl_env_put(env, &refcheck);
1601         RETURN(result);
1602 }
1603 #endif /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
1604
1605 /*
1606  * Send file content (through pagecache) somewhere with helper
1607  */
1608 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1609                                    struct pipe_inode_info *pipe, size_t count,
1610                                    unsigned int flags)
1611 {
1612         struct lu_env      *env;
1613         struct vvp_io_args *args;
1614         ssize_t             result;
1615         __u16               refcheck;
1616         ENTRY;
1617
1618         env = cl_env_get(&refcheck);
1619         if (IS_ERR(env))
1620                 RETURN(PTR_ERR(env));
1621
1622         args = ll_env_args(env, IO_SPLICE);
1623         args->u.splice.via_pipe = pipe;
1624         args->u.splice.via_flags = flags;
1625
1626         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1627         cl_env_put(env, &refcheck);
1628         RETURN(result);
1629 }
1630
1631 int ll_lov_setstripe_ea_info(struct inode *inode, struct dentry *dentry,
1632                              __u64 flags, struct lov_user_md *lum, int lum_size)
1633 {
1634         struct lookup_intent oit = {
1635                 .it_op = IT_OPEN,
1636                 .it_flags = flags | MDS_OPEN_BY_FID,
1637         };
1638         int rc;
1639         ENTRY;
1640
1641         ll_inode_size_lock(inode);
1642         rc = ll_intent_file_open(dentry, lum, lum_size, &oit);
1643         if (rc < 0)
1644                 GOTO(out_unlock, rc);
1645
1646         ll_release_openhandle(dentry, &oit);
1647
1648 out_unlock:
1649         ll_inode_size_unlock(inode);
1650         ll_intent_release(&oit);
1651
1652         RETURN(rc);
1653 }
1654
1655 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1656                              struct lov_mds_md **lmmp, int *lmm_size,
1657                              struct ptlrpc_request **request)
1658 {
1659         struct ll_sb_info *sbi = ll_i2sbi(inode);
1660         struct mdt_body  *body;
1661         struct lov_mds_md *lmm = NULL;
1662         struct ptlrpc_request *req = NULL;
1663         struct md_op_data *op_data;
1664         int rc, lmmsize;
1665
1666         rc = ll_get_default_mdsize(sbi, &lmmsize);
1667         if (rc)
1668                 RETURN(rc);
1669
1670         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1671                                      strlen(filename), lmmsize,
1672                                      LUSTRE_OPC_ANY, NULL);
1673         if (IS_ERR(op_data))
1674                 RETURN(PTR_ERR(op_data));
1675
1676         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1677         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1678         ll_finish_md_op_data(op_data);
1679         if (rc < 0) {
1680                 CDEBUG(D_INFO, "md_getattr_name failed "
1681                        "on %s: rc %d\n", filename, rc);
1682                 GOTO(out, rc);
1683         }
1684
1685         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1686         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1687
1688         lmmsize = body->mbo_eadatasize;
1689
1690         if (!(body->mbo_valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1691                         lmmsize == 0) {
1692                 GOTO(out, rc = -ENODATA);
1693         }
1694
1695         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1696         LASSERT(lmm != NULL);
1697
1698         if (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1) &&
1699             lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3) &&
1700             lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_COMP_V1))
1701                 GOTO(out, rc = -EPROTO);
1702
1703         /*
1704          * This is coming from the MDS, so is probably in
1705          * little endian.  We convert it to host endian before
1706          * passing it to userspace.
1707          */
1708         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1709                 int stripe_count;
1710
1711                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1) ||
1712                     lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1713                         stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
1714                         if (le32_to_cpu(lmm->lmm_pattern) &
1715                             LOV_PATTERN_F_RELEASED)
1716                                 stripe_count = 0;
1717                 }
1718
1719                 /* if function called for directory - we should
1720                  * avoid swab not existent lsm objects */
1721                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1722                         lustre_swab_lov_user_md_v1(
1723                                         (struct lov_user_md_v1 *)lmm);
1724                         if (S_ISREG(body->mbo_mode))
1725                                 lustre_swab_lov_user_md_objects(
1726                                     ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1727                                     stripe_count);
1728                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1729                         lustre_swab_lov_user_md_v3(
1730                                         (struct lov_user_md_v3 *)lmm);
1731                         if (S_ISREG(body->mbo_mode))
1732                                 lustre_swab_lov_user_md_objects(
1733                                     ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1734                                     stripe_count);
1735                 } else if (lmm->lmm_magic ==
1736                            cpu_to_le32(LOV_MAGIC_COMP_V1)) {
1737                         lustre_swab_lov_comp_md_v1(
1738                                         (struct lov_comp_md_v1 *)lmm);
1739                 }
1740         }
1741
1742 out:
1743         *lmmp = lmm;
1744         *lmm_size = lmmsize;
1745         *request = req;
1746         return rc;
1747 }
1748
1749 static int ll_lov_setea(struct inode *inode, struct file *file,
1750                         void __user *arg)
1751 {
1752         __u64                    flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1753         struct lov_user_md      *lump;
1754         int                      lum_size = sizeof(struct lov_user_md) +
1755                                             sizeof(struct lov_user_ost_data);
1756         int                      rc;
1757         ENTRY;
1758
1759         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1760                 RETURN(-EPERM);
1761
1762         OBD_ALLOC_LARGE(lump, lum_size);
1763         if (lump == NULL)
1764                 RETURN(-ENOMEM);
1765
1766         if (copy_from_user(lump, arg, lum_size))
1767                 GOTO(out_lump, rc = -EFAULT);
1768
1769         rc = ll_lov_setstripe_ea_info(inode, file_dentry(file), flags, lump,
1770                                       lum_size);
1771         cl_lov_delay_create_clear(&file->f_flags);
1772
1773 out_lump:
1774         OBD_FREE_LARGE(lump, lum_size);
1775         RETURN(rc);
1776 }
1777
1778 static int ll_file_getstripe(struct inode *inode, void __user *lum, size_t size)
1779 {
1780         struct lu_env   *env;
1781         __u16           refcheck;
1782         int             rc;
1783         ENTRY;
1784
1785         env = cl_env_get(&refcheck);
1786         if (IS_ERR(env))
1787                 RETURN(PTR_ERR(env));
1788
1789         rc = cl_object_getstripe(env, ll_i2info(inode)->lli_clob, lum, size);
1790         cl_env_put(env, &refcheck);
1791         RETURN(rc);
1792 }
1793
1794 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1795                             void __user *arg)
1796 {
1797         struct lov_user_md __user *lum = (struct lov_user_md __user *)arg;
1798         struct lov_user_md        *klum;
1799         int                        lum_size, rc;
1800         __u64                      flags = FMODE_WRITE;
1801         ENTRY;
1802
1803         rc = ll_copy_user_md(lum, &klum);
1804         if (rc < 0)
1805                 RETURN(rc);
1806
1807         lum_size = rc;
1808         rc = ll_lov_setstripe_ea_info(inode, file_dentry(file), flags, klum,
1809                                       lum_size);
1810         if (!rc) {
1811                 __u32 gen;
1812
1813                 rc = put_user(0, &lum->lmm_stripe_count);
1814                 if (rc)
1815                         GOTO(out, rc);
1816
1817                 rc = ll_layout_refresh(inode, &gen);
1818                 if (rc)
1819                         GOTO(out, rc);
1820
1821                 rc = ll_file_getstripe(inode, arg, lum_size);
1822         }
1823         cl_lov_delay_create_clear(&file->f_flags);
1824
1825 out:
1826         OBD_FREE(klum, lum_size);
1827         RETURN(rc);
1828 }
1829
1830 static int
1831 ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1832 {
1833         struct ll_inode_info *lli = ll_i2info(inode);
1834         struct cl_object *obj = lli->lli_clob;
1835         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1836         struct ll_grouplock grouplock;
1837         int rc;
1838         ENTRY;
1839
1840         if (arg == 0) {
1841                 CWARN("group id for group lock must not be 0\n");
1842                 RETURN(-EINVAL);
1843         }
1844
1845         if (ll_file_nolock(file))
1846                 RETURN(-EOPNOTSUPP);
1847
1848         spin_lock(&lli->lli_lock);
1849         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1850                 CWARN("group lock already existed with gid %lu\n",
1851                       fd->fd_grouplock.lg_gid);
1852                 spin_unlock(&lli->lli_lock);
1853                 RETURN(-EINVAL);
1854         }
1855         LASSERT(fd->fd_grouplock.lg_lock == NULL);
1856         spin_unlock(&lli->lli_lock);
1857
1858         /**
1859          * XXX: group lock needs to protect all OST objects while PFL
1860          * can add new OST objects during the IO, so we'd instantiate
1861          * all OST objects before getting its group lock.
1862          */
1863         if (obj) {
1864                 struct lu_env *env;
1865                 __u16 refcheck;
1866                 struct cl_layout cl = {
1867                         .cl_is_composite = false,
1868                 };
1869
1870                 env = cl_env_get(&refcheck);
1871                 if (IS_ERR(env))
1872                         RETURN(PTR_ERR(env));
1873
1874                 rc = cl_object_layout_get(env, obj, &cl);
1875                 if (!rc && cl.cl_is_composite)
1876                         rc = ll_layout_write_intent(inode, 0, OBD_OBJECT_EOF);
1877
1878                 cl_env_put(env, &refcheck);
1879                 if (rc)
1880                         RETURN(rc);
1881         }
1882
1883         rc = cl_get_grouplock(ll_i2info(inode)->lli_clob,
1884                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1885         if (rc)
1886                 RETURN(rc);
1887
1888         spin_lock(&lli->lli_lock);
1889         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1890                 spin_unlock(&lli->lli_lock);
1891                 CERROR("another thread just won the race\n");
1892                 cl_put_grouplock(&grouplock);
1893                 RETURN(-EINVAL);
1894         }
1895
1896         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1897         fd->fd_grouplock = grouplock;
1898         spin_unlock(&lli->lli_lock);
1899
1900         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1901         RETURN(0);
1902 }
1903
1904 static int ll_put_grouplock(struct inode *inode, struct file *file,
1905                             unsigned long arg)
1906 {
1907         struct ll_inode_info   *lli = ll_i2info(inode);
1908         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1909         struct ll_grouplock     grouplock;
1910         ENTRY;
1911
1912         spin_lock(&lli->lli_lock);
1913         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1914                 spin_unlock(&lli->lli_lock);
1915                 CWARN("no group lock held\n");
1916                 RETURN(-EINVAL);
1917         }
1918
1919         LASSERT(fd->fd_grouplock.lg_lock != NULL);
1920
1921         if (fd->fd_grouplock.lg_gid != arg) {
1922                 CWARN("group lock %lu doesn't match current id %lu\n",
1923                       arg, fd->fd_grouplock.lg_gid);
1924                 spin_unlock(&lli->lli_lock);
1925                 RETURN(-EINVAL);
1926         }
1927
1928         grouplock = fd->fd_grouplock;
1929         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1930         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1931         spin_unlock(&lli->lli_lock);
1932
1933         cl_put_grouplock(&grouplock);
1934         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1935         RETURN(0);
1936 }
1937
1938 /**
1939  * Close inode open handle
1940  *
1941  * \param dentry [in]     dentry which contains the inode
1942  * \param it     [in,out] intent which contains open info and result
1943  *
1944  * \retval 0     success
1945  * \retval <0    failure
1946  */
1947 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1948 {
1949         struct inode *inode = dentry->d_inode;
1950         struct obd_client_handle *och;
1951         int rc;
1952         ENTRY;
1953
1954         LASSERT(inode);
1955
1956         /* Root ? Do nothing. */
1957         if (dentry->d_inode->i_sb->s_root == dentry)
1958                 RETURN(0);
1959
1960         /* No open handle to close? Move away */
1961         if (!it_disposition(it, DISP_OPEN_OPEN))
1962                 RETURN(0);
1963
1964         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1965
1966         OBD_ALLOC(och, sizeof(*och));
1967         if (!och)
1968                 GOTO(out, rc = -ENOMEM);
1969
1970         ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
1971
1972         rc = ll_close_inode_openhandle(inode, och, 0, NULL);
1973 out:
1974         /* this one is in place of ll_file_open */
1975         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1976                 ptlrpc_req_finished(it->it_request);
1977                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1978         }
1979         RETURN(rc);
1980 }
1981
1982 /**
1983  * Get size for inode for which FIEMAP mapping is requested.
1984  * Make the FIEMAP get_info call and returns the result.
1985  * \param fiemap        kernel buffer to hold extens
1986  * \param num_bytes     kernel buffer size
1987  */
1988 static int ll_do_fiemap(struct inode *inode, struct fiemap *fiemap,
1989                         size_t num_bytes)
1990 {
1991         struct lu_env                   *env;
1992         __u16                           refcheck;
1993         int                             rc = 0;
1994         struct ll_fiemap_info_key       fmkey = { .lfik_name = KEY_FIEMAP, };
1995         ENTRY;
1996
1997         /* Checks for fiemap flags */
1998         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1999                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2000                 return -EBADR;
2001         }
2002
2003         /* Check for FIEMAP_FLAG_SYNC */
2004         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
2005                 rc = filemap_fdatawrite(inode->i_mapping);
2006                 if (rc)
2007                         return rc;
2008         }
2009
2010         env = cl_env_get(&refcheck);
2011         if (IS_ERR(env))
2012                 RETURN(PTR_ERR(env));
2013
2014         if (i_size_read(inode) == 0) {
2015                 rc = ll_glimpse_size(inode);
2016                 if (rc)
2017                         GOTO(out, rc);
2018         }
2019
2020         fmkey.lfik_oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2021         obdo_from_inode(&fmkey.lfik_oa, inode, OBD_MD_FLSIZE);
2022         obdo_set_parent_fid(&fmkey.lfik_oa, &ll_i2info(inode)->lli_fid);
2023
2024         /* If filesize is 0, then there would be no objects for mapping */
2025         if (fmkey.lfik_oa.o_size == 0) {
2026                 fiemap->fm_mapped_extents = 0;
2027                 GOTO(out, rc = 0);
2028         }
2029
2030         fmkey.lfik_fiemap = *fiemap;
2031
2032         rc = cl_object_fiemap(env, ll_i2info(inode)->lli_clob,
2033                               &fmkey, fiemap, &num_bytes);
2034 out:
2035         cl_env_put(env, &refcheck);
2036         RETURN(rc);
2037 }
2038
2039 int ll_fid2path(struct inode *inode, void __user *arg)
2040 {
2041         struct obd_export       *exp = ll_i2mdexp(inode);
2042         const struct getinfo_fid2path __user *gfin = arg;
2043         __u32                    pathlen;
2044         struct getinfo_fid2path *gfout;
2045         size_t                   outsize;
2046         int                      rc;
2047
2048         ENTRY;
2049
2050         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
2051             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
2052                 RETURN(-EPERM);
2053
2054         /* Only need to get the buflen */
2055         if (get_user(pathlen, &gfin->gf_pathlen))
2056                 RETURN(-EFAULT);
2057
2058         if (pathlen > PATH_MAX)
2059                 RETURN(-EINVAL);
2060
2061         outsize = sizeof(*gfout) + pathlen;
2062         OBD_ALLOC(gfout, outsize);
2063         if (gfout == NULL)
2064                 RETURN(-ENOMEM);
2065
2066         if (copy_from_user(gfout, arg, sizeof(*gfout)))
2067                 GOTO(gf_free, rc = -EFAULT);
2068         /* append root FID after gfout to let MDT know the root FID so that it
2069          * can lookup the correct path, this is mainly for fileset.
2070          * old server without fileset mount support will ignore this. */
2071         *gfout->gf_u.gf_root_fid = *ll_inode2fid(inode);
2072
2073         /* Call mdc_iocontrol */
2074         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
2075         if (rc != 0)
2076                 GOTO(gf_free, rc);
2077
2078         if (copy_to_user(arg, gfout, outsize))
2079                 rc = -EFAULT;
2080
2081 gf_free:
2082         OBD_FREE(gfout, outsize);
2083         RETURN(rc);
2084 }
2085
2086 /*
2087  * Read the data_version for inode.
2088  *
2089  * This value is computed using stripe object version on OST.
2090  * Version is computed using server side locking.
2091  *
2092  * @param flags if do sync on the OST side;
2093  *              0: no sync
2094  *              LL_DV_RD_FLUSH: flush dirty pages, LCK_PR on OSTs
2095  *              LL_DV_WR_FLUSH: drop all caching pages, LCK_PW on OSTs
2096  */
2097 int ll_data_version(struct inode *inode, __u64 *data_version, int flags)
2098 {
2099         struct cl_object *obj = ll_i2info(inode)->lli_clob;
2100         struct lu_env *env;
2101         struct cl_io *io;
2102         __u16  refcheck;
2103         int result;
2104
2105         ENTRY;
2106
2107         /* If no file object initialized, we consider its version is 0. */
2108         if (obj == NULL) {
2109                 *data_version = 0;
2110                 RETURN(0);
2111         }
2112
2113         env = cl_env_get(&refcheck);
2114         if (IS_ERR(env))
2115                 RETURN(PTR_ERR(env));
2116
2117         io = vvp_env_thread_io(env);
2118         io->ci_obj = obj;
2119         io->u.ci_data_version.dv_data_version = 0;
2120         io->u.ci_data_version.dv_flags = flags;
2121
2122 restart:
2123         if (cl_io_init(env, io, CIT_DATA_VERSION, io->ci_obj) == 0)
2124                 result = cl_io_loop(env, io);
2125         else
2126                 result = io->ci_result;
2127
2128         *data_version = io->u.ci_data_version.dv_data_version;
2129
2130         cl_io_fini(env, io);
2131
2132         if (unlikely(io->ci_need_restart))
2133                 goto restart;
2134
2135         cl_env_put(env, &refcheck);
2136
2137         RETURN(result);
2138 }
2139
2140 /*
2141  * Trigger a HSM release request for the provided inode.
2142  */
2143 int ll_hsm_release(struct inode *inode)
2144 {
2145         struct lu_env *env;
2146         struct obd_client_handle *och = NULL;
2147         __u64 data_version = 0;
2148         int rc;
2149         __u16 refcheck;
2150         ENTRY;
2151
2152         CDEBUG(D_INODE, "%s: Releasing file "DFID".\n",
2153                ll_get_fsname(inode->i_sb, NULL, 0),
2154                PFID(&ll_i2info(inode)->lli_fid));
2155
2156         och = ll_lease_open(inode, NULL, FMODE_WRITE, MDS_OPEN_RELEASE);
2157         if (IS_ERR(och))
2158                 GOTO(out, rc = PTR_ERR(och));
2159
2160         /* Grab latest data_version and [am]time values */
2161         rc = ll_data_version(inode, &data_version, LL_DV_WR_FLUSH);
2162         if (rc != 0)
2163                 GOTO(out, rc);
2164
2165         env = cl_env_get(&refcheck);
2166         if (IS_ERR(env))
2167                 GOTO(out, rc = PTR_ERR(env));
2168
2169         ll_merge_attr(env, inode);
2170         cl_env_put(env, &refcheck);
2171
2172         /* Release the file.
2173          * NB: lease lock handle is released in mdc_hsm_release_pack() because
2174          * we still need it to pack l_remote_handle to MDT. */
2175         rc = ll_close_inode_openhandle(inode, och, MDS_HSM_RELEASE,
2176                                        &data_version);
2177         och = NULL;
2178
2179         EXIT;
2180 out:
2181         if (och != NULL && !IS_ERR(och)) /* close the file */
2182                 ll_lease_close(och, inode, NULL);
2183
2184         return rc;
2185 }
2186
2187 struct ll_swap_stack {
2188         __u64                    dv1;
2189         __u64                    dv2;
2190         struct inode            *inode1;
2191         struct inode            *inode2;
2192         bool                     check_dv1;
2193         bool                     check_dv2;
2194 };
2195
2196 static int ll_swap_layouts(struct file *file1, struct file *file2,
2197                            struct lustre_swap_layouts *lsl)
2198 {
2199         struct mdc_swap_layouts  msl;
2200         struct md_op_data       *op_data;
2201         __u32                    gid;
2202         __u64                    dv;
2203         struct ll_swap_stack    *llss = NULL;
2204         int                      rc;
2205
2206         OBD_ALLOC_PTR(llss);
2207         if (llss == NULL)
2208                 RETURN(-ENOMEM);
2209
2210         llss->inode1 = file_inode(file1);
2211         llss->inode2 = file_inode(file2);
2212
2213         rc = ll_check_swap_layouts_validity(llss->inode1, llss->inode2);
2214         if (rc < 0)
2215                 GOTO(free, rc);
2216
2217         /* we use 2 bool because it is easier to swap than 2 bits */
2218         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
2219                 llss->check_dv1 = true;
2220
2221         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
2222                 llss->check_dv2 = true;
2223
2224         /* we cannot use lsl->sl_dvX directly because we may swap them */
2225         llss->dv1 = lsl->sl_dv1;
2226         llss->dv2 = lsl->sl_dv2;
2227
2228         rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
2229         if (rc == 0) /* same file, done! */
2230                 GOTO(free, rc);
2231
2232         if (rc < 0) { /* sequentialize it */
2233                 swap(llss->inode1, llss->inode2);
2234                 swap(file1, file2);
2235                 swap(llss->dv1, llss->dv2);
2236                 swap(llss->check_dv1, llss->check_dv2);
2237         }
2238
2239         gid = lsl->sl_gid;
2240         if (gid != 0) { /* application asks to flush dirty cache */
2241                 rc = ll_get_grouplock(llss->inode1, file1, gid);
2242                 if (rc < 0)
2243                         GOTO(free, rc);
2244
2245                 rc = ll_get_grouplock(llss->inode2, file2, gid);
2246                 if (rc < 0) {
2247                         ll_put_grouplock(llss->inode1, file1, gid);
2248                         GOTO(free, rc);
2249                 }
2250         }
2251
2252         /* ultimate check, before swaping the layouts we check if
2253          * dataversion has changed (if requested) */
2254         if (llss->check_dv1) {
2255                 rc = ll_data_version(llss->inode1, &dv, 0);
2256                 if (rc)
2257                         GOTO(putgl, rc);
2258                 if (dv != llss->dv1)
2259                         GOTO(putgl, rc = -EAGAIN);
2260         }
2261
2262         if (llss->check_dv2) {
2263                 rc = ll_data_version(llss->inode2, &dv, 0);
2264                 if (rc)
2265                         GOTO(putgl, rc);
2266                 if (dv != llss->dv2)
2267                         GOTO(putgl, rc = -EAGAIN);
2268         }
2269
2270         /* struct md_op_data is used to send the swap args to the mdt
2271          * only flags is missing, so we use struct mdc_swap_layouts
2272          * through the md_op_data->op_data */
2273         /* flags from user space have to be converted before they are send to
2274          * server, no flag is sent today, they are only used on the client */
2275         msl.msl_flags = 0;
2276         rc = -ENOMEM;
2277         op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
2278                                      0, LUSTRE_OPC_ANY, &msl);
2279         if (IS_ERR(op_data))
2280                 GOTO(free, rc = PTR_ERR(op_data));
2281
2282         rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
2283                            sizeof(*op_data), op_data, NULL);
2284         ll_finish_md_op_data(op_data);
2285
2286         if (rc < 0)
2287                 GOTO(putgl, rc);
2288
2289 putgl:
2290         if (gid != 0) {
2291                 ll_put_grouplock(llss->inode2, file2, gid);
2292                 ll_put_grouplock(llss->inode1, file1, gid);
2293         }
2294
2295 free:
2296         if (llss != NULL)
2297                 OBD_FREE_PTR(llss);
2298
2299         RETURN(rc);
2300 }
2301
2302 int ll_hsm_state_set(struct inode *inode, struct hsm_state_set *hss)
2303 {
2304         struct md_op_data       *op_data;
2305         int                      rc;
2306         ENTRY;
2307
2308         /* Detect out-of range masks */
2309         if ((hss->hss_setmask | hss->hss_clearmask) & ~HSM_FLAGS_MASK)
2310                 RETURN(-EINVAL);
2311
2312         /* Non-root users are forbidden to set or clear flags which are
2313          * NOT defined in HSM_USER_MASK. */
2314         if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK) &&
2315             !cfs_capable(CFS_CAP_SYS_ADMIN))
2316                 RETURN(-EPERM);
2317
2318         /* Detect out-of range archive id */
2319         if ((hss->hss_valid & HSS_ARCHIVE_ID) &&
2320             (hss->hss_archive_id > LL_HSM_MAX_ARCHIVE))
2321                 RETURN(-EINVAL);
2322
2323         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2324                                      LUSTRE_OPC_ANY, hss);
2325         if (IS_ERR(op_data))
2326                 RETURN(PTR_ERR(op_data));
2327
2328         rc = obd_iocontrol(LL_IOC_HSM_STATE_SET, ll_i2mdexp(inode),
2329                            sizeof(*op_data), op_data, NULL);
2330
2331         ll_finish_md_op_data(op_data);
2332
2333         RETURN(rc);
2334 }
2335
2336 static int ll_hsm_import(struct inode *inode, struct file *file,
2337                          struct hsm_user_import *hui)
2338 {
2339         struct hsm_state_set    *hss = NULL;
2340         struct iattr            *attr = NULL;
2341         int                      rc;
2342         ENTRY;
2343
2344         if (!S_ISREG(inode->i_mode))
2345                 RETURN(-EINVAL);
2346
2347         /* set HSM flags */
2348         OBD_ALLOC_PTR(hss);
2349         if (hss == NULL)
2350                 GOTO(out, rc = -ENOMEM);
2351
2352         hss->hss_valid = HSS_SETMASK | HSS_ARCHIVE_ID;
2353         hss->hss_archive_id = hui->hui_archive_id;
2354         hss->hss_setmask = HS_ARCHIVED | HS_EXISTS | HS_RELEASED;
2355         rc = ll_hsm_state_set(inode, hss);
2356         if (rc != 0)
2357                 GOTO(out, rc);
2358
2359         OBD_ALLOC_PTR(attr);
2360         if (attr == NULL)
2361                 GOTO(out, rc = -ENOMEM);
2362
2363         attr->ia_mode = hui->hui_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
2364         attr->ia_mode |= S_IFREG;
2365         attr->ia_uid = make_kuid(&init_user_ns, hui->hui_uid);
2366         attr->ia_gid = make_kgid(&init_user_ns, hui->hui_gid);
2367         attr->ia_size = hui->hui_size;
2368         attr->ia_mtime.tv_sec = hui->hui_mtime;
2369         attr->ia_mtime.tv_nsec = hui->hui_mtime_ns;
2370         attr->ia_atime.tv_sec = hui->hui_atime;
2371         attr->ia_atime.tv_nsec = hui->hui_atime_ns;
2372
2373         attr->ia_valid = ATTR_SIZE | ATTR_MODE | ATTR_FORCE |
2374                          ATTR_UID | ATTR_GID |
2375                          ATTR_MTIME | ATTR_MTIME_SET |
2376                          ATTR_ATIME | ATTR_ATIME_SET;
2377
2378         inode_lock(inode);
2379
2380         rc = ll_setattr_raw(file_dentry(file), attr, true);
2381         if (rc == -ENODATA)
2382                 rc = 0;
2383
2384         inode_unlock(inode);
2385
2386 out:
2387         if (hss != NULL)
2388                 OBD_FREE_PTR(hss);
2389
2390         if (attr != NULL)
2391                 OBD_FREE_PTR(attr);
2392
2393         RETURN(rc);
2394 }
2395
2396 static inline long ll_lease_type_from_fmode(fmode_t fmode)
2397 {
2398         return ((fmode & FMODE_READ) ? LL_LEASE_RDLCK : 0) |
2399                ((fmode & FMODE_WRITE) ? LL_LEASE_WRLCK : 0);
2400 }
2401
2402 static int ll_file_futimes_3(struct file *file, const struct ll_futimes_3 *lfu)
2403 {
2404         struct inode *inode = file_inode(file);
2405         struct iattr ia = {
2406                 .ia_valid = ATTR_ATIME | ATTR_ATIME_SET |
2407                             ATTR_MTIME | ATTR_MTIME_SET |
2408                             ATTR_CTIME | ATTR_CTIME_SET,
2409                 .ia_atime = {
2410                         .tv_sec = lfu->lfu_atime_sec,
2411                         .tv_nsec = lfu->lfu_atime_nsec,
2412                 },
2413                 .ia_mtime = {
2414                         .tv_sec = lfu->lfu_mtime_sec,
2415                         .tv_nsec = lfu->lfu_mtime_nsec,
2416                 },
2417                 .ia_ctime = {
2418                         .tv_sec = lfu->lfu_ctime_sec,
2419                         .tv_nsec = lfu->lfu_ctime_nsec,
2420                 },
2421         };
2422         int rc;
2423         ENTRY;
2424
2425         if (!capable(CAP_SYS_ADMIN))
2426                 RETURN(-EPERM);
2427
2428         if (!S_ISREG(inode->i_mode))
2429                 RETURN(-EINVAL);
2430
2431         inode_lock(inode);
2432         rc = ll_setattr_raw(file_dentry(file), &ia, false);
2433         inode_unlock(inode);
2434
2435         RETURN(rc);
2436 }
2437
2438 /*
2439  * Give file access advices
2440  *
2441  * The ladvise interface is similar to Linux fadvise() system call, except it
2442  * forwards the advices directly from Lustre client to server. The server side
2443  * codes will apply appropriate read-ahead and caching techniques for the
2444  * corresponding files.
2445  *
2446  * A typical workload for ladvise is e.g. a bunch of different clients are
2447  * doing small random reads of a file, so prefetching pages into OSS cache
2448  * with big linear reads before the random IO is a net benefit. Fetching
2449  * all that data into each client cache with fadvise() may not be, due to
2450  * much more data being sent to the client.
2451  */
2452 static int ll_ladvise(struct inode *inode, struct file *file, __u64 flags,
2453                       struct llapi_lu_ladvise *ladvise)
2454 {
2455         struct lu_env *env;
2456         struct cl_io *io;
2457         struct cl_ladvise_io *lio;
2458         int rc;
2459         __u16 refcheck;
2460         ENTRY;
2461
2462         env = cl_env_get(&refcheck);
2463         if (IS_ERR(env))
2464                 RETURN(PTR_ERR(env));
2465
2466         io = vvp_env_thread_io(env);
2467         io->ci_obj = ll_i2info(inode)->lli_clob;
2468
2469         /* initialize parameters for ladvise */
2470         lio = &io->u.ci_ladvise;
2471         lio->li_start = ladvise->lla_start;
2472         lio->li_end = ladvise->lla_end;
2473         lio->li_fid = ll_inode2fid(inode);
2474         lio->li_advice = ladvise->lla_advice;
2475         lio->li_flags = flags;
2476
2477         if (cl_io_init(env, io, CIT_LADVISE, io->ci_obj) == 0)
2478                 rc = cl_io_loop(env, io);
2479         else
2480                 rc = io->ci_result;
2481
2482         cl_io_fini(env, io);
2483         cl_env_put(env, &refcheck);
2484         RETURN(rc);
2485 }
2486
2487 int ll_ioctl_fsgetxattr(struct inode *inode, unsigned int cmd,
2488                         unsigned long arg)
2489 {
2490         struct fsxattr fsxattr;
2491
2492         if (copy_from_user(&fsxattr,
2493                            (const struct fsxattr __user *)arg,
2494                            sizeof(fsxattr)))
2495                 RETURN(-EFAULT);
2496
2497         fsxattr.fsx_projid = ll_i2info(inode)->lli_projid;
2498         if (copy_to_user((struct fsxattr __user *)arg,
2499                          &fsxattr, sizeof(fsxattr)))
2500                 RETURN(-EFAULT);
2501
2502         RETURN(0);
2503 }
2504
2505 int ll_ioctl_fssetxattr(struct inode *inode, unsigned int cmd,
2506                         unsigned long arg)
2507 {
2508
2509         struct md_op_data *op_data;
2510         struct ptlrpc_request *req = NULL;
2511         int rc = 0;
2512         struct fsxattr fsxattr;
2513
2514         /* only root could change project ID */
2515         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2516                 RETURN(-EPERM);
2517
2518         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2519                                      LUSTRE_OPC_ANY, NULL);
2520         if (IS_ERR(op_data))
2521                 RETURN(PTR_ERR(op_data));
2522
2523         if (copy_from_user(&fsxattr,
2524                            (const struct fsxattr __user *)arg,
2525                            sizeof(fsxattr)))
2526                 GOTO(out_fsxattr1, rc = -EFAULT);
2527
2528         op_data->op_projid = fsxattr.fsx_projid;
2529         op_data->op_attr.ia_valid |= MDS_ATTR_PROJID;
2530         rc = md_setattr(ll_i2sbi(inode)->ll_md_exp, op_data, NULL,
2531                         0, &req);
2532         ptlrpc_req_finished(req);
2533
2534 out_fsxattr1:
2535         ll_finish_md_op_data(op_data);
2536         RETURN(rc);
2537
2538
2539 }
2540
2541 static long
2542 ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
2543 {
2544         struct inode            *inode = file_inode(file);
2545         struct ll_file_data     *fd = LUSTRE_FPRIVATE(file);
2546         int                      flags, rc;
2547         ENTRY;
2548
2549         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), cmd=%x\n",
2550                PFID(ll_inode2fid(inode)), inode, cmd);
2551         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2552
2553         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2554         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2555                 RETURN(-ENOTTY);
2556
2557         switch(cmd) {
2558         case LL_IOC_GETFLAGS:
2559                 /* Get the current value of the file flags */
2560                 return put_user(fd->fd_flags, (int __user *)arg);
2561         case LL_IOC_SETFLAGS:
2562         case LL_IOC_CLRFLAGS:
2563                 /* Set or clear specific file flags */
2564                 /* XXX This probably needs checks to ensure the flags are
2565                  *     not abused, and to handle any flag side effects.
2566                  */
2567                 if (get_user(flags, (int __user *) arg))
2568                         RETURN(-EFAULT);
2569
2570                 if (cmd == LL_IOC_SETFLAGS) {
2571                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2572                             !(file->f_flags & O_DIRECT)) {
2573                                 CERROR("%s: unable to disable locking on "
2574                                        "non-O_DIRECT file\n", current->comm);
2575                                 RETURN(-EINVAL);
2576                         }
2577
2578                         fd->fd_flags |= flags;
2579                 } else {
2580                         fd->fd_flags &= ~flags;
2581                 }
2582                 RETURN(0);
2583         case LL_IOC_LOV_SETSTRIPE:
2584         case LL_IOC_LOV_SETSTRIPE_NEW:
2585                 RETURN(ll_lov_setstripe(inode, file, (void __user *)arg));
2586         case LL_IOC_LOV_SETEA:
2587                 RETURN(ll_lov_setea(inode, file, (void __user *)arg));
2588         case LL_IOC_LOV_SWAP_LAYOUTS: {
2589                 struct file *file2;
2590                 struct lustre_swap_layouts lsl;
2591
2592                 if (copy_from_user(&lsl, (char __user *)arg,
2593                                    sizeof(struct lustre_swap_layouts)))
2594                         RETURN(-EFAULT);
2595
2596                 if ((file->f_flags & O_ACCMODE) == O_RDONLY)
2597                         RETURN(-EPERM);
2598
2599                 file2 = fget(lsl.sl_fd);
2600                 if (file2 == NULL)
2601                         RETURN(-EBADF);
2602
2603                 /* O_WRONLY or O_RDWR */
2604                 if ((file2->f_flags & O_ACCMODE) == O_RDONLY)
2605                         GOTO(out, rc = -EPERM);
2606
2607                 if (lsl.sl_flags & SWAP_LAYOUTS_CLOSE) {
2608                         struct inode                    *inode2;
2609                         struct ll_inode_info            *lli;
2610                         struct obd_client_handle        *och = NULL;
2611
2612                         if (lsl.sl_flags != SWAP_LAYOUTS_CLOSE)
2613                                 GOTO(out, rc = -EINVAL);
2614
2615                         lli = ll_i2info(inode);
2616                         mutex_lock(&lli->lli_och_mutex);
2617                         if (fd->fd_lease_och != NULL) {
2618                                 och = fd->fd_lease_och;
2619                                 fd->fd_lease_och = NULL;
2620                         }
2621                         mutex_unlock(&lli->lli_och_mutex);
2622                         if (och == NULL)
2623                                 GOTO(out, rc = -ENOLCK);
2624                         inode2 = file_inode(file2);
2625                         rc = ll_swap_layouts_close(och, inode, inode2);
2626                 } else {
2627                         rc = ll_swap_layouts(file, file2, &lsl);
2628                 }
2629 out:
2630                 fput(file2);
2631                 RETURN(rc);
2632         }
2633         case LL_IOC_LOV_GETSTRIPE:
2634         case LL_IOC_LOV_GETSTRIPE_NEW:
2635                 RETURN(ll_file_getstripe(inode, (void __user *)arg, 0));
2636         case FSFILT_IOC_GETFLAGS:
2637         case FSFILT_IOC_SETFLAGS:
2638                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2639         case FSFILT_IOC_GETVERSION_OLD:
2640         case FSFILT_IOC_GETVERSION:
2641                 RETURN(put_user(inode->i_generation, (int __user *)arg));
2642         case LL_IOC_GROUP_LOCK:
2643                 RETURN(ll_get_grouplock(inode, file, arg));
2644         case LL_IOC_GROUP_UNLOCK:
2645                 RETURN(ll_put_grouplock(inode, file, arg));
2646         case IOC_OBD_STATFS:
2647                 RETURN(ll_obd_statfs(inode, (void __user *)arg));
2648
2649         /* We need to special case any other ioctls we want to handle,
2650          * to send them to the MDS/OST as appropriate and to properly
2651          * network encode the arg field.
2652         case FSFILT_IOC_SETVERSION_OLD:
2653         case FSFILT_IOC_SETVERSION:
2654         */
2655         case LL_IOC_FLUSHCTX:
2656                 RETURN(ll_flush_ctx(inode));
2657         case LL_IOC_PATH2FID: {
2658                 if (copy_to_user((void __user *)arg, ll_inode2fid(inode),
2659                                  sizeof(struct lu_fid)))
2660                         RETURN(-EFAULT);
2661
2662                 RETURN(0);
2663         }
2664         case LL_IOC_GETPARENT:
2665                 RETURN(ll_getparent(file, (struct getparent __user *)arg));
2666
2667         case OBD_IOC_FID2PATH:
2668                 RETURN(ll_fid2path(inode, (void __user *)arg));
2669         case LL_IOC_DATA_VERSION: {
2670                 struct ioc_data_version idv;
2671                 int rc;
2672
2673                 if (copy_from_user(&idv, (char __user *)arg, sizeof(idv)))
2674                         RETURN(-EFAULT);
2675
2676                 idv.idv_flags &= LL_DV_RD_FLUSH | LL_DV_WR_FLUSH;
2677                 rc = ll_data_version(inode, &idv.idv_version, idv.idv_flags);
2678
2679                 if (rc == 0 &&
2680                     copy_to_user((char __user *)arg, &idv, sizeof(idv)))
2681                         RETURN(-EFAULT);
2682
2683                 RETURN(rc);
2684         }
2685
2686         case LL_IOC_GET_MDTIDX: {
2687                 int mdtidx;
2688
2689                 mdtidx = ll_get_mdt_idx(inode);
2690                 if (mdtidx < 0)
2691                         RETURN(mdtidx);
2692
2693                 if (put_user((int)mdtidx, (int __user *)arg))
2694                         RETURN(-EFAULT);
2695
2696                 RETURN(0);
2697         }
2698         case OBD_IOC_GETDTNAME:
2699         case OBD_IOC_GETMDNAME:
2700                 RETURN(ll_get_obd_name(inode, cmd, arg));
2701         case LL_IOC_HSM_STATE_GET: {
2702                 struct md_op_data       *op_data;
2703                 struct hsm_user_state   *hus;
2704                 int                      rc;
2705
2706                 OBD_ALLOC_PTR(hus);
2707                 if (hus == NULL)
2708                         RETURN(-ENOMEM);
2709
2710                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2711                                              LUSTRE_OPC_ANY, hus);
2712                 if (IS_ERR(op_data)) {
2713                         OBD_FREE_PTR(hus);
2714                         RETURN(PTR_ERR(op_data));
2715                 }
2716
2717                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2718                                    op_data, NULL);
2719
2720                 if (copy_to_user((void __user *)arg, hus, sizeof(*hus)))
2721                         rc = -EFAULT;
2722
2723                 ll_finish_md_op_data(op_data);
2724                 OBD_FREE_PTR(hus);
2725                 RETURN(rc);
2726         }
2727         case LL_IOC_HSM_STATE_SET: {
2728                 struct hsm_state_set    *hss;
2729                 int                      rc;
2730
2731                 OBD_ALLOC_PTR(hss);
2732                 if (hss == NULL)
2733                         RETURN(-ENOMEM);
2734
2735                 if (copy_from_user(hss, (char __user *)arg, sizeof(*hss))) {
2736                         OBD_FREE_PTR(hss);
2737                         RETURN(-EFAULT);
2738                 }
2739
2740                 rc = ll_hsm_state_set(inode, hss);
2741
2742                 OBD_FREE_PTR(hss);
2743                 RETURN(rc);
2744         }
2745         case LL_IOC_HSM_ACTION: {
2746                 struct md_op_data               *op_data;
2747                 struct hsm_current_action       *hca;
2748                 int                              rc;
2749
2750                 OBD_ALLOC_PTR(hca);
2751                 if (hca == NULL)
2752                         RETURN(-ENOMEM);
2753
2754                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2755                                              LUSTRE_OPC_ANY, hca);
2756                 if (IS_ERR(op_data)) {
2757                         OBD_FREE_PTR(hca);
2758                         RETURN(PTR_ERR(op_data));
2759                 }
2760
2761                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2762                                    op_data, NULL);
2763
2764                 if (copy_to_user((char __user *)arg, hca, sizeof(*hca)))
2765                         rc = -EFAULT;
2766
2767                 ll_finish_md_op_data(op_data);
2768                 OBD_FREE_PTR(hca);
2769                 RETURN(rc);
2770         }
2771         case LL_IOC_SET_LEASE: {
2772                 struct ll_inode_info *lli = ll_i2info(inode);
2773                 struct obd_client_handle *och = NULL;
2774                 bool lease_broken;
2775                 fmode_t fmode;
2776
2777                 switch (arg) {
2778                 case LL_LEASE_WRLCK:
2779                         if (!(file->f_mode & FMODE_WRITE))
2780                                 RETURN(-EPERM);
2781                         fmode = FMODE_WRITE;
2782                         break;
2783                 case LL_LEASE_RDLCK:
2784                         if (!(file->f_mode & FMODE_READ))
2785                                 RETURN(-EPERM);
2786                         fmode = FMODE_READ;
2787                         break;
2788                 case LL_LEASE_UNLCK:
2789                         mutex_lock(&lli->lli_och_mutex);
2790                         if (fd->fd_lease_och != NULL) {
2791                                 och = fd->fd_lease_och;
2792                                 fd->fd_lease_och = NULL;
2793                         }
2794                         mutex_unlock(&lli->lli_och_mutex);
2795
2796                         if (och == NULL)
2797                                 RETURN(-ENOLCK);
2798
2799                         fmode = och->och_flags;
2800                         rc = ll_lease_close(och, inode, &lease_broken);
2801                         if (rc < 0)
2802                                 RETURN(rc);
2803
2804                         rc = ll_lease_och_release(inode, file);
2805                         if (rc < 0)
2806                                 RETURN(rc);
2807
2808                         if (lease_broken)
2809                                 fmode = 0;
2810
2811                         RETURN(ll_lease_type_from_fmode(fmode));
2812                 default:
2813                         RETURN(-EINVAL);
2814                 }
2815
2816                 CDEBUG(D_INODE, "Set lease with mode %u\n", fmode);
2817
2818                 /* apply for lease */
2819                 och = ll_lease_open(inode, file, fmode, 0);
2820                 if (IS_ERR(och))
2821                         RETURN(PTR_ERR(och));
2822
2823                 rc = 0;
2824                 mutex_lock(&lli->lli_och_mutex);
2825                 if (fd->fd_lease_och == NULL) {
2826                         fd->fd_lease_och = och;
2827                         och = NULL;
2828                 }
2829                 mutex_unlock(&lli->lli_och_mutex);
2830                 if (och != NULL) {
2831                         /* impossible now that only excl is supported for now */
2832                         ll_lease_close(och, inode, &lease_broken);
2833                         rc = -EBUSY;
2834                 }
2835                 RETURN(rc);
2836         }
2837         case LL_IOC_GET_LEASE: {
2838                 struct ll_inode_info *lli = ll_i2info(inode);
2839                 struct ldlm_lock *lock = NULL;
2840                 fmode_t fmode = 0;
2841
2842                 mutex_lock(&lli->lli_och_mutex);
2843                 if (fd->fd_lease_och != NULL) {
2844                         struct obd_client_handle *och = fd->fd_lease_och;
2845
2846                         lock = ldlm_handle2lock(&och->och_lease_handle);
2847                         if (lock != NULL) {
2848                                 lock_res_and_lock(lock);
2849                                 if (!ldlm_is_cancel(lock))
2850                                         fmode = och->och_flags;
2851
2852                                 unlock_res_and_lock(lock);
2853                                 LDLM_LOCK_PUT(lock);
2854                         }
2855                 }
2856                 mutex_unlock(&lli->lli_och_mutex);
2857
2858                 RETURN(ll_lease_type_from_fmode(fmode));
2859         }
2860         case LL_IOC_HSM_IMPORT: {
2861                 struct hsm_user_import *hui;
2862
2863                 OBD_ALLOC_PTR(hui);
2864                 if (hui == NULL)
2865                         RETURN(-ENOMEM);
2866
2867                 if (copy_from_user(hui, (void __user *)arg, sizeof(*hui))) {
2868                         OBD_FREE_PTR(hui);
2869                         RETURN(-EFAULT);
2870                 }
2871
2872                 rc = ll_hsm_import(inode, file, hui);
2873
2874                 OBD_FREE_PTR(hui);
2875                 RETURN(rc);
2876         }
2877         case LL_IOC_FUTIMES_3: {
2878                 struct ll_futimes_3 lfu;
2879
2880                 if (copy_from_user(&lfu,
2881                                    (const struct ll_futimes_3 __user *)arg,
2882                                    sizeof(lfu)))
2883                         RETURN(-EFAULT);
2884
2885                 RETURN(ll_file_futimes_3(file, &lfu));
2886         }
2887         case LL_IOC_LADVISE: {
2888                 struct llapi_ladvise_hdr *ladvise_hdr;
2889                 int i;
2890                 int num_advise;
2891                 int alloc_size = sizeof(*ladvise_hdr);
2892
2893                 rc = 0;
2894                 OBD_ALLOC_PTR(ladvise_hdr);
2895                 if (ladvise_hdr == NULL)
2896                         RETURN(-ENOMEM);
2897
2898                 if (copy_from_user(ladvise_hdr,
2899                                    (const struct llapi_ladvise_hdr __user *)arg,
2900                                    alloc_size))
2901                         GOTO(out_ladvise, rc = -EFAULT);
2902
2903                 if (ladvise_hdr->lah_magic != LADVISE_MAGIC ||
2904                     ladvise_hdr->lah_count < 1)
2905                         GOTO(out_ladvise, rc = -EINVAL);
2906
2907                 num_advise = ladvise_hdr->lah_count;
2908                 if (num_advise >= LAH_COUNT_MAX)
2909                         GOTO(out_ladvise, rc = -EFBIG);
2910
2911                 OBD_FREE_PTR(ladvise_hdr);
2912                 alloc_size = offsetof(typeof(*ladvise_hdr),
2913                                       lah_advise[num_advise]);
2914                 OBD_ALLOC(ladvise_hdr, alloc_size);
2915                 if (ladvise_hdr == NULL)
2916                         RETURN(-ENOMEM);
2917
2918                 /*
2919                  * TODO: submit multiple advices to one server in a single RPC
2920                  */
2921                 if (copy_from_user(ladvise_hdr,
2922                                    (const struct llapi_ladvise_hdr __user *)arg,
2923                                    alloc_size))
2924                         GOTO(out_ladvise, rc = -EFAULT);
2925
2926                 for (i = 0; i < num_advise; i++) {
2927                         rc = ll_ladvise(inode, file, ladvise_hdr->lah_flags,
2928                                         &ladvise_hdr->lah_advise[i]);
2929                         if (rc)
2930                                 break;
2931                 }
2932
2933 out_ladvise:
2934                 OBD_FREE(ladvise_hdr, alloc_size);
2935                 RETURN(rc);
2936         }
2937         case LL_IOC_FSGETXATTR:
2938                 RETURN(ll_ioctl_fsgetxattr(inode, cmd, arg));
2939         case LL_IOC_FSSETXATTR:
2940                 RETURN(ll_ioctl_fssetxattr(inode, cmd, arg));
2941         case BLKSSZGET:
2942                 RETURN(put_user(PAGE_SIZE, (int __user *)arg));
2943         default:
2944                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2945                                      (void __user *)arg));
2946         }
2947 }
2948
2949 #ifndef HAVE_FILE_LLSEEK_SIZE
2950 static inline loff_t
2951 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
2952 {
2953         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
2954                 return -EINVAL;
2955         if (offset > maxsize)
2956                 return -EINVAL;
2957
2958         if (offset != file->f_pos) {
2959                 file->f_pos = offset;
2960                 file->f_version = 0;
2961         }
2962         return offset;
2963 }
2964
2965 static loff_t
2966 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
2967                 loff_t maxsize, loff_t eof)
2968 {
2969         struct inode *inode = file_inode(file);
2970
2971         switch (origin) {
2972         case SEEK_END:
2973                 offset += eof;
2974                 break;
2975         case SEEK_CUR:
2976                 /*
2977                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
2978                  * position-querying operation.  Avoid rewriting the "same"
2979                  * f_pos value back to the file because a concurrent read(),
2980                  * write() or lseek() might have altered it
2981                  */
2982                 if (offset == 0)
2983                         return file->f_pos;
2984                 /*
2985                  * f_lock protects against read/modify/write race with other
2986                  * SEEK_CURs. Note that parallel writes and reads behave
2987                  * like SEEK_SET.
2988                  */
2989                 inode_lock(inode);
2990                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
2991                 inode_unlock(inode);
2992                 return offset;
2993         case SEEK_DATA:
2994                 /*
2995                  * In the generic case the entire file is data, so as long as
2996                  * offset isn't at the end of the file then the offset is data.
2997                  */
2998                 if (offset >= eof)
2999                         return -ENXIO;
3000                 break;
3001         case SEEK_HOLE:
3002                 /*
3003                  * There is a virtual hole at the end of the file, so as long as
3004                  * offset isn't i_size or larger, return i_size.
3005                  */
3006                 if (offset >= eof)
3007                         return -ENXIO;
3008                 offset = eof;
3009                 break;
3010         }
3011
3012         return llseek_execute(file, offset, maxsize);
3013 }
3014 #endif
3015
3016 static loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
3017 {
3018         struct inode *inode = file_inode(file);
3019         loff_t retval, eof = 0;
3020
3021         ENTRY;
3022         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
3023                            (origin == SEEK_CUR) ? file->f_pos : 0);
3024         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), to=%llu=%#llx(%d)\n",
3025                PFID(ll_inode2fid(inode)), inode, retval, retval,
3026                origin);
3027         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
3028
3029         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
3030                 retval = ll_glimpse_size(inode);
3031                 if (retval != 0)
3032                         RETURN(retval);
3033                 eof = i_size_read(inode);
3034         }
3035
3036         retval = ll_generic_file_llseek_size(file, offset, origin,
3037                                           ll_file_maxbytes(inode), eof);
3038         RETURN(retval);
3039 }
3040
3041 static int ll_flush(struct file *file, fl_owner_t id)
3042 {
3043         struct inode *inode = file_inode(file);
3044         struct ll_inode_info *lli = ll_i2info(inode);
3045         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
3046         int rc, err;
3047
3048         LASSERT(!S_ISDIR(inode->i_mode));
3049
3050         /* catch async errors that were recorded back when async writeback
3051          * failed for pages in this mapping. */
3052         rc = lli->lli_async_rc;
3053         lli->lli_async_rc = 0;
3054         if (lli->lli_clob != NULL) {
3055                 err = lov_read_and_clear_async_rc(lli->lli_clob);
3056                 if (rc == 0)
3057                         rc = err;
3058         }
3059
3060         /* The application has been told write failure already.
3061          * Do not report failure again. */
3062         if (fd->fd_write_failed)
3063                 return 0;
3064         return rc ? -EIO : 0;
3065 }
3066
3067 /**
3068  * Called to make sure a portion of file has been written out.
3069  * if @mode is not CL_FSYNC_LOCAL, it will send OST_SYNC RPCs to OST.
3070  *
3071  * Return how many pages have been written.
3072  */
3073 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
3074                        enum cl_fsync_mode mode, int ignore_layout)
3075 {
3076         struct lu_env *env;
3077         struct cl_io *io;
3078         struct cl_fsync_io *fio;
3079         int result;
3080         __u16 refcheck;
3081         ENTRY;
3082
3083         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
3084             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
3085                 RETURN(-EINVAL);
3086
3087         env = cl_env_get(&refcheck);
3088         if (IS_ERR(env))
3089                 RETURN(PTR_ERR(env));
3090
3091         io = vvp_env_thread_io(env);
3092         io->ci_obj = ll_i2info(inode)->lli_clob;
3093         io->ci_ignore_layout = ignore_layout;
3094
3095         /* initialize parameters for sync */
3096         fio = &io->u.ci_fsync;
3097         fio->fi_start = start;
3098         fio->fi_end = end;
3099         fio->fi_fid = ll_inode2fid(inode);
3100         fio->fi_mode = mode;
3101         fio->fi_nr_written = 0;
3102
3103         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
3104                 result = cl_io_loop(env, io);
3105         else
3106                 result = io->ci_result;
3107         if (result == 0)
3108                 result = fio->fi_nr_written;
3109         cl_io_fini(env, io);
3110         cl_env_put(env, &refcheck);
3111
3112         RETURN(result);
3113 }
3114
3115 /*
3116  * When dentry is provided (the 'else' case), file_dentry() may be
3117  * null and dentry must be used directly rather than pulled from
3118  * file_dentry() as is done otherwise.
3119  */
3120
3121 #ifdef HAVE_FILE_FSYNC_4ARGS
3122 int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
3123 {
3124         struct dentry *dentry = file_dentry(file);
3125         bool lock_inode;
3126 #elif defined(HAVE_FILE_FSYNC_2ARGS)
3127 int ll_fsync(struct file *file, int datasync)
3128 {
3129         struct dentry *dentry = file_dentry(file);
3130         loff_t start = 0;
3131         loff_t end = LLONG_MAX;
3132 #else
3133 int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
3134 {
3135         loff_t start = 0;
3136         loff_t end = LLONG_MAX;
3137 #endif
3138         struct inode *inode = dentry->d_inode;
3139         struct ll_inode_info *lli = ll_i2info(inode);
3140         struct ptlrpc_request *req;
3141         int rc, err;
3142         ENTRY;
3143
3144         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
3145                PFID(ll_inode2fid(inode)), inode);
3146         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
3147
3148 #ifdef HAVE_FILE_FSYNC_4ARGS
3149         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
3150         lock_inode = !lli->lli_inode_locked;
3151         if (lock_inode)
3152                 inode_lock(inode);
3153 #else
3154         /* fsync's caller has already called _fdata{sync,write}, we want
3155          * that IO to finish before calling the osc and mdc sync methods */
3156         rc = filemap_fdatawait(inode->i_mapping);
3157 #endif
3158
3159         /* catch async errors that were recorded back when async writeback
3160          * failed for pages in this mapping. */
3161         if (!S_ISDIR(inode->i_mode)) {
3162                 err = lli->lli_async_rc;
3163                 lli->lli_async_rc = 0;
3164                 if (rc == 0)
3165                         rc = err;
3166                 if (lli->lli_clob != NULL) {
3167                         err = lov_read_and_clear_async_rc(lli->lli_clob);
3168                         if (rc == 0)
3169                                 rc = err;
3170                 }
3171         }
3172
3173         err = md_fsync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), &req);
3174         if (!rc)
3175                 rc = err;
3176         if (!err)
3177                 ptlrpc_req_finished(req);
3178
3179         if (S_ISREG(inode->i_mode)) {
3180                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
3181
3182                 err = cl_sync_file_range(inode, start, end, CL_FSYNC_ALL, 0);
3183                 if (rc == 0 && err < 0)
3184                         rc = err;
3185                 if (rc < 0)
3186                         fd->fd_write_failed = true;
3187                 else
3188                         fd->fd_write_failed = false;
3189         }
3190
3191 #ifdef HAVE_FILE_FSYNC_4ARGS
3192         if (lock_inode)
3193                 inode_unlock(inode);
3194 #endif
3195         RETURN(rc);
3196 }
3197
3198 static int
3199 ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
3200 {
3201         struct inode *inode = file_inode(file);
3202         struct ll_sb_info *sbi = ll_i2sbi(inode);
3203         struct ldlm_enqueue_info einfo = {
3204                 .ei_type        = LDLM_FLOCK,
3205                 .ei_cb_cp       = ldlm_flock_completion_ast,
3206                 .ei_cbdata      = file_lock,
3207         };
3208         struct md_op_data *op_data;
3209         struct lustre_handle lockh = { 0 };
3210         union ldlm_policy_data flock = { { 0 } };
3211         int fl_type = file_lock->fl_type;
3212         __u64 flags = 0;
3213         int rc;
3214         int rc2 = 0;
3215         ENTRY;
3216
3217         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
3218                PFID(ll_inode2fid(inode)), file_lock);
3219
3220         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
3221
3222         if (file_lock->fl_flags & FL_FLOCK) {
3223                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
3224                 /* flocks are whole-file locks */
3225                 flock.l_flock.end = OFFSET_MAX;
3226                 /* For flocks owner is determined by the local file desctiptor*/
3227                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
3228         } else if (file_lock->fl_flags & FL_POSIX) {
3229                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
3230                 flock.l_flock.start = file_lock->fl_start;
3231                 flock.l_flock.end = file_lock->fl_end;
3232         } else {
3233                 RETURN(-EINVAL);
3234         }
3235         flock.l_flock.pid = file_lock->fl_pid;
3236
3237         /* Somewhat ugly workaround for svc lockd.
3238          * lockd installs custom fl_lmops->lm_compare_owner that checks
3239          * for the fl_owner to be the same (which it always is on local node
3240          * I guess between lockd processes) and then compares pid.
3241          * As such we assign pid to the owner field to make it all work,
3242          * conflict with normal locks is unlikely since pid space and
3243          * pointer space for current->files are not intersecting */
3244         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
3245                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
3246
3247         switch (fl_type) {
3248         case F_RDLCK:
3249                 einfo.ei_mode = LCK_PR;
3250                 break;
3251         case F_UNLCK:
3252                 /* An unlock request may or may not have any relation to
3253                  * existing locks so we may not be able to pass a lock handle
3254                  * via a normal ldlm_lock_cancel() request. The request may even
3255                  * unlock a byte range in the middle of an existing lock. In
3256                  * order to process an unlock request we need all of the same
3257                  * information that is given with a normal read or write record
3258                  * lock request. To avoid creating another ldlm unlock (cancel)
3259                  * message we'll treat a LCK_NL flock request as an unlock. */
3260                 einfo.ei_mode = LCK_NL;
3261                 break;
3262         case F_WRLCK:
3263                 einfo.ei_mode = LCK_PW;
3264                 break;
3265         default:
3266                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n", fl_type);
3267                 RETURN (-ENOTSUPP);
3268         }
3269
3270         switch (cmd) {
3271         case F_SETLKW:
3272 #ifdef F_SETLKW64
3273         case F_SETLKW64:
3274 #endif
3275                 flags = 0;
3276                 break;
3277         case F_SETLK:
3278 #ifdef F_SETLK64
3279         case F_SETLK64:
3280 #endif
3281                 flags = LDLM_FL_BLOCK_NOWAIT;
3282                 break;
3283         case F_GETLK:
3284 #ifdef F_GETLK64
3285         case F_GETLK64:
3286 #endif
3287                 flags = LDLM_FL_TEST_LOCK;
3288                 break;
3289         default:
3290                 CERROR("unknown fcntl lock command: %d\n", cmd);
3291                 RETURN (-EINVAL);
3292         }
3293
3294         /* Save the old mode so that if the mode in the lock changes we
3295          * can decrement the appropriate reader or writer refcount. */
3296         file_lock->fl_type = einfo.ei_mode;
3297
3298         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
3299                                      LUSTRE_OPC_ANY, NULL);
3300         if (IS_ERR(op_data))
3301                 RETURN(PTR_ERR(op_data));
3302
3303         CDEBUG(D_DLMTRACE, "inode="DFID", pid=%u, flags=%#llx, mode=%u, "
3304                "start=%llu, end=%llu\n", PFID(ll_inode2fid(inode)),
3305                flock.l_flock.pid, flags, einfo.ei_mode,
3306                flock.l_flock.start, flock.l_flock.end);
3307
3308         rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data, &lockh,
3309                         flags);
3310
3311         /* Restore the file lock type if not TEST lock. */
3312         if (!(flags & LDLM_FL_TEST_LOCK))
3313                 file_lock->fl_type = fl_type;
3314
3315 #ifdef HAVE_LOCKS_LOCK_FILE_WAIT
3316         if ((rc == 0 || file_lock->fl_type == F_UNLCK) &&
3317             !(flags & LDLM_FL_TEST_LOCK))
3318                 rc2  = locks_lock_file_wait(file, file_lock);
3319 #else
3320         if ((file_lock->fl_flags & FL_FLOCK) &&
3321             (rc == 0 || file_lock->fl_type == F_UNLCK))
3322                 rc2  = flock_lock_file_wait(file, file_lock);
3323         if ((file_lock->fl_flags & FL_POSIX) &&
3324             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
3325             !(flags & LDLM_FL_TEST_LOCK))
3326                 rc2  = posix_lock_file_wait(file, file_lock);
3327 #endif /* HAVE_LOCKS_LOCK_FILE_WAIT */
3328
3329         if (rc2 && file_lock->fl_type != F_UNLCK) {
3330                 einfo.ei_mode = LCK_NL;
3331                 md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data,
3332                            &lockh, flags);
3333                 rc = rc2;
3334         }
3335
3336         ll_finish_md_op_data(op_data);
3337
3338         RETURN(rc);
3339 }
3340
3341 int ll_get_fid_by_name(struct inode *parent, const char *name,
3342                        int namelen, struct lu_fid *fid,
3343                        struct inode **inode)
3344 {
3345         struct md_op_data       *op_data = NULL;
3346         struct mdt_body         *body;
3347         struct ptlrpc_request   *req;
3348         int                     rc;
3349         ENTRY;
3350
3351         op_data = ll_prep_md_op_data(NULL, parent, NULL, name, namelen, 0,
3352                                      LUSTRE_OPC_ANY, NULL);
3353         if (IS_ERR(op_data))
3354                 RETURN(PTR_ERR(op_data));
3355
3356         op_data->op_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
3357         rc = md_getattr_name(ll_i2sbi(parent)->ll_md_exp, op_data, &req);
3358         ll_finish_md_op_data(op_data);
3359         if (rc < 0)
3360                 RETURN(rc);
3361
3362         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
3363         if (body == NULL)
3364                 GOTO(out_req, rc = -EFAULT);
3365         if (fid != NULL)
3366                 *fid = body->mbo_fid1;
3367
3368         if (inode != NULL)
3369                 rc = ll_prep_inode(inode, req, parent->i_sb, NULL);
3370 out_req:
3371         ptlrpc_req_finished(req);
3372         RETURN(rc);
3373 }
3374
3375 int ll_migrate(struct inode *parent, struct file *file, int mdtidx,
3376                const char *name, int namelen)
3377 {
3378         struct dentry         *dchild = NULL;
3379         struct inode          *child_inode = NULL;
3380         struct md_op_data     *op_data;
3381         struct ptlrpc_request *request = NULL;
3382         struct obd_client_handle *och = NULL;
3383         struct qstr           qstr;
3384         struct mdt_body         *body;
3385         int                    rc;
3386         __u64                   data_version = 0;
3387         ENTRY;
3388
3389         CDEBUG(D_VFSTRACE, "migrate %s under "DFID" to MDT%04x\n",
3390                name, PFID(ll_inode2fid(parent)), mdtidx);
3391
3392         op_data = ll_prep_md_op_data(NULL, parent, NULL, name, namelen,
3393                                      0, LUSTRE_OPC_ANY, NULL);
3394         if (IS_ERR(op_data))
3395                 RETURN(PTR_ERR(op_data));
3396
3397         /* Get child FID first */
3398         qstr.hash = ll_full_name_hash(file_dentry(file), name, namelen);
3399         qstr.name = name;
3400         qstr.len = namelen;
3401         dchild = d_lookup(file_dentry(file), &qstr);
3402         if (dchild != NULL) {
3403                 if (dchild->d_inode != NULL)
3404                         child_inode = igrab(dchild->d_inode);
3405                 dput(dchild);
3406         }
3407
3408         if (child_inode == NULL) {
3409                 rc = ll_get_fid_by_name(parent, name, namelen,
3410                                         &op_data->op_fid3, &child_inode);
3411                 if (rc != 0)
3412                         GOTO(out_free, rc);
3413         }
3414
3415         if (child_inode == NULL)
3416                 GOTO(out_free, rc = -EINVAL);
3417
3418         /*
3419          * lfs migrate command needs to be blocked on the client
3420          * by checking the migrate FID against the FID of the
3421          * filesystem root.
3422          */
3423         if (child_inode == parent->i_sb->s_root->d_inode)
3424                 GOTO(out_iput, rc = -EINVAL);
3425
3426         inode_lock(child_inode);
3427         op_data->op_fid3 = *ll_inode2fid(child_inode);
3428         if (!fid_is_sane(&op_data->op_fid3)) {
3429                 CERROR("%s: migrate %s, but FID "DFID" is insane\n",
3430                        ll_get_fsname(parent->i_sb, NULL, 0), name,
3431                        PFID(&op_data->op_fid3));
3432                 GOTO(out_unlock, rc = -EINVAL);
3433         }
3434
3435         rc = ll_get_mdt_idx_by_fid(ll_i2sbi(parent), &op_data->op_fid3);
3436         if (rc < 0)
3437                 GOTO(out_unlock, rc);
3438
3439         if (rc == mdtidx) {
3440                 CDEBUG(D_INFO, "%s: "DFID" is already on MDT%04x\n", name,
3441                        PFID(&op_data->op_fid3), mdtidx);
3442                 GOTO(out_unlock, rc = 0);
3443         }
3444 again:
3445         if (S_ISREG(child_inode->i_mode)) {
3446                 och = ll_lease_open(child_inode, NULL, FMODE_WRITE, 0);
3447                 if (IS_ERR(och)) {
3448                         rc = PTR_ERR(och);
3449                         och = NULL;
3450                         GOTO(out_unlock, rc);
3451                 }
3452
3453                 rc = ll_data_version(child_inode, &data_version,
3454                                      LL_DV_WR_FLUSH);
3455                 if (rc != 0)
3456                         GOTO(out_close, rc);
3457
3458                 op_data->op_handle = och->och_fh;
3459                 op_data->op_data = och->och_mod;
3460                 op_data->op_data_version = data_version;
3461                 op_data->op_lease_handle = och->och_lease_handle;
3462                 op_data->op_bias |= MDS_RENAME_MIGRATE;
3463         }
3464
3465         op_data->op_mds = mdtidx;
3466         op_data->op_cli_flags = CLI_MIGRATE;
3467         rc = md_rename(ll_i2sbi(parent)->ll_md_exp, op_data, name,
3468                        namelen, name, namelen, &request);
3469         if (rc == 0) {
3470                 LASSERT(request != NULL);
3471                 ll_update_times(request, parent);
3472
3473                 body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
3474                 LASSERT(body != NULL);
3475
3476                 /* If the server does release layout lock, then we cleanup
3477                  * the client och here, otherwise release it in out_close: */
3478                 if (och != NULL &&
3479                     body->mbo_valid & OBD_MD_CLOSE_INTENT_EXECED) {
3480                         obd_mod_put(och->och_mod);
3481                         md_clear_open_replay_data(ll_i2sbi(parent)->ll_md_exp,
3482                                                   och);
3483                         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
3484                         OBD_FREE_PTR(och);
3485                         och = NULL;
3486                 }
3487         }
3488
3489         if (request != NULL) {
3490                 ptlrpc_req_finished(request);
3491                 request = NULL;
3492         }
3493
3494         /* Try again if the file layout has changed. */
3495         if (rc == -EAGAIN && S_ISREG(child_inode->i_mode))
3496                 goto again;
3497
3498 out_close:
3499         if (och != NULL) /* close the file */
3500                 ll_lease_close(och, child_inode, NULL);
3501         if (rc == 0)
3502                 clear_nlink(child_inode);
3503 out_unlock:
3504         inode_unlock(child_inode);
3505 out_iput:
3506         iput(child_inode);
3507 out_free:
3508         ll_finish_md_op_data(op_data);
3509         RETURN(rc);
3510 }
3511
3512 static int
3513 ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
3514 {
3515         ENTRY;
3516
3517         RETURN(-ENOSYS);
3518 }
3519
3520 /**
3521  * test if some locks matching bits and l_req_mode are acquired
3522  * - bits can be in different locks
3523  * - if found clear the common lock bits in *bits
3524  * - the bits not found, are kept in *bits
3525  * \param inode [IN]
3526  * \param bits [IN] searched lock bits [IN]
3527  * \param l_req_mode [IN] searched lock mode
3528  * \retval boolean, true iff all bits are found
3529  */
3530 int ll_have_md_lock(struct inode *inode, __u64 *bits, enum ldlm_mode l_req_mode)
3531 {
3532         struct lustre_handle lockh;
3533         union ldlm_policy_data policy;
3534         enum ldlm_mode mode = (l_req_mode == LCK_MINMODE) ?
3535                               (LCK_CR | LCK_CW | LCK_PR | LCK_PW) : l_req_mode;
3536         struct lu_fid *fid;
3537         __u64 flags;
3538         int i;
3539         ENTRY;
3540
3541         if (!inode)
3542                RETURN(0);
3543
3544         fid = &ll_i2info(inode)->lli_fid;
3545         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
3546                ldlm_lockname[mode]);
3547
3548         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
3549         for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
3550                 policy.l_inodebits.bits = *bits & (1 << i);
3551                 if (policy.l_inodebits.bits == 0)
3552                         continue;
3553
3554                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
3555                                   &policy, mode, &lockh)) {
3556                         struct ldlm_lock *lock;
3557
3558                         lock = ldlm_handle2lock(&lockh);
3559                         if (lock) {
3560                                 *bits &=
3561                                       ~(lock->l_policy_data.l_inodebits.bits);
3562                                 LDLM_LOCK_PUT(lock);
3563                         } else {
3564                                 *bits &= ~policy.l_inodebits.bits;
3565                         }
3566                 }
3567         }
3568         RETURN(*bits == 0);
3569 }
3570
3571 enum ldlm_mode ll_take_md_lock(struct inode *inode, __u64 bits,
3572                                struct lustre_handle *lockh, __u64 flags,
3573                                enum ldlm_mode mode)
3574 {
3575         union ldlm_policy_data policy = { .l_inodebits = { bits } };
3576         struct lu_fid *fid;
3577         enum ldlm_mode rc;
3578         ENTRY;
3579
3580         fid = &ll_i2info(inode)->lli_fid;
3581         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
3582
3583         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
3584                            fid, LDLM_IBITS, &policy, mode, lockh);
3585
3586         RETURN(rc);
3587 }
3588
3589 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
3590 {
3591         /* Already unlinked. Just update nlink and return success */
3592         if (rc == -ENOENT) {
3593                 clear_nlink(inode);
3594                 /* If it is striped directory, and there is bad stripe
3595                  * Let's revalidate the dentry again, instead of returning
3596                  * error */
3597                 if (S_ISDIR(inode->i_mode) &&
3598                     ll_i2info(inode)->lli_lsm_md != NULL)
3599                         return 0;
3600
3601                 /* This path cannot be hit for regular files unless in
3602                  * case of obscure races, so no need to to validate
3603                  * size. */
3604                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
3605                         return 0;
3606         } else if (rc != 0) {
3607                 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
3608                              "%s: revalidate FID "DFID" error: rc = %d\n",
3609                              ll_get_fsname(inode->i_sb, NULL, 0),
3610                              PFID(ll_inode2fid(inode)), rc);
3611         }
3612
3613         return rc;
3614 }
3615
3616 static int __ll_inode_revalidate(struct dentry *dentry, __u64 ibits)
3617 {
3618         struct inode *inode = dentry->d_inode;
3619         struct ptlrpc_request *req = NULL;
3620         struct obd_export *exp;
3621         int rc = 0;
3622         ENTRY;
3623
3624         LASSERT(inode != NULL);
3625
3626         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p),name=%s\n",
3627                PFID(ll_inode2fid(inode)), inode, dentry->d_name.name);
3628
3629         exp = ll_i2mdexp(inode);
3630
3631         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
3632          *      But under CMD case, it caused some lock issues, should be fixed
3633          *      with new CMD ibits lock. See bug 12718 */
3634         if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
3635                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3636                 struct md_op_data *op_data;
3637
3638                 if (ibits == MDS_INODELOCK_LOOKUP)
3639                         oit.it_op = IT_LOOKUP;
3640
3641                 /* Call getattr by fid, so do not provide name at all. */
3642                 op_data = ll_prep_md_op_data(NULL, dentry->d_inode,
3643                                              dentry->d_inode, NULL, 0, 0,
3644                                              LUSTRE_OPC_ANY, NULL);
3645                 if (IS_ERR(op_data))
3646                         RETURN(PTR_ERR(op_data));
3647
3648                 rc = md_intent_lock(exp, op_data, &oit, &req,
3649                                     &ll_md_blocking_ast, 0);
3650                 ll_finish_md_op_data(op_data);
3651                 if (rc < 0) {
3652                         rc = ll_inode_revalidate_fini(inode, rc);
3653                         GOTO (out, rc);
3654                 }
3655
3656                 rc = ll_revalidate_it_finish(req, &oit, dentry);
3657                 if (rc != 0) {
3658                         ll_intent_release(&oit);
3659                         GOTO(out, rc);
3660                 }
3661
3662                 /* Unlinked? Unhash dentry, so it is not picked up later by
3663                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3664                    here to preserve get_cwd functionality on 2.6.
3665                    Bug 10503 */
3666                 if (!dentry->d_inode->i_nlink) {
3667                         ll_lock_dcache(inode);
3668                         d_lustre_invalidate(dentry, 0);
3669                         ll_unlock_dcache(inode);
3670                 }
3671
3672                 ll_lookup_finish_locks(&oit, dentry);
3673         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
3674                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3675                 u64 valid = OBD_MD_FLGETATTR;
3676                 struct md_op_data *op_data;
3677                 int ealen = 0;
3678
3679                 if (S_ISREG(inode->i_mode)) {
3680                         rc = ll_get_default_mdsize(sbi, &ealen);
3681                         if (rc)
3682                                 RETURN(rc);
3683                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3684                 }
3685
3686                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
3687                                              0, ealen, LUSTRE_OPC_ANY,
3688                                              NULL);
3689                 if (IS_ERR(op_data))
3690                         RETURN(PTR_ERR(op_data));
3691
3692                 op_data->op_valid = valid;
3693                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
3694                 ll_finish_md_op_data(op_data);
3695                 if (rc) {
3696                         rc = ll_inode_revalidate_fini(inode, rc);
3697                         RETURN(rc);
3698                 }
3699
3700                 rc = ll_prep_inode(&inode, req, NULL, NULL);
3701         }
3702 out:
3703         ptlrpc_req_finished(req);
3704         return rc;
3705 }
3706
3707 static int ll_merge_md_attr(struct inode *inode)
3708 {
3709         struct cl_attr attr = { 0 };
3710         int rc;
3711
3712         LASSERT(ll_i2info(inode)->lli_lsm_md != NULL);
3713         rc = md_merge_attr(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
3714                            &attr, ll_md_blocking_ast);
3715         if (rc != 0)
3716                 RETURN(rc);
3717
3718         set_nlink(inode, attr.cat_nlink);
3719         inode->i_blocks = attr.cat_blocks;
3720         i_size_write(inode, attr.cat_size);
3721
3722         ll_i2info(inode)->lli_atime = attr.cat_atime;
3723         ll_i2info(inode)->lli_mtime = attr.cat_mtime;
3724         ll_i2info(inode)->lli_ctime = attr.cat_ctime;
3725
3726         RETURN(0);
3727 }
3728
3729 static int
3730 ll_inode_revalidate(struct dentry *dentry, __u64 ibits)
3731 {
3732         struct inode    *inode = dentry->d_inode;
3733         int              rc;
3734         ENTRY;
3735
3736         rc = __ll_inode_revalidate(dentry, ibits);
3737         if (rc != 0)
3738                 RETURN(rc);
3739
3740         /* if object isn't regular file, don't validate size */
3741         if (!S_ISREG(inode->i_mode)) {
3742                 if (S_ISDIR(inode->i_mode) &&
3743                     ll_i2info(inode)->lli_lsm_md != NULL) {
3744                         rc = ll_merge_md_attr(inode);
3745                         if (rc != 0)
3746                                 RETURN(rc);
3747                 }
3748
3749                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_atime;
3750                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_mtime;
3751                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_ctime;
3752         } else {
3753                 /* In case of restore, the MDT has the right size and has
3754                  * already send it back without granting the layout lock,
3755                  * inode is up-to-date so glimpse is useless.
3756                  * Also to glimpse we need the layout, in case of a running
3757                  * restore the MDT holds the layout lock so the glimpse will
3758                  * block up to the end of restore (getattr will block)
3759                  */
3760                 if (!ll_file_test_flag(ll_i2info(inode), LLIF_FILE_RESTORING))
3761                         rc = ll_glimpse_size(inode);
3762         }
3763         RETURN(rc);
3764 }
3765
3766 static inline dev_t ll_compat_encode_dev(dev_t dev)
3767 {
3768         /* The compat_sys_*stat*() syscalls will fail unless the
3769          * device majors and minors are both less than 256. Note that
3770          * the value returned here will be passed through
3771          * old_encode_dev() in cp_compat_stat(). And so we are not
3772          * trying to return a valid compat (u16) device number, just
3773          * one that will pass the old_valid_dev() check. */
3774
3775         return MKDEV(MAJOR(dev) & 0xff, MINOR(dev) & 0xff);
3776 }
3777
3778 #ifdef HAVE_INODEOPS_ENHANCED_GETATTR
3779 int ll_getattr(const struct path *path, struct kstat *stat,
3780                u32 request_mask, unsigned int flags)
3781
3782 {
3783         struct dentry *de = path->dentry;
3784 #else
3785 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3786 {
3787 #endif
3788         struct inode *inode = de->d_inode;
3789         struct ll_sb_info *sbi = ll_i2sbi(inode);
3790         struct ll_inode_info *lli = ll_i2info(inode);
3791         int res = 0;
3792
3793         res = ll_inode_revalidate(de, MDS_INODELOCK_UPDATE |
3794                                       MDS_INODELOCK_LOOKUP);
3795         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
3796
3797         if (res)
3798                 return res;
3799
3800         OBD_FAIL_TIMEOUT(OBD_FAIL_GETATTR_DELAY, 30);
3801
3802         if (ll_need_32bit_api(sbi)) {
3803                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
3804                 stat->dev = ll_compat_encode_dev(inode->i_sb->s_dev);
3805                 stat->rdev = ll_compat_encode_dev(inode->i_rdev);
3806         } else {
3807                 stat->ino = inode->i_ino;
3808                 stat->dev = inode->i_sb->s_dev;
3809                 stat->rdev = inode->i_rdev;
3810         }
3811
3812         stat->mode = inode->i_mode;
3813         stat->uid = inode->i_uid;
3814         stat->gid = inode->i_gid;
3815         stat->atime = inode->i_atime;
3816         stat->mtime = inode->i_mtime;
3817         stat->ctime = inode->i_ctime;
3818         stat->blksize = sbi->ll_stat_blksize ?: 1 << inode->i_blkbits;
3819
3820         stat->nlink = inode->i_nlink;
3821         stat->size = i_size_read(inode);
3822         stat->blocks = inode->i_blocks;
3823
3824         return 0;
3825 }
3826
3827 static int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
3828                      __u64 start, __u64 len)
3829 {
3830         int             rc;
3831         size_t          num_bytes;
3832         struct fiemap   *fiemap;
3833         unsigned int    extent_count = fieinfo->fi_extents_max;
3834
3835         num_bytes = sizeof(*fiemap) + (extent_count *
3836                                        sizeof(struct fiemap_extent));
3837         OBD_ALLOC_LARGE(fiemap, num_bytes);
3838
3839         if (fiemap == NULL)
3840                 RETURN(-ENOMEM);
3841
3842         fiemap->fm_flags = fieinfo->fi_flags;
3843         fiemap->fm_extent_count = fieinfo->fi_extents_max;
3844         fiemap->fm_start = start;
3845         fiemap->fm_length = len;
3846         if (extent_count > 0 &&
3847             copy_from_user(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
3848                            sizeof(struct fiemap_extent)) != 0)
3849                 GOTO(out, rc = -EFAULT);
3850
3851         rc = ll_do_fiemap(inode, fiemap, num_bytes);
3852
3853         fieinfo->fi_flags = fiemap->fm_flags;
3854         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
3855         if (extent_count > 0 &&
3856             copy_to_user(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
3857                          fiemap->fm_mapped_extents *
3858                          sizeof(struct fiemap_extent)) != 0)
3859                 GOTO(out, rc = -EFAULT);
3860 out:
3861         OBD_FREE_LARGE(fiemap, num_bytes);
3862         return rc;
3863 }
3864
3865 struct posix_acl *ll_get_acl(struct inode *inode, int type)
3866 {
3867         struct ll_inode_info *lli = ll_i2info(inode);
3868         struct posix_acl *acl = NULL;
3869         ENTRY;
3870
3871         spin_lock(&lli->lli_lock);
3872         /* VFS' acl_permission_check->check_acl will release the refcount */
3873         acl = posix_acl_dup(lli->lli_posix_acl);
3874         spin_unlock(&lli->lli_lock);
3875
3876         RETURN(acl);
3877 }
3878
3879 #ifdef HAVE_IOP_SET_ACL
3880 #ifdef CONFIG_FS_POSIX_ACL
3881 int ll_set_acl(struct inode *inode, struct posix_acl *acl, int type)
3882 {
3883         const char *name = NULL;
3884         char *value = NULL;
3885         size_t size = 0;
3886         int rc = 0;
3887         ENTRY;
3888
3889         switch (type) {
3890         case ACL_TYPE_ACCESS:
3891                 if (acl) {
3892                         rc = posix_acl_update_mode(inode, &inode->i_mode, &acl);
3893                         if (rc)
3894                                 GOTO(out, rc);
3895                 }
3896                 name = XATTR_NAME_POSIX_ACL_ACCESS;
3897                 break;
3898         case ACL_TYPE_DEFAULT:
3899                 if (!S_ISDIR(inode->i_mode))
3900                         GOTO(out, rc = acl ? -EACCES : 0);
3901                 name = XATTR_NAME_POSIX_ACL_DEFAULT;
3902                 break;
3903         default:
3904                 GOTO(out, rc = -EINVAL);
3905         }
3906
3907         if (acl) {
3908                 size = posix_acl_xattr_size(acl->a_count);
3909                 value = kmalloc(size, GFP_NOFS);
3910                 if (value == NULL)
3911                         GOTO(out, rc = -ENOMEM);
3912
3913                 rc = posix_acl_to_xattr(&init_user_ns, acl, value, size);
3914                 if (rc < 0)
3915                         GOTO(out_free, rc);
3916         }
3917
3918         /* dentry is only used for *.lov attributes so it's safe to be NULL */
3919         rc = __vfs_setxattr(NULL, inode, name, value, size, XATTR_CREATE);
3920 out_free:
3921         kfree(value);
3922 out:
3923         if (!rc)
3924                 set_cached_acl(inode, type, acl);
3925         else
3926                 forget_cached_acl(inode, type);
3927         RETURN(rc);
3928 }
3929 #endif /* CONFIG_FS_POSIX_ACL */
3930 #endif /* HAVE_IOP_SET_ACL */
3931
3932 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
3933 static int
3934 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
3935 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
3936 # else
3937 ll_check_acl(struct inode *inode, int mask)
3938 # endif
3939 {
3940 # ifdef CONFIG_FS_POSIX_ACL
3941         struct posix_acl *acl;
3942         int rc;
3943         ENTRY;
3944
3945 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
3946         if (flags & IPERM_FLAG_RCU)
3947                 return -ECHILD;
3948 #  endif
3949         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
3950
3951         if (!acl)
3952                 RETURN(-EAGAIN);
3953
3954         rc = posix_acl_permission(inode, acl, mask);
3955         posix_acl_release(acl);
3956
3957         RETURN(rc);
3958 # else /* !CONFIG_FS_POSIX_ACL */
3959         return -EAGAIN;
3960 # endif /* CONFIG_FS_POSIX_ACL */
3961 }
3962 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
3963
3964 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
3965 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
3966 #else
3967 # ifdef HAVE_INODE_PERMISION_2ARGS
3968 int ll_inode_permission(struct inode *inode, int mask)
3969 # else
3970 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3971 # endif
3972 #endif
3973 {
3974         int rc = 0;
3975         struct ll_sb_info *sbi;
3976         struct root_squash_info *squash;
3977         struct cred *cred = NULL;
3978         const struct cred *old_cred = NULL;
3979         cfs_cap_t cap;
3980         bool squash_id = false;
3981         ENTRY;
3982
3983 #ifdef MAY_NOT_BLOCK
3984         if (mask & MAY_NOT_BLOCK)
3985                 return -ECHILD;
3986 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
3987         if (flags & IPERM_FLAG_RCU)
3988                 return -ECHILD;
3989 #endif
3990
3991        /* as root inode are NOT getting validated in lookup operation,
3992         * need to do it before permission check. */
3993
3994         if (inode == inode->i_sb->s_root->d_inode) {
3995                 rc = __ll_inode_revalidate(inode->i_sb->s_root,
3996                                            MDS_INODELOCK_LOOKUP);
3997                 if (rc)
3998                         RETURN(rc);
3999         }
4000
4001         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), inode mode %x mask %o\n",
4002                PFID(ll_inode2fid(inode)), inode, inode->i_mode, mask);
4003
4004         /* squash fsuid/fsgid if needed */
4005         sbi = ll_i2sbi(inode);
4006         squash = &sbi->ll_squash;
4007         if (unlikely(squash->rsi_uid != 0 &&
4008                      uid_eq(current_fsuid(), GLOBAL_ROOT_UID) &&
4009                      !(sbi->ll_flags & LL_SBI_NOROOTSQUASH))) {
4010                         squash_id = true;
4011         }
4012         if (squash_id) {
4013                 CDEBUG(D_OTHER, "squash creds (%d:%d)=>(%d:%d)\n",
4014                        __kuid_val(current_fsuid()), __kgid_val(current_fsgid()),
4015                        squash->rsi_uid, squash->rsi_gid);
4016
4017                 /* update current process's credentials
4018                  * and FS capability */
4019                 cred = prepare_creds();
4020                 if (cred == NULL)
4021                         RETURN(-ENOMEM);
4022
4023                 cred->fsuid = make_kuid(&init_user_ns, squash->rsi_uid);
4024                 cred->fsgid = make_kgid(&init_user_ns, squash->rsi_gid);
4025                 for (cap = 0; cap < sizeof(cfs_cap_t) * 8; cap++) {
4026                         if ((1 << cap) & CFS_CAP_FS_MASK)
4027                                 cap_lower(cred->cap_effective, cap);
4028                 }
4029                 old_cred = override_creds(cred);
4030         }
4031
4032         ll_stats_ops_tally(sbi, LPROC_LL_INODE_PERM, 1);
4033         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
4034         /* restore current process's credentials and FS capability */
4035         if (squash_id) {
4036                 revert_creds(old_cred);
4037                 put_cred(cred);
4038         }
4039
4040         RETURN(rc);
4041 }
4042
4043 /* -o localflock - only provides locally consistent flock locks */
4044 struct file_operations ll_file_operations = {
4045 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
4046 # ifdef HAVE_SYNC_READ_WRITE
4047         .read           = new_sync_read,
4048         .write          = new_sync_write,
4049 # endif
4050         .read_iter      = ll_file_read_iter,
4051         .write_iter     = ll_file_write_iter,
4052 #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4053         .read           = ll_file_read,
4054         .aio_read       = ll_file_aio_read,
4055         .write          = ll_file_write,
4056         .aio_write      = ll_file_aio_write,
4057 #endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4058         .unlocked_ioctl = ll_file_ioctl,
4059         .open           = ll_file_open,
4060         .release        = ll_file_release,
4061         .mmap           = ll_file_mmap,
4062         .llseek         = ll_file_seek,
4063         .splice_read    = ll_file_splice_read,
4064         .fsync          = ll_fsync,
4065         .flush          = ll_flush
4066 };
4067
4068 struct file_operations ll_file_operations_flock = {
4069 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
4070 # ifdef HAVE_SYNC_READ_WRITE
4071         .read           = new_sync_read,
4072         .write          = new_sync_write,
4073 # endif /* HAVE_SYNC_READ_WRITE */
4074         .read_iter      = ll_file_read_iter,
4075         .write_iter     = ll_file_write_iter,
4076 #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4077         .read           = ll_file_read,
4078         .aio_read       = ll_file_aio_read,
4079         .write          = ll_file_write,
4080         .aio_write      = ll_file_aio_write,
4081 #endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4082         .unlocked_ioctl = ll_file_ioctl,
4083         .open           = ll_file_open,
4084         .release        = ll_file_release,
4085         .mmap           = ll_file_mmap,
4086         .llseek         = ll_file_seek,
4087         .splice_read    = ll_file_splice_read,
4088         .fsync          = ll_fsync,
4089         .flush          = ll_flush,
4090         .flock          = ll_file_flock,
4091         .lock           = ll_file_flock
4092 };
4093
4094 /* These are for -o noflock - to return ENOSYS on flock calls */
4095 struct file_operations ll_file_operations_noflock = {
4096 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
4097 # ifdef HAVE_SYNC_READ_WRITE
4098         .read           = new_sync_read,
4099         .write          = new_sync_write,
4100 # endif /* HAVE_SYNC_READ_WRITE */
4101         .read_iter      = ll_file_read_iter,
4102         .write_iter     = ll_file_write_iter,
4103 #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4104         .read           = ll_file_read,
4105         .aio_read       = ll_file_aio_read,
4106         .write          = ll_file_write,
4107         .aio_write      = ll_file_aio_write,
4108 #endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4109         .unlocked_ioctl = ll_file_ioctl,
4110         .open           = ll_file_open,
4111         .release        = ll_file_release,
4112         .mmap           = ll_file_mmap,
4113         .llseek         = ll_file_seek,
4114         .splice_read    = ll_file_splice_read,
4115         .fsync          = ll_fsync,
4116         .flush          = ll_flush,
4117         .flock          = ll_file_noflock,
4118         .lock           = ll_file_noflock
4119 };
4120
4121 struct inode_operations ll_file_inode_operations = {
4122         .setattr        = ll_setattr,
4123         .getattr        = ll_getattr,
4124         .permission     = ll_inode_permission,
4125 #ifdef HAVE_IOP_XATTR
4126         .setxattr       = ll_setxattr,
4127         .getxattr       = ll_getxattr,
4128         .removexattr    = ll_removexattr,
4129 #endif
4130         .listxattr      = ll_listxattr,
4131         .fiemap         = ll_fiemap,
4132 #ifdef HAVE_IOP_GET_ACL
4133         .get_acl        = ll_get_acl,
4134 #endif
4135 #ifdef HAVE_IOP_SET_ACL
4136         .set_acl        = ll_set_acl,
4137 #endif
4138 };
4139
4140 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
4141 {
4142         struct ll_inode_info *lli = ll_i2info(inode);
4143         struct cl_object *obj = lli->lli_clob;
4144         struct lu_env *env;
4145         int rc;
4146         __u16 refcheck;
4147         ENTRY;
4148
4149         if (obj == NULL)
4150                 RETURN(0);
4151
4152         env = cl_env_get(&refcheck);
4153         if (IS_ERR(env))
4154                 RETURN(PTR_ERR(env));
4155
4156         rc = cl_conf_set(env, lli->lli_clob, conf);
4157         if (rc < 0)
4158                 GOTO(out, rc);
4159
4160         if (conf->coc_opc == OBJECT_CONF_SET) {
4161                 struct ldlm_lock *lock = conf->coc_lock;
4162                 struct cl_layout cl = {
4163                         .cl_layout_gen = 0,
4164                 };
4165
4166                 LASSERT(lock != NULL);
4167                 LASSERT(ldlm_has_layout(lock));
4168
4169                 /* it can only be allowed to match after layout is
4170                  * applied to inode otherwise false layout would be
4171                  * seen. Applying layout shoud happen before dropping
4172                  * the intent lock. */
4173                 ldlm_lock_allow_match(lock);
4174
4175                 rc = cl_object_layout_get(env, obj, &cl);
4176                 if (rc < 0)
4177                         GOTO(out, rc);
4178
4179                 CDEBUG(D_VFSTRACE,
4180                        DFID": layout version change: %u -> %u\n",
4181                        PFID(&lli->lli_fid), ll_layout_version_get(lli),
4182                        cl.cl_layout_gen);
4183                 ll_layout_version_set(lli, cl.cl_layout_gen);
4184         }
4185
4186 out:
4187         cl_env_put(env, &refcheck);
4188
4189         RETURN(rc);
4190 }
4191
4192 /* Fetch layout from MDT with getxattr request, if it's not ready yet */
4193 static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)
4194
4195 {
4196         struct ll_sb_info *sbi = ll_i2sbi(inode);
4197         struct ptlrpc_request *req;
4198         struct mdt_body *body;
4199         void *lvbdata;
4200         void *lmm;
4201         int lmmsize;
4202         int rc;
4203         ENTRY;
4204
4205         CDEBUG(D_INODE, DFID" LVB_READY=%d l_lvb_data=%p l_lvb_len=%d\n",
4206                PFID(ll_inode2fid(inode)), ldlm_is_lvb_ready(lock),
4207                lock->l_lvb_data, lock->l_lvb_len);
4208
4209         if (lock->l_lvb_data != NULL)
4210                 RETURN(0);
4211
4212         /* if layout lock was granted right away, the layout is returned
4213          * within DLM_LVB of dlm reply; otherwise if the lock was ever
4214          * blocked and then granted via completion ast, we have to fetch
4215          * layout here. Please note that we can't use the LVB buffer in
4216          * completion AST because it doesn't have a large enough buffer */
4217         rc = ll_get_default_mdsize(sbi, &lmmsize);
4218         if (rc == 0)
4219                 rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode),
4220                                 OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
4221                                 lmmsize, 0, &req);
4222         if (rc < 0)
4223                 RETURN(rc);
4224
4225         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
4226         if (body == NULL)
4227                 GOTO(out, rc = -EPROTO);
4228
4229         lmmsize = body->mbo_eadatasize;
4230         if (lmmsize == 0) /* empty layout */
4231                 GOTO(out, rc = 0);
4232
4233         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
4234         if (lmm == NULL)
4235                 GOTO(out, rc = -EFAULT);
4236
4237         OBD_ALLOC_LARGE(lvbdata, lmmsize);
4238         if (lvbdata == NULL)
4239                 GOTO(out, rc = -ENOMEM);
4240
4241         memcpy(lvbdata, lmm, lmmsize);
4242         lock_res_and_lock(lock);
4243         if (unlikely(lock->l_lvb_data == NULL)) {
4244                 lock->l_lvb_type = LVB_T_LAYOUT;
4245                 lock->l_lvb_data = lvbdata;
4246                 lock->l_lvb_len = lmmsize;
4247                 lvbdata = NULL;
4248         }
4249         unlock_res_and_lock(lock);
4250
4251         if (lvbdata)
4252                 OBD_FREE_LARGE(lvbdata, lmmsize);
4253
4254         EXIT;
4255
4256 out:
4257         ptlrpc_req_finished(req);
4258         return rc;
4259 }
4260
4261 /**
4262  * Apply the layout to the inode. Layout lock is held and will be released
4263  * in this function.
4264  */
4265 static int ll_layout_lock_set(struct lustre_handle *lockh, enum ldlm_mode mode,
4266                               struct inode *inode)
4267 {
4268         struct ll_inode_info *lli = ll_i2info(inode);
4269         struct ll_sb_info    *sbi = ll_i2sbi(inode);
4270         struct ldlm_lock *lock;
4271         struct cl_object_conf conf;
4272         int rc = 0;
4273         bool lvb_ready;
4274         bool wait_layout = false;
4275         ENTRY;
4276
4277         LASSERT(lustre_handle_is_used(lockh));
4278
4279         lock = ldlm_handle2lock(lockh);
4280         LASSERT(lock != NULL);
4281         LASSERT(ldlm_has_layout(lock));
4282
4283         LDLM_DEBUG(lock, "file "DFID"(%p) being reconfigured",
4284                    PFID(&lli->lli_fid), inode);
4285
4286         /* in case this is a caching lock and reinstate with new inode */
4287         md_set_lock_data(sbi->ll_md_exp, lockh, inode, NULL);
4288
4289         lock_res_and_lock(lock);
4290         lvb_ready = ldlm_is_lvb_ready(lock);
4291         unlock_res_and_lock(lock);
4292
4293         /* checking lvb_ready is racy but this is okay. The worst case is
4294          * that multi processes may configure the file on the same time. */
4295         if (lvb_ready)
4296                 GOTO(out, rc = 0);
4297
4298         rc = ll_layout_fetch(inode, lock);
4299         if (rc < 0)
4300                 GOTO(out, rc);
4301
4302         /* for layout lock, lmm is stored in lock's lvb.
4303          * lvb_data is immutable if the lock is held so it's safe to access it
4304          * without res lock.
4305          *
4306          * set layout to file. Unlikely this will fail as old layout was
4307          * surely eliminated */
4308         memset(&conf, 0, sizeof conf);
4309         conf.coc_opc = OBJECT_CONF_SET;
4310         conf.coc_inode = inode;
4311         conf.coc_lock = lock;
4312         conf.u.coc_layout.lb_buf = lock->l_lvb_data;
4313         conf.u.coc_layout.lb_len = lock->l_lvb_len;
4314         rc = ll_layout_conf(inode, &conf);
4315
4316         /* refresh layout failed, need to wait */
4317         wait_layout = rc == -EBUSY;
4318         EXIT;
4319 out:
4320         LDLM_LOCK_PUT(lock);
4321         ldlm_lock_decref(lockh, mode);
4322
4323         /* wait for IO to complete if it's still being used. */
4324         if (wait_layout) {
4325                 CDEBUG(D_INODE, "%s: "DFID"(%p) wait for layout reconf\n",
4326                        ll_get_fsname(inode->i_sb, NULL, 0),
4327                        PFID(&lli->lli_fid), inode);
4328
4329                 memset(&conf, 0, sizeof conf);
4330                 conf.coc_opc = OBJECT_CONF_WAIT;
4331                 conf.coc_inode = inode;
4332                 rc = ll_layout_conf(inode, &conf);
4333                 if (rc == 0)
4334                         rc = -EAGAIN;
4335
4336                 CDEBUG(D_INODE, "%s file="DFID" waiting layout return: %d\n",
4337                        ll_get_fsname(inode->i_sb, NULL, 0),
4338                        PFID(&lli->lli_fid), rc);
4339         }
4340         RETURN(rc);
4341 }
4342
4343 /**
4344  * Issue layout intent RPC to MDS.
4345  * \param inode [in]    file inode
4346  * \param intent [in]   layout intent
4347  *
4348  * \retval 0    on success
4349  * \retval < 0  error code
4350  */
4351 static int ll_layout_intent(struct inode *inode, struct layout_intent *intent)
4352 {
4353         struct ll_inode_info  *lli = ll_i2info(inode);
4354         struct ll_sb_info     *sbi = ll_i2sbi(inode);
4355         struct md_op_data     *op_data;
4356         struct lookup_intent it;
4357         struct ptlrpc_request *req;
4358         int rc;
4359         ENTRY;
4360
4361         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
4362                                      0, 0, LUSTRE_OPC_ANY, NULL);
4363         if (IS_ERR(op_data))
4364                 RETURN(PTR_ERR(op_data));
4365
4366         op_data->op_data = intent;
4367         op_data->op_data_size = sizeof(*intent);
4368
4369         memset(&it, 0, sizeof(it));
4370         it.it_op = IT_LAYOUT;
4371         if (intent->li_opc == LAYOUT_INTENT_WRITE ||
4372             intent->li_opc == LAYOUT_INTENT_TRUNC)
4373                 it.it_flags = FMODE_WRITE;
4374
4375         LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file "DFID"(%p)",
4376                           ll_get_fsname(inode->i_sb, NULL, 0),
4377                           PFID(&lli->lli_fid), inode);
4378
4379         rc = md_intent_lock(sbi->ll_md_exp, op_data, &it, &req,
4380                             &ll_md_blocking_ast, 0);
4381         if (it.it_request != NULL)
4382                 ptlrpc_req_finished(it.it_request);
4383         it.it_request = NULL;
4384
4385         ll_finish_md_op_data(op_data);
4386
4387         /* set lock data in case this is a new lock */
4388         if (!rc)
4389                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
4390
4391         ll_intent_drop_lock(&it);
4392
4393         RETURN(rc);
4394 }
4395
4396 /**
4397  * This function checks if there exists a LAYOUT lock on the client side,
4398  * or enqueues it if it doesn't have one in cache.
4399  *
4400  * This function will not hold layout lock so it may be revoked any time after
4401  * this function returns. Any operations depend on layout should be redone
4402  * in that case.
4403  *
4404  * This function should be called before lov_io_init() to get an uptodate
4405  * layout version, the caller should save the version number and after IO
4406  * is finished, this function should be called again to verify that layout
4407  * is not changed during IO time.
4408  */
4409 int ll_layout_refresh(struct inode *inode, __u32 *gen)
4410 {
4411         struct ll_inode_info    *lli = ll_i2info(inode);
4412         struct ll_sb_info       *sbi = ll_i2sbi(inode);
4413         struct lustre_handle lockh;
4414         struct layout_intent intent = {
4415                 .li_opc = LAYOUT_INTENT_ACCESS,
4416         };
4417         enum ldlm_mode mode;
4418         int rc;
4419         ENTRY;
4420
4421         *gen = ll_layout_version_get(lli);
4422         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK) || *gen != CL_LAYOUT_GEN_NONE)
4423                 RETURN(0);
4424
4425         /* sanity checks */
4426         LASSERT(fid_is_sane(ll_inode2fid(inode)));
4427         LASSERT(S_ISREG(inode->i_mode));
4428
4429         /* take layout lock mutex to enqueue layout lock exclusively. */
4430         mutex_lock(&lli->lli_layout_mutex);
4431
4432         while (1) {
4433                 /* mostly layout lock is caching on the local side, so try to
4434                  * match it before grabbing layout lock mutex. */
4435                 mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
4436                                        LCK_CR | LCK_CW | LCK_PR | LCK_PW);
4437                 if (mode != 0) { /* hit cached lock */
4438                         rc = ll_layout_lock_set(&lockh, mode, inode);
4439                         if (rc == -EAGAIN)
4440                                 continue;
4441                         break;
4442                 }
4443
4444                 rc = ll_layout_intent(inode, &intent);
4445                 if (rc != 0)
4446                         break;
4447         }
4448
4449         if (rc == 0)
4450                 *gen = ll_layout_version_get(lli);
4451         mutex_unlock(&lli->lli_layout_mutex);
4452
4453         RETURN(rc);
4454 }
4455
4456 /**
4457  * Issue layout intent RPC indicating where in a file an IO is about to write.
4458  *
4459  * \param[in] inode     file inode.
4460  * \param[in] start     start offset of fille in bytes where an IO is about to
4461  *                      write.
4462  * \param[in] end       exclusive end offset in bytes of the write range.
4463  *
4464  * \retval 0    on success
4465  * \retval < 0  error code
4466  */
4467 int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end)
4468 {
4469         struct layout_intent intent = {
4470                 .li_opc = LAYOUT_INTENT_WRITE,
4471                 .li_start = start,
4472                 .li_end = end,
4473         };
4474         int rc;
4475         ENTRY;
4476
4477         rc = ll_layout_intent(inode, &intent);
4478
4479         RETURN(rc);
4480 }
4481
4482 /**
4483  *  This function send a restore request to the MDT
4484  */
4485 int ll_layout_restore(struct inode *inode, loff_t offset, __u64 length)
4486 {
4487         struct hsm_user_request *hur;
4488         int                      len, rc;
4489         ENTRY;
4490
4491         len = sizeof(struct hsm_user_request) +
4492               sizeof(struct hsm_user_item);
4493         OBD_ALLOC(hur, len);
4494         if (hur == NULL)
4495                 RETURN(-ENOMEM);
4496
4497         hur->hur_request.hr_action = HUA_RESTORE;
4498         hur->hur_request.hr_archive_id = 0;
4499         hur->hur_request.hr_flags = 0;
4500         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
4501                sizeof(hur->hur_user_item[0].hui_fid));
4502         hur->hur_user_item[0].hui_extent.offset = offset;
4503         hur->hur_user_item[0].hui_extent.length = length;
4504         hur->hur_request.hr_itemcount = 1;
4505         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
4506                            len, hur, NULL);
4507         OBD_FREE(hur, len);
4508         RETURN(rc);
4509 }