Whamcloud - gitweb
LU-6401 uapi: migrate remaining uapi headers to uapi directory
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/llite/file.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_LLITE
40 #include <lustre_dlm.h>
41 #include <linux/pagemap.h>
42 #include <linux/file.h>
43 #include <linux/sched.h>
44 #include <linux/user_namespace.h>
45 #ifdef HAVE_UIDGID_HEADER
46 # include <linux/uidgid.h>
47 #endif
48
49 #include <uapi/linux/lustre/lustre_ioctl.h>
50 #include <lustre_swab.h>
51
52 #include "cl_object.h"
53 #include "llite_internal.h"
54 #include "vvp_internal.h"
55
56 static int
57 ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
58
59 static int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
60                           bool *lease_broken);
61
62 static struct ll_file_data *ll_file_data_get(void)
63 {
64         struct ll_file_data *fd;
65
66         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, GFP_NOFS);
67         if (fd == NULL)
68                 return NULL;
69
70         fd->fd_write_failed = false;
71
72         return fd;
73 }
74
75 static void ll_file_data_put(struct ll_file_data *fd)
76 {
77         if (fd != NULL)
78                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
79 }
80
81 /**
82  * Packs all the attributes into @op_data for the CLOSE rpc.
83  */
84 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
85                              struct obd_client_handle *och)
86 {
87         ENTRY;
88
89         ll_prep_md_op_data(op_data, inode, NULL, NULL,
90                            0, 0, LUSTRE_OPC_ANY, NULL);
91
92         op_data->op_attr.ia_mode = inode->i_mode;
93         op_data->op_attr.ia_atime = inode->i_atime;
94         op_data->op_attr.ia_mtime = inode->i_mtime;
95         op_data->op_attr.ia_ctime = inode->i_ctime;
96         op_data->op_attr.ia_size = i_size_read(inode);
97         op_data->op_attr.ia_valid |= ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
98                                      ATTR_MTIME | ATTR_MTIME_SET |
99                                      ATTR_CTIME | ATTR_CTIME_SET;
100         op_data->op_attr_blocks = inode->i_blocks;
101         op_data->op_attr_flags = ll_inode_to_ext_flags(inode->i_flags);
102         op_data->op_handle = och->och_fh;
103
104         if (och->och_flags & FMODE_WRITE &&
105             ll_file_test_and_clear_flag(ll_i2info(inode), LLIF_DATA_MODIFIED))
106                 /* For HSM: if inode data has been modified, pack it so that
107                  * MDT can set data dirty flag in the archive. */
108                 op_data->op_bias |= MDS_DATA_MODIFIED;
109
110         EXIT;
111 }
112
113 /**
114  * Perform a close, possibly with a bias.
115  * The meaning of "data" depends on the value of "bias".
116  *
117  * If \a bias is MDS_HSM_RELEASE then \a data is a pointer to the data version.
118  * If \a bias is MDS_CLOSE_LAYOUT_SWAP then \a data is a pointer to the inode to
119  * swap layouts with.
120  */
121 static int ll_close_inode_openhandle(struct inode *inode,
122                                      struct obd_client_handle *och,
123                                      enum mds_op_bias bias, void *data)
124 {
125         struct obd_export *md_exp = ll_i2mdexp(inode);
126         const struct ll_inode_info *lli = ll_i2info(inode);
127         struct md_op_data *op_data;
128         struct ptlrpc_request *req = NULL;
129         int rc;
130         ENTRY;
131
132         if (class_exp2obd(md_exp) == NULL) {
133                 CERROR("%s: invalid MDC connection handle closing "DFID"\n",
134                        ll_get_fsname(inode->i_sb, NULL, 0),
135                        PFID(&lli->lli_fid));
136                 GOTO(out, rc = 0);
137         }
138
139         OBD_ALLOC_PTR(op_data);
140         /* We leak openhandle and request here on error, but not much to be
141          * done in OOM case since app won't retry close on error either. */
142         if (op_data == NULL)
143                 GOTO(out, rc = -ENOMEM);
144
145         ll_prepare_close(inode, op_data, och);
146         switch (bias) {
147         case MDS_CLOSE_LAYOUT_SWAP:
148                 LASSERT(data != NULL);
149                 op_data->op_bias |= MDS_CLOSE_LAYOUT_SWAP;
150                 op_data->op_data_version = 0;
151                 op_data->op_lease_handle = och->och_lease_handle;
152                 op_data->op_fid2 = *ll_inode2fid(data);
153                 break;
154
155         case MDS_HSM_RELEASE:
156                 LASSERT(data != NULL);
157                 op_data->op_bias |= MDS_HSM_RELEASE;
158                 op_data->op_data_version = *(__u64 *)data;
159                 op_data->op_lease_handle = och->och_lease_handle;
160                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
161                 break;
162
163         default:
164                 LASSERT(data == NULL);
165                 break;
166         }
167
168         rc = md_close(md_exp, op_data, och->och_mod, &req);
169         if (rc != 0 && rc != -EINTR)
170                 CERROR("%s: inode "DFID" mdc close failed: rc = %d\n",
171                        md_exp->exp_obd->obd_name, PFID(&lli->lli_fid), rc);
172
173         if (rc == 0 &&
174             op_data->op_bias & (MDS_HSM_RELEASE | MDS_CLOSE_LAYOUT_SWAP)) {
175                 struct mdt_body *body;
176
177                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
178                 if (!(body->mbo_valid & OBD_MD_CLOSE_INTENT_EXECED))
179                         rc = -EBUSY;
180         }
181
182         ll_finish_md_op_data(op_data);
183         EXIT;
184 out:
185
186         md_clear_open_replay_data(md_exp, och);
187         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
188         OBD_FREE_PTR(och);
189
190         ptlrpc_req_finished(req);       /* This is close request */
191         return rc;
192 }
193
194 int ll_md_real_close(struct inode *inode, fmode_t fmode)
195 {
196         struct ll_inode_info *lli = ll_i2info(inode);
197         struct obd_client_handle **och_p;
198         struct obd_client_handle *och;
199         __u64 *och_usecount;
200         int rc = 0;
201         ENTRY;
202
203         if (fmode & FMODE_WRITE) {
204                 och_p = &lli->lli_mds_write_och;
205                 och_usecount = &lli->lli_open_fd_write_count;
206         } else if (fmode & FMODE_EXEC) {
207                 och_p = &lli->lli_mds_exec_och;
208                 och_usecount = &lli->lli_open_fd_exec_count;
209         } else {
210                 LASSERT(fmode & FMODE_READ);
211                 och_p = &lli->lli_mds_read_och;
212                 och_usecount = &lli->lli_open_fd_read_count;
213         }
214
215         mutex_lock(&lli->lli_och_mutex);
216         if (*och_usecount > 0) {
217                 /* There are still users of this handle, so skip
218                  * freeing it. */
219                 mutex_unlock(&lli->lli_och_mutex);
220                 RETURN(0);
221         }
222
223         och = *och_p;
224         *och_p = NULL;
225         mutex_unlock(&lli->lli_och_mutex);
226
227         if (och != NULL) {
228                 /* There might be a race and this handle may already
229                  * be closed. */
230                 rc = ll_close_inode_openhandle(inode, och, 0, NULL);
231         }
232
233         RETURN(rc);
234 }
235
236 static int ll_md_close(struct inode *inode, struct file *file)
237 {
238         union ldlm_policy_data policy = {
239                 .l_inodebits    = { MDS_INODELOCK_OPEN },
240         };
241         __u64 flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
242         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
243         struct ll_inode_info *lli = ll_i2info(inode);
244         struct lustre_handle lockh;
245         enum ldlm_mode lockmode;
246         int rc = 0;
247         ENTRY;
248
249         /* clear group lock, if present */
250         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
251                 ll_put_grouplock(inode, file, fd->fd_grouplock.lg_gid);
252
253         if (fd->fd_lease_och != NULL) {
254                 bool lease_broken;
255
256                 /* Usually the lease is not released when the
257                  * application crashed, we need to release here. */
258                 rc = ll_lease_close(fd->fd_lease_och, inode, &lease_broken);
259                 CDEBUG(rc ? D_ERROR : D_INODE, "Clean up lease "DFID" %d/%d\n",
260                         PFID(&lli->lli_fid), rc, lease_broken);
261
262                 fd->fd_lease_och = NULL;
263         }
264
265         if (fd->fd_och != NULL) {
266                 rc = ll_close_inode_openhandle(inode, fd->fd_och, 0, NULL);
267                 fd->fd_och = NULL;
268                 GOTO(out, rc);
269         }
270
271         /* Let's see if we have good enough OPEN lock on the file and if
272            we can skip talking to MDS */
273         mutex_lock(&lli->lli_och_mutex);
274         if (fd->fd_omode & FMODE_WRITE) {
275                 lockmode = LCK_CW;
276                 LASSERT(lli->lli_open_fd_write_count);
277                 lli->lli_open_fd_write_count--;
278         } else if (fd->fd_omode & FMODE_EXEC) {
279                 lockmode = LCK_PR;
280                 LASSERT(lli->lli_open_fd_exec_count);
281                 lli->lli_open_fd_exec_count--;
282         } else {
283                 lockmode = LCK_CR;
284                 LASSERT(lli->lli_open_fd_read_count);
285                 lli->lli_open_fd_read_count--;
286         }
287         mutex_unlock(&lli->lli_och_mutex);
288
289         if (!md_lock_match(ll_i2mdexp(inode), flags, ll_inode2fid(inode),
290                            LDLM_IBITS, &policy, lockmode, &lockh))
291                 rc = ll_md_real_close(inode, fd->fd_omode);
292
293 out:
294         LUSTRE_FPRIVATE(file) = NULL;
295         ll_file_data_put(fd);
296
297         RETURN(rc);
298 }
299
300 /* While this returns an error code, fput() the caller does not, so we need
301  * to make every effort to clean up all of our state here.  Also, applications
302  * rarely check close errors and even if an error is returned they will not
303  * re-try the close call.
304  */
305 int ll_file_release(struct inode *inode, struct file *file)
306 {
307         struct ll_file_data *fd;
308         struct ll_sb_info *sbi = ll_i2sbi(inode);
309         struct ll_inode_info *lli = ll_i2info(inode);
310         int rc;
311         ENTRY;
312
313         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
314                PFID(ll_inode2fid(inode)), inode);
315
316         if (inode->i_sb->s_root != file_dentry(file))
317                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
318         fd = LUSTRE_FPRIVATE(file);
319         LASSERT(fd != NULL);
320
321         /* The last ref on @file, maybe not the the owner pid of statahead,
322          * because parent and child process can share the same file handle. */
323         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd)
324                 ll_deauthorize_statahead(inode, fd);
325
326         if (inode->i_sb->s_root == file_dentry(file)) {
327                 LUSTRE_FPRIVATE(file) = NULL;
328                 ll_file_data_put(fd);
329                 RETURN(0);
330         }
331
332         if (!S_ISDIR(inode->i_mode)) {
333                 if (lli->lli_clob != NULL)
334                         lov_read_and_clear_async_rc(lli->lli_clob);
335                 lli->lli_async_rc = 0;
336         }
337
338         rc = ll_md_close(inode, file);
339
340         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
341                 libcfs_debug_dumplog();
342
343         RETURN(rc);
344 }
345
346 static int ll_intent_file_open(struct dentry *de, void *lmm, int lmmsize,
347                                 struct lookup_intent *itp)
348 {
349         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
350         struct dentry *parent = de->d_parent;
351         const char *name = NULL;
352         int len = 0;
353         struct md_op_data *op_data;
354         struct ptlrpc_request *req = NULL;
355         int rc;
356         ENTRY;
357
358         LASSERT(parent != NULL);
359         LASSERT(itp->it_flags & MDS_OPEN_BY_FID);
360
361         /* if server supports open-by-fid, or file name is invalid, don't pack
362          * name in open request */
363         if (!(exp_connect_flags(sbi->ll_md_exp) & OBD_CONNECT_OPEN_BY_FID) &&
364             lu_name_is_valid_2(de->d_name.name, de->d_name.len)) {
365                 name = de->d_name.name;
366                 len = de->d_name.len;
367         }
368
369         op_data = ll_prep_md_op_data(NULL, parent->d_inode, de->d_inode,
370                                      name, len, 0, LUSTRE_OPC_ANY, NULL);
371         if (IS_ERR(op_data))
372                 RETURN(PTR_ERR(op_data));
373         op_data->op_data = lmm;
374         op_data->op_data_size = lmmsize;
375
376         rc = md_intent_lock(sbi->ll_md_exp, op_data, itp, &req,
377                             &ll_md_blocking_ast, 0);
378         ll_finish_md_op_data(op_data);
379         if (rc == -ESTALE) {
380                 /* reason for keep own exit path - don`t flood log
381                  * with messages with -ESTALE errors.
382                  */
383                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
384                      it_open_error(DISP_OPEN_OPEN, itp))
385                         GOTO(out, rc);
386                 ll_release_openhandle(de, itp);
387                 GOTO(out, rc);
388         }
389
390         if (it_disposition(itp, DISP_LOOKUP_NEG))
391                 GOTO(out, rc = -ENOENT);
392
393         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
394                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
395                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
396                 GOTO(out, rc);
397         }
398
399         rc = ll_prep_inode(&de->d_inode, req, NULL, itp);
400         if (!rc && itp->it_lock_mode)
401                 ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, NULL);
402
403 out:
404         ptlrpc_req_finished(req);
405         ll_intent_drop_lock(itp);
406
407         /* We did open by fid, but by the time we got to the server,
408          * the object disappeared. If this is a create, we cannot really
409          * tell the userspace that the file it was trying to create
410          * does not exist. Instead let's return -ESTALE, and the VFS will
411          * retry the create with LOOKUP_REVAL that we are going to catch
412          * in ll_revalidate_dentry() and use lookup then.
413          */
414         if (rc == -ENOENT && itp->it_op & IT_CREAT)
415                 rc = -ESTALE;
416
417         RETURN(rc);
418 }
419
420 static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it,
421                        struct obd_client_handle *och)
422 {
423         struct mdt_body *body;
424
425         body = req_capsule_server_get(&it->it_request->rq_pill, &RMF_MDT_BODY);
426         och->och_fh = body->mbo_handle;
427         och->och_fid = body->mbo_fid1;
428         och->och_lease_handle.cookie = it->it_lock_handle;
429         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
430         och->och_flags = it->it_flags;
431
432         return md_set_open_replay_data(md_exp, och, it);
433 }
434
435 static int ll_local_open(struct file *file, struct lookup_intent *it,
436                          struct ll_file_data *fd, struct obd_client_handle *och)
437 {
438         struct inode *inode = file_inode(file);
439         ENTRY;
440
441         LASSERT(!LUSTRE_FPRIVATE(file));
442
443         LASSERT(fd != NULL);
444
445         if (och) {
446                 int rc;
447
448                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
449                 if (rc != 0)
450                         RETURN(rc);
451         }
452
453         LUSTRE_FPRIVATE(file) = fd;
454         ll_readahead_init(inode, &fd->fd_ras);
455         fd->fd_omode = it->it_flags & (FMODE_READ | FMODE_WRITE | FMODE_EXEC);
456
457         /* ll_cl_context initialize */
458         rwlock_init(&fd->fd_lock);
459         INIT_LIST_HEAD(&fd->fd_lccs);
460
461         RETURN(0);
462 }
463
464 /* Open a file, and (for the very first open) create objects on the OSTs at
465  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
466  * creation or open until ll_lov_setstripe() ioctl is called.
467  *
468  * If we already have the stripe MD locally then we don't request it in
469  * md_open(), by passing a lmm_size = 0.
470  *
471  * It is up to the application to ensure no other processes open this file
472  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
473  * used.  We might be able to avoid races of that sort by getting lli_open_sem
474  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
475  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
476  */
477 int ll_file_open(struct inode *inode, struct file *file)
478 {
479         struct ll_inode_info *lli = ll_i2info(inode);
480         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
481                                           .it_flags = file->f_flags };
482         struct obd_client_handle **och_p = NULL;
483         __u64 *och_usecount = NULL;
484         struct ll_file_data *fd;
485         int rc = 0;
486         ENTRY;
487
488         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), flags %o\n",
489                PFID(ll_inode2fid(inode)), inode, file->f_flags);
490
491         it = file->private_data; /* XXX: compat macro */
492         file->private_data = NULL; /* prevent ll_local_open assertion */
493
494         fd = ll_file_data_get();
495         if (fd == NULL)
496                 GOTO(out_openerr, rc = -ENOMEM);
497
498         fd->fd_file = file;
499         if (S_ISDIR(inode->i_mode))
500                 ll_authorize_statahead(inode, fd);
501
502         if (inode->i_sb->s_root == file_dentry(file)) {
503                 LUSTRE_FPRIVATE(file) = fd;
504                 RETURN(0);
505         }
506
507         if (!it || !it->it_disposition) {
508                 /* Convert f_flags into access mode. We cannot use file->f_mode,
509                  * because everything but O_ACCMODE mask was stripped from
510                  * there */
511                 if ((oit.it_flags + 1) & O_ACCMODE)
512                         oit.it_flags++;
513                 if (file->f_flags & O_TRUNC)
514                         oit.it_flags |= FMODE_WRITE;
515
516                 /* kernel only call f_op->open in dentry_open.  filp_open calls
517                  * dentry_open after call to open_namei that checks permissions.
518                  * Only nfsd_open call dentry_open directly without checking
519                  * permissions and because of that this code below is safe. */
520                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
521                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
522
523                 /* We do not want O_EXCL here, presumably we opened the file
524                  * already? XXX - NFS implications? */
525                 oit.it_flags &= ~O_EXCL;
526
527                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
528                  * created if necessary, then "IT_CREAT" should be set to keep
529                  * consistent with it */
530                 if (oit.it_flags & O_CREAT)
531                         oit.it_op |= IT_CREAT;
532
533                 it = &oit;
534         }
535
536 restart:
537         /* Let's see if we have file open on MDS already. */
538         if (it->it_flags & FMODE_WRITE) {
539                 och_p = &lli->lli_mds_write_och;
540                 och_usecount = &lli->lli_open_fd_write_count;
541         } else if (it->it_flags & FMODE_EXEC) {
542                 och_p = &lli->lli_mds_exec_och;
543                 och_usecount = &lli->lli_open_fd_exec_count;
544          } else {
545                 och_p = &lli->lli_mds_read_och;
546                 och_usecount = &lli->lli_open_fd_read_count;
547         }
548
549         mutex_lock(&lli->lli_och_mutex);
550         if (*och_p) { /* Open handle is present */
551                 if (it_disposition(it, DISP_OPEN_OPEN)) {
552                         /* Well, there's extra open request that we do not need,
553                            let's close it somehow. This will decref request. */
554                         rc = it_open_error(DISP_OPEN_OPEN, it);
555                         if (rc) {
556                                 mutex_unlock(&lli->lli_och_mutex);
557                                 GOTO(out_openerr, rc);
558                         }
559
560                         ll_release_openhandle(file_dentry(file), it);
561                 }
562                 (*och_usecount)++;
563
564                 rc = ll_local_open(file, it, fd, NULL);
565                 if (rc) {
566                         (*och_usecount)--;
567                         mutex_unlock(&lli->lli_och_mutex);
568                         GOTO(out_openerr, rc);
569                 }
570         } else {
571                 LASSERT(*och_usecount == 0);
572                 if (!it->it_disposition) {
573                         struct ll_dentry_data *ldd = ll_d2d(file->f_path.dentry);
574                         /* We cannot just request lock handle now, new ELC code
575                            means that one of other OPEN locks for this file
576                            could be cancelled, and since blocking ast handler
577                            would attempt to grab och_mutex as well, that would
578                            result in a deadlock */
579                         mutex_unlock(&lli->lli_och_mutex);
580                         /*
581                          * Normally called under two situations:
582                          * 1. NFS export.
583                          * 2. A race/condition on MDS resulting in no open
584                          *    handle to be returned from LOOKUP|OPEN request,
585                          *    for example if the target entry was a symlink.
586                          *
587                          *  Only fetch MDS_OPEN_LOCK if this is in NFS path,
588                          *  marked by a bit set in ll_iget_for_nfs. Clear the
589                          *  bit so that it's not confusing later callers.
590                          *
591                          *  NB; when ldd is NULL, it must have come via normal
592                          *  lookup path only, since ll_iget_for_nfs always calls
593                          *  ll_d_init().
594                          */
595                         if (ldd && ldd->lld_nfs_dentry) {
596                                 ldd->lld_nfs_dentry = 0;
597                                 it->it_flags |= MDS_OPEN_LOCK;
598                         }
599
600                          /*
601                          * Always specify MDS_OPEN_BY_FID because we don't want
602                          * to get file with different fid.
603                          */
604                         it->it_flags |= MDS_OPEN_BY_FID;
605                         rc = ll_intent_file_open(file_dentry(file), NULL, 0,
606                                                  it);
607                         if (rc)
608                                 GOTO(out_openerr, rc);
609
610                         goto restart;
611                 }
612                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
613                 if (!*och_p)
614                         GOTO(out_och_free, rc = -ENOMEM);
615
616                 (*och_usecount)++;
617
618                 /* md_intent_lock() didn't get a request ref if there was an
619                  * open error, so don't do cleanup on the request here
620                  * (bug 3430) */
621                 /* XXX (green): Should not we bail out on any error here, not
622                  * just open error? */
623                 rc = it_open_error(DISP_OPEN_OPEN, it);
624                 if (rc != 0)
625                         GOTO(out_och_free, rc);
626
627                 LASSERTF(it_disposition(it, DISP_ENQ_OPEN_REF),
628                          "inode %p: disposition %x, status %d\n", inode,
629                          it_disposition(it, ~0), it->it_status);
630
631                 rc = ll_local_open(file, it, fd, *och_p);
632                 if (rc)
633                         GOTO(out_och_free, rc);
634         }
635         mutex_unlock(&lli->lli_och_mutex);
636         fd = NULL;
637
638         /* Must do this outside lli_och_mutex lock to prevent deadlock where
639            different kind of OPEN lock for this same inode gets cancelled
640            by ldlm_cancel_lru */
641         if (!S_ISREG(inode->i_mode))
642                 GOTO(out_och_free, rc);
643
644         cl_lov_delay_create_clear(&file->f_flags);
645         GOTO(out_och_free, rc);
646
647 out_och_free:
648         if (rc) {
649                 if (och_p && *och_p) {
650                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
651                         *och_p = NULL; /* OBD_FREE writes some magic there */
652                         (*och_usecount)--;
653                 }
654                 mutex_unlock(&lli->lli_och_mutex);
655
656 out_openerr:
657                 if (lli->lli_opendir_key == fd)
658                         ll_deauthorize_statahead(inode, fd);
659                 if (fd != NULL)
660                         ll_file_data_put(fd);
661         } else {
662                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
663         }
664
665         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
666                 ptlrpc_req_finished(it->it_request);
667                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
668         }
669
670         return rc;
671 }
672
673 static int ll_md_blocking_lease_ast(struct ldlm_lock *lock,
674                         struct ldlm_lock_desc *desc, void *data, int flag)
675 {
676         int rc;
677         struct lustre_handle lockh;
678         ENTRY;
679
680         switch (flag) {
681         case LDLM_CB_BLOCKING:
682                 ldlm_lock2handle(lock, &lockh);
683                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
684                 if (rc < 0) {
685                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
686                         RETURN(rc);
687                 }
688                 break;
689         case LDLM_CB_CANCELING:
690                 /* do nothing */
691                 break;
692         }
693         RETURN(0);
694 }
695
696 /**
697  * When setting a lease on a file, we take ownership of the lli_mds_*_och
698  * and save it as fd->fd_och so as to force client to reopen the file even
699  * if it has an open lock in cache already.
700  */
701 static int ll_lease_och_acquire(struct inode *inode, struct file *file,
702                                 struct lustre_handle *old_handle)
703 {
704         struct ll_inode_info *lli = ll_i2info(inode);
705         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
706         struct obd_client_handle **och_p;
707         __u64 *och_usecount;
708         int rc = 0;
709         ENTRY;
710
711         /* Get the openhandle of the file */
712         mutex_lock(&lli->lli_och_mutex);
713         if (fd->fd_lease_och != NULL)
714                 GOTO(out_unlock, rc = -EBUSY);
715
716         if (fd->fd_och == NULL) {
717                 if (file->f_mode & FMODE_WRITE) {
718                         LASSERT(lli->lli_mds_write_och != NULL);
719                         och_p = &lli->lli_mds_write_och;
720                         och_usecount = &lli->lli_open_fd_write_count;
721                 } else {
722                         LASSERT(lli->lli_mds_read_och != NULL);
723                         och_p = &lli->lli_mds_read_och;
724                         och_usecount = &lli->lli_open_fd_read_count;
725                 }
726
727                 if (*och_usecount > 1)
728                         GOTO(out_unlock, rc = -EBUSY);
729
730                 fd->fd_och = *och_p;
731                 *och_usecount = 0;
732                 *och_p = NULL;
733         }
734
735         *old_handle = fd->fd_och->och_fh;
736
737         EXIT;
738 out_unlock:
739         mutex_unlock(&lli->lli_och_mutex);
740         return rc;
741 }
742
743 /**
744  * Release ownership on lli_mds_*_och when putting back a file lease.
745  */
746 static int ll_lease_och_release(struct inode *inode, struct file *file)
747 {
748         struct ll_inode_info *lli = ll_i2info(inode);
749         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
750         struct obd_client_handle **och_p;
751         struct obd_client_handle *old_och = NULL;
752         __u64 *och_usecount;
753         int rc = 0;
754         ENTRY;
755
756         mutex_lock(&lli->lli_och_mutex);
757         if (file->f_mode & FMODE_WRITE) {
758                 och_p = &lli->lli_mds_write_och;
759                 och_usecount = &lli->lli_open_fd_write_count;
760         } else {
761                 och_p = &lli->lli_mds_read_och;
762                 och_usecount = &lli->lli_open_fd_read_count;
763         }
764
765         /* The file may have been open by another process (broken lease) so
766          * *och_p is not NULL. In this case we should simply increase usecount
767          * and close fd_och.
768          */
769         if (*och_p != NULL) {
770                 old_och = fd->fd_och;
771                 (*och_usecount)++;
772         } else {
773                 *och_p = fd->fd_och;
774                 *och_usecount = 1;
775         }
776         fd->fd_och = NULL;
777         mutex_unlock(&lli->lli_och_mutex);
778
779         if (old_och != NULL)
780                 rc = ll_close_inode_openhandle(inode, old_och, 0, NULL);
781
782         RETURN(rc);
783 }
784
785 /**
786  * Acquire a lease and open the file.
787  */
788 static struct obd_client_handle *
789 ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode,
790               __u64 open_flags)
791 {
792         struct lookup_intent it = { .it_op = IT_OPEN };
793         struct ll_sb_info *sbi = ll_i2sbi(inode);
794         struct md_op_data *op_data;
795         struct ptlrpc_request *req = NULL;
796         struct lustre_handle old_handle = { 0 };
797         struct obd_client_handle *och = NULL;
798         int rc;
799         int rc2;
800         ENTRY;
801
802         if (fmode != FMODE_WRITE && fmode != FMODE_READ)
803                 RETURN(ERR_PTR(-EINVAL));
804
805         if (file != NULL) {
806                 if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC))
807                         RETURN(ERR_PTR(-EPERM));
808
809                 rc = ll_lease_och_acquire(inode, file, &old_handle);
810                 if (rc)
811                         RETURN(ERR_PTR(rc));
812         }
813
814         OBD_ALLOC_PTR(och);
815         if (och == NULL)
816                 RETURN(ERR_PTR(-ENOMEM));
817
818         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
819                                         LUSTRE_OPC_ANY, NULL);
820         if (IS_ERR(op_data))
821                 GOTO(out, rc = PTR_ERR(op_data));
822
823         /* To tell the MDT this openhandle is from the same owner */
824         op_data->op_handle = old_handle;
825
826         it.it_flags = fmode | open_flags;
827         it.it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
828         rc = md_intent_lock(sbi->ll_md_exp, op_data, &it, &req,
829                             &ll_md_blocking_lease_ast,
830         /* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise
831          * it can be cancelled which may mislead applications that the lease is
832          * broken;
833          * LDLM_FL_EXCL: Set this flag so that it won't be matched by normal
834          * open in ll_md_blocking_ast(). Otherwise as ll_md_blocking_lease_ast
835          * doesn't deal with openhandle, so normal openhandle will be leaked. */
836                             LDLM_FL_NO_LRU | LDLM_FL_EXCL);
837         ll_finish_md_op_data(op_data);
838         ptlrpc_req_finished(req);
839         if (rc < 0)
840                 GOTO(out_release_it, rc);
841
842         if (it_disposition(&it, DISP_LOOKUP_NEG))
843                 GOTO(out_release_it, rc = -ENOENT);
844
845         rc = it_open_error(DISP_OPEN_OPEN, &it);
846         if (rc)
847                 GOTO(out_release_it, rc);
848
849         LASSERT(it_disposition(&it, DISP_ENQ_OPEN_REF));
850         ll_och_fill(sbi->ll_md_exp, &it, och);
851
852         if (!it_disposition(&it, DISP_OPEN_LEASE)) /* old server? */
853                 GOTO(out_close, rc = -EOPNOTSUPP);
854
855         /* already get lease, handle lease lock */
856         ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
857         if (it.it_lock_mode == 0 ||
858             it.it_lock_bits != MDS_INODELOCK_OPEN) {
859                 /* open lock must return for lease */
860                 CERROR(DFID "lease granted but no open lock, %d/%llu.\n",
861                         PFID(ll_inode2fid(inode)), it.it_lock_mode,
862                         it.it_lock_bits);
863                 GOTO(out_close, rc = -EPROTO);
864         }
865
866         ll_intent_release(&it);
867         RETURN(och);
868
869 out_close:
870         /* Cancel open lock */
871         if (it.it_lock_mode != 0) {
872                 ldlm_lock_decref_and_cancel(&och->och_lease_handle,
873                                             it.it_lock_mode);
874                 it.it_lock_mode = 0;
875                 och->och_lease_handle.cookie = 0ULL;
876         }
877         rc2 = ll_close_inode_openhandle(inode, och, 0, NULL);
878         if (rc2 < 0)
879                 CERROR("%s: error closing file "DFID": %d\n",
880                        ll_get_fsname(inode->i_sb, NULL, 0),
881                        PFID(&ll_i2info(inode)->lli_fid), rc2);
882         och = NULL; /* och has been freed in ll_close_inode_openhandle() */
883 out_release_it:
884         ll_intent_release(&it);
885 out:
886         if (och != NULL)
887                 OBD_FREE_PTR(och);
888         RETURN(ERR_PTR(rc));
889 }
890
891 /**
892  * Check whether a layout swap can be done between two inodes.
893  *
894  * \param[in] inode1  First inode to check
895  * \param[in] inode2  Second inode to check
896  *
897  * \retval 0 on success, layout swap can be performed between both inodes
898  * \retval negative error code if requirements are not met
899  */
900 static int ll_check_swap_layouts_validity(struct inode *inode1,
901                                           struct inode *inode2)
902 {
903         if (!S_ISREG(inode1->i_mode) || !S_ISREG(inode2->i_mode))
904                 return -EINVAL;
905
906         if (inode_permission(inode1, MAY_WRITE) ||
907             inode_permission(inode2, MAY_WRITE))
908                 return -EPERM;
909
910         if (inode1->i_sb != inode2->i_sb)
911                 return -EXDEV;
912
913         return 0;
914 }
915
916 static int ll_swap_layouts_close(struct obd_client_handle *och,
917                                  struct inode *inode, struct inode *inode2)
918 {
919         const struct lu_fid     *fid1 = ll_inode2fid(inode);
920         const struct lu_fid     *fid2;
921         int                      rc;
922         ENTRY;
923
924         CDEBUG(D_INODE, "%s: biased close of file "DFID"\n",
925                ll_get_fsname(inode->i_sb, NULL, 0), PFID(fid1));
926
927         rc = ll_check_swap_layouts_validity(inode, inode2);
928         if (rc < 0)
929                 GOTO(out_free_och, rc);
930
931         /* We now know that inode2 is a lustre inode */
932         fid2 = ll_inode2fid(inode2);
933
934         rc = lu_fid_cmp(fid1, fid2);
935         if (rc == 0)
936                 GOTO(out_free_och, rc = -EINVAL);
937
938         /* Close the file and swap layouts between inode & inode2.
939          * NB: lease lock handle is released in mdc_close_layout_swap_pack()
940          * because we still need it to pack l_remote_handle to MDT. */
941         rc = ll_close_inode_openhandle(inode, och, MDS_CLOSE_LAYOUT_SWAP,
942                                        inode2);
943
944         och = NULL; /* freed in ll_close_inode_openhandle() */
945
946 out_free_och:
947         if (och != NULL)
948                 OBD_FREE_PTR(och);
949
950         RETURN(rc);
951 }
952
953 /**
954  * Release lease and close the file.
955  * It will check if the lease has ever broken.
956  */
957 static int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
958                           bool *lease_broken)
959 {
960         struct ldlm_lock *lock;
961         bool cancelled = true;
962         int rc;
963         ENTRY;
964
965         lock = ldlm_handle2lock(&och->och_lease_handle);
966         if (lock != NULL) {
967                 lock_res_and_lock(lock);
968                 cancelled = ldlm_is_cancel(lock);
969                 unlock_res_and_lock(lock);
970                 LDLM_LOCK_PUT(lock);
971         }
972
973         CDEBUG(D_INODE, "lease for "DFID" broken? %d\n",
974                PFID(&ll_i2info(inode)->lli_fid), cancelled);
975
976         if (!cancelled)
977                 ldlm_cli_cancel(&och->och_lease_handle, 0);
978
979         if (lease_broken != NULL)
980                 *lease_broken = cancelled;
981
982         rc = ll_close_inode_openhandle(inode, och, 0, NULL);
983         RETURN(rc);
984 }
985
986 int ll_merge_attr(const struct lu_env *env, struct inode *inode)
987 {
988         struct ll_inode_info *lli = ll_i2info(inode);
989         struct cl_object *obj = lli->lli_clob;
990         struct cl_attr *attr = vvp_env_thread_attr(env);
991         s64 atime;
992         s64 mtime;
993         s64 ctime;
994         int rc = 0;
995
996         ENTRY;
997
998         ll_inode_size_lock(inode);
999
1000         /* Merge timestamps the most recently obtained from MDS with
1001          * timestamps obtained from OSTs.
1002          *
1003          * Do not overwrite atime of inode because it may be refreshed
1004          * by file_accessed() function. If the read was served by cache
1005          * data, there is no RPC to be sent so that atime may not be
1006          * transferred to OSTs at all. MDT only updates atime at close time
1007          * if it's at least 'mdd.*.atime_diff' older.
1008          * All in all, the atime in Lustre does not strictly comply with
1009          * POSIX. Solving this problem needs to send an RPC to MDT for each
1010          * read, this will hurt performance. */
1011         if (LTIME_S(inode->i_atime) < lli->lli_atime || lli->lli_update_atime) {
1012                 LTIME_S(inode->i_atime) = lli->lli_atime;
1013                 lli->lli_update_atime = 0;
1014         }
1015         LTIME_S(inode->i_mtime) = lli->lli_mtime;
1016         LTIME_S(inode->i_ctime) = lli->lli_ctime;
1017
1018         atime = LTIME_S(inode->i_atime);
1019         mtime = LTIME_S(inode->i_mtime);
1020         ctime = LTIME_S(inode->i_ctime);
1021
1022         cl_object_attr_lock(obj);
1023         rc = cl_object_attr_get(env, obj, attr);
1024         cl_object_attr_unlock(obj);
1025
1026         if (rc != 0)
1027                 GOTO(out_size_unlock, rc);
1028
1029         if (atime < attr->cat_atime)
1030                 atime = attr->cat_atime;
1031
1032         if (ctime < attr->cat_ctime)
1033                 ctime = attr->cat_ctime;
1034
1035         if (mtime < attr->cat_mtime)
1036                 mtime = attr->cat_mtime;
1037
1038         CDEBUG(D_VFSTRACE, DFID" updating i_size %llu\n",
1039                PFID(&lli->lli_fid), attr->cat_size);
1040
1041         i_size_write(inode, attr->cat_size);
1042         inode->i_blocks = attr->cat_blocks;
1043
1044         LTIME_S(inode->i_atime) = atime;
1045         LTIME_S(inode->i_mtime) = mtime;
1046         LTIME_S(inode->i_ctime) = ctime;
1047
1048 out_size_unlock:
1049         ll_inode_size_unlock(inode);
1050
1051         RETURN(rc);
1052 }
1053
1054 static bool file_is_noatime(const struct file *file)
1055 {
1056         const struct vfsmount *mnt = file->f_path.mnt;
1057         const struct inode *inode = file_inode((struct file *)file);
1058
1059         /* Adapted from file_accessed() and touch_atime().*/
1060         if (file->f_flags & O_NOATIME)
1061                 return true;
1062
1063         if (inode->i_flags & S_NOATIME)
1064                 return true;
1065
1066         if (IS_NOATIME(inode))
1067                 return true;
1068
1069         if (mnt->mnt_flags & (MNT_NOATIME | MNT_READONLY))
1070                 return true;
1071
1072         if ((mnt->mnt_flags & MNT_NODIRATIME) && S_ISDIR(inode->i_mode))
1073                 return true;
1074
1075         if ((inode->i_sb->s_flags & MS_NODIRATIME) && S_ISDIR(inode->i_mode))
1076                 return true;
1077
1078         return false;
1079 }
1080
1081 static int ll_file_io_ptask(struct cfs_ptask *ptask);
1082
1083 static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
1084 {
1085         struct inode *inode = file_inode(file);
1086
1087         memset(&io->u.ci_rw.rw_iter, 0, sizeof(io->u.ci_rw.rw_iter));
1088         init_sync_kiocb(&io->u.ci_rw.rw_iocb, file);
1089         io->u.ci_rw.rw_file = file;
1090         io->u.ci_rw.rw_ptask = ll_file_io_ptask;
1091         io->u.ci_rw.rw_nonblock = !!(file->f_flags & O_NONBLOCK);
1092         if (iot == CIT_WRITE) {
1093                 io->u.ci_rw.rw_append = !!(file->f_flags & O_APPEND);
1094                 io->u.ci_rw.rw_sync   = !!(file->f_flags & O_SYNC ||
1095                                            file->f_flags & O_DIRECT ||
1096                                            IS_SYNC(inode));
1097         }
1098         io->ci_obj = ll_i2info(inode)->lli_clob;
1099         io->ci_lockreq = CILR_MAYBE;
1100         if (ll_file_nolock(file)) {
1101                 io->ci_lockreq = CILR_NEVER;
1102                 io->ci_no_srvlock = 1;
1103         } else if (file->f_flags & O_APPEND) {
1104                 io->ci_lockreq = CILR_MANDATORY;
1105         }
1106         io->ci_noatime = file_is_noatime(file);
1107         if (ll_i2sbi(inode)->ll_flags & LL_SBI_PIO)
1108                 io->ci_pio = !io->u.ci_rw.rw_append;
1109         else
1110                 io->ci_pio = 0;
1111 }
1112
1113 static int ll_file_io_ptask(struct cfs_ptask *ptask)
1114 {
1115         struct cl_io_pt *pt = ptask->pt_cbdata;
1116         struct file *file = pt->cip_file;
1117         struct lu_env *env;
1118         struct cl_io *io;
1119         loff_t pos = pt->cip_pos;
1120         int rc;
1121         __u16 refcheck;
1122         ENTRY;
1123
1124         env = cl_env_get(&refcheck);
1125         if (IS_ERR(env))
1126                 RETURN(PTR_ERR(env));
1127
1128         CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
1129                 file_dentry(file)->d_name.name,
1130                 pt->cip_iot == CIT_READ ? "read" : "write",
1131                 pos, pos + pt->cip_count);
1132
1133 restart:
1134         io = vvp_env_thread_io(env);
1135         ll_io_init(io, file, pt->cip_iot);
1136         io->u.ci_rw.rw_iter = pt->cip_iter;
1137         io->u.ci_rw.rw_iocb = pt->cip_iocb;
1138         io->ci_pio = 0; /* It's already in parallel task */
1139
1140         rc = cl_io_rw_init(env, io, pt->cip_iot, pos,
1141                            pt->cip_count - pt->cip_result);
1142         if (!rc) {
1143                 struct vvp_io *vio = vvp_env_io(env);
1144
1145                 vio->vui_io_subtype = IO_NORMAL;
1146                 vio->vui_fd = LUSTRE_FPRIVATE(file);
1147
1148                 ll_cl_add(file, env, io, LCC_RW);
1149                 rc = cl_io_loop(env, io);
1150                 ll_cl_remove(file, env);
1151         } else {
1152                 /* cl_io_rw_init() handled IO */
1153                 rc = io->ci_result;
1154         }
1155
1156         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_PTASK_IO_FAIL, 0)) {
1157                 if (io->ci_nob > 0)
1158                         io->ci_nob /= 2;
1159                 rc = -EIO;
1160         }
1161
1162         if (io->ci_nob > 0) {
1163                 pt->cip_result += io->ci_nob;
1164                 iov_iter_advance(&pt->cip_iter, io->ci_nob);
1165                 pos += io->ci_nob;
1166                 pt->cip_iocb.ki_pos = pos;
1167 #ifdef HAVE_KIOCB_KI_LEFT
1168                 pt->cip_iocb.ki_left = pt->cip_count - pt->cip_result;
1169 #elif defined(HAVE_KI_NBYTES)
1170                 pt->cip_iocb.ki_nbytes = pt->cip_count - pt->cip_result;
1171 #endif
1172         }
1173
1174         cl_io_fini(env, io);
1175
1176         if ((rc == 0 || rc == -ENODATA) &&
1177             pt->cip_result < pt->cip_count &&
1178             io->ci_need_restart) {
1179                 CDEBUG(D_VFSTRACE,
1180                         "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
1181                         file_dentry(file)->d_name.name,
1182                         pt->cip_iot == CIT_READ ? "read" : "write",
1183                         pos, pos + pt->cip_count - pt->cip_result,
1184                         pt->cip_result, rc);
1185                 goto restart;
1186         }
1187
1188         CDEBUG(D_VFSTRACE, "%s: %s ret: %zd, rc: %d\n",
1189                 file_dentry(file)->d_name.name,
1190                 pt->cip_iot == CIT_READ ? "read" : "write",
1191                 pt->cip_result, rc);
1192
1193         cl_env_put(env, &refcheck);
1194         RETURN(pt->cip_result > 0 ? 0 : rc);
1195 }
1196
1197 static ssize_t
1198 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
1199                    struct file *file, enum cl_io_type iot,
1200                    loff_t *ppos, size_t count)
1201 {
1202         struct range_lock       range;
1203         struct vvp_io           *vio = vvp_env_io(env);
1204         struct inode            *inode = file_inode(file);
1205         struct ll_inode_info    *lli = ll_i2info(inode);
1206         struct ll_file_data     *fd  = LUSTRE_FPRIVATE(file);
1207         struct cl_io            *io;
1208         loff_t                  pos = *ppos;
1209         ssize_t                 result = 0;
1210         int                     rc = 0;
1211
1212         ENTRY;
1213
1214         CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
1215                 file_dentry(file)->d_name.name,
1216                 iot == CIT_READ ? "read" : "write", pos, pos + count);
1217
1218 restart:
1219         io = vvp_env_thread_io(env);
1220         ll_io_init(io, file, iot);
1221         if (args->via_io_subtype == IO_NORMAL) {
1222                 io->u.ci_rw.rw_iter = *args->u.normal.via_iter;
1223                 io->u.ci_rw.rw_iocb = *args->u.normal.via_iocb;
1224         } else {
1225                 io->ci_pio = 0;
1226         }
1227
1228         if (cl_io_rw_init(env, io, iot, pos, count) == 0) {
1229                 bool range_locked = false;
1230
1231                 if (file->f_flags & O_APPEND)
1232                         range_lock_init(&range, 0, LUSTRE_EOF);
1233                 else
1234                         range_lock_init(&range, pos, pos + count - 1);
1235
1236                 vio->vui_fd  = LUSTRE_FPRIVATE(file);
1237                 vio->vui_io_subtype = args->via_io_subtype;
1238
1239                 switch (vio->vui_io_subtype) {
1240                 case IO_NORMAL:
1241                         /* Direct IO reads must also take range lock,
1242                          * or multiple reads will try to work on the same pages
1243                          * See LU-6227 for details. */
1244                         if (((iot == CIT_WRITE) ||
1245                             (iot == CIT_READ && (file->f_flags & O_DIRECT))) &&
1246                             !(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1247                                 CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
1248                                        RL_PARA(&range));
1249                                 rc = range_lock(&lli->lli_write_tree, &range);
1250                                 if (rc < 0)
1251                                         GOTO(out, rc);
1252
1253                                 range_locked = true;
1254                         }
1255                         break;
1256                 case IO_SPLICE:
1257                         vio->u.splice.vui_pipe = args->u.splice.via_pipe;
1258                         vio->u.splice.vui_flags = args->u.splice.via_flags;
1259                         break;
1260                 default:
1261                         CERROR("unknown IO subtype %u\n", vio->vui_io_subtype);
1262                         LBUG();
1263                 }
1264
1265                 ll_cl_add(file, env, io, LCC_RW);
1266                 if (io->ci_pio && iot == CIT_WRITE && !IS_NOSEC(inode) &&
1267                     !lli->lli_inode_locked) {
1268                         inode_lock(inode);
1269                         lli->lli_inode_locked = 1;
1270                 }
1271                 rc = cl_io_loop(env, io);
1272                 if (lli->lli_inode_locked) {
1273                         lli->lli_inode_locked = 0;
1274                         inode_unlock(inode);
1275                 }
1276                 ll_cl_remove(file, env);
1277
1278                 if (range_locked) {
1279                         CDEBUG(D_VFSTRACE, "Range unlock "RL_FMT"\n",
1280                                RL_PARA(&range));
1281                         range_unlock(&lli->lli_write_tree, &range);
1282                 }
1283         } else {
1284                 /* cl_io_rw_init() handled IO */
1285                 rc = io->ci_result;
1286         }
1287
1288         if (io->ci_nob > 0) {
1289                 result += io->ci_nob;
1290                 count  -= io->ci_nob;
1291
1292                 if (args->via_io_subtype == IO_NORMAL) {
1293                         iov_iter_advance(args->u.normal.via_iter, io->ci_nob);
1294                         pos += io->ci_nob;
1295                         args->u.normal.via_iocb->ki_pos = pos;
1296 #ifdef HAVE_KIOCB_KI_LEFT
1297                         args->u.normal.via_iocb->ki_left = count;
1298 #elif defined(HAVE_KI_NBYTES)
1299                         args->u.normal.via_iocb->ki_nbytes = count;
1300 #endif
1301                 } else {
1302                         /* for splice */
1303                         pos = io->u.ci_rw.rw_range.cir_pos;
1304                 }
1305         }
1306 out:
1307         cl_io_fini(env, io);
1308
1309         if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) {
1310                 CDEBUG(D_VFSTRACE,
1311                         "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
1312                         file_dentry(file)->d_name.name,
1313                         iot == CIT_READ ? "read" : "write",
1314                         pos, pos + count, result, rc);
1315                 goto restart;
1316         }
1317
1318         if (iot == CIT_READ) {
1319                 if (result > 0)
1320                         ll_stats_ops_tally(ll_i2sbi(inode),
1321                                            LPROC_LL_READ_BYTES, result);
1322         } else if (iot == CIT_WRITE) {
1323                 if (result > 0) {
1324                         ll_stats_ops_tally(ll_i2sbi(inode),
1325                                            LPROC_LL_WRITE_BYTES, result);
1326                         fd->fd_write_failed = false;
1327                 } else if (result == 0 && rc == 0) {
1328                         rc = io->ci_result;
1329                         if (rc < 0)
1330                                 fd->fd_write_failed = true;
1331                         else
1332                                 fd->fd_write_failed = false;
1333                 } else if (rc != -ERESTARTSYS) {
1334                         fd->fd_write_failed = true;
1335                 }
1336         }
1337
1338         CDEBUG(D_VFSTRACE, "%s: %s *ppos: %llu, pos: %llu, ret: %zd, rc: %d\n",
1339                 file_dentry(file)->d_name.name,
1340                 iot == CIT_READ ? "read" : "write", *ppos, pos, result, rc);
1341
1342         *ppos = pos;
1343
1344         RETURN(result > 0 ? result : rc);
1345 }
1346
1347 /**
1348  * The purpose of fast read is to overcome per I/O overhead and improve IOPS
1349  * especially for small I/O.
1350  *
1351  * To serve a read request, CLIO has to create and initialize a cl_io and
1352  * then request DLM lock. This has turned out to have siginificant overhead
1353  * and affects the performance of small I/O dramatically.
1354  *
1355  * It's not necessary to create a cl_io for each I/O. Under the help of read
1356  * ahead, most of the pages being read are already in memory cache and we can
1357  * read those pages directly because if the pages exist, the corresponding DLM
1358  * lock must exist so that page content must be valid.
1359  *
1360  * In fast read implementation, the llite speculatively finds and reads pages
1361  * in memory cache. There are three scenarios for fast read:
1362  *   - If the page exists and is uptodate, kernel VM will provide the data and
1363  *     CLIO won't be intervened;
1364  *   - If the page was brought into memory by read ahead, it will be exported
1365  *     and read ahead parameters will be updated;
1366  *   - Otherwise the page is not in memory, we can't do fast read. Therefore,
1367  *     it will go back and invoke normal read, i.e., a cl_io will be created
1368  *     and DLM lock will be requested.
1369  *
1370  * POSIX compliance: posix standard states that read is intended to be atomic.
1371  * Lustre read implementation is in line with Linux kernel read implementation
1372  * and neither of them complies with POSIX standard in this matter. Fast read
1373  * doesn't make the situation worse on single node but it may interleave write
1374  * results from multiple nodes due to short read handling in ll_file_aio_read().
1375  *
1376  * \param env - lu_env
1377  * \param iocb - kiocb from kernel
1378  * \param iter - user space buffers where the data will be copied
1379  *
1380  * \retval - number of bytes have been read, or error code if error occurred.
1381  */
1382 static ssize_t
1383 ll_do_fast_read(struct kiocb *iocb, struct iov_iter *iter)
1384 {
1385         ssize_t result;
1386
1387         if (!ll_sbi_has_fast_read(ll_i2sbi(file_inode(iocb->ki_filp))))
1388                 return 0;
1389
1390         /* NB: we can't do direct IO for fast read because it will need a lock
1391          * to make IO engine happy. */
1392         if (iocb->ki_filp->f_flags & O_DIRECT)
1393                 return 0;
1394
1395         result = generic_file_read_iter(iocb, iter);
1396
1397         /* If the first page is not in cache, generic_file_aio_read() will be
1398          * returned with -ENODATA.
1399          * See corresponding code in ll_readpage(). */
1400         if (result == -ENODATA)
1401                 result = 0;
1402
1403         if (result > 0)
1404                 ll_stats_ops_tally(ll_i2sbi(file_inode(iocb->ki_filp)),
1405                                 LPROC_LL_READ_BYTES, result);
1406
1407         return result;
1408 }
1409
1410 /*
1411  * Read from a file (through the page cache).
1412  */
1413 static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to)
1414 {
1415         struct lu_env *env;
1416         struct vvp_io_args *args;
1417         ssize_t result;
1418         ssize_t rc2;
1419         __u16 refcheck;
1420
1421         result = ll_do_fast_read(iocb, to);
1422         if (result < 0 || iov_iter_count(to) == 0)
1423                 GOTO(out, result);
1424
1425         env = cl_env_get(&refcheck);
1426         if (IS_ERR(env))
1427                 return PTR_ERR(env);
1428
1429         args = ll_env_args(env, IO_NORMAL);
1430         args->u.normal.via_iter = to;
1431         args->u.normal.via_iocb = iocb;
1432
1433         rc2 = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1434                                  &iocb->ki_pos, iov_iter_count(to));
1435         if (rc2 > 0)
1436                 result += rc2;
1437         else if (result == 0)
1438                 result = rc2;
1439
1440         cl_env_put(env, &refcheck);
1441 out:
1442         return result;
1443 }
1444
1445 /*
1446  * Write to a file (through the page cache).
1447  */
1448 static ssize_t ll_file_write_iter(struct kiocb *iocb, struct iov_iter *from)
1449 {
1450         struct vvp_io_args *args;
1451         struct lu_env *env;
1452         ssize_t result;
1453         __u16 refcheck;
1454
1455         env = cl_env_get(&refcheck);
1456         if (IS_ERR(env))
1457                 return PTR_ERR(env);
1458
1459         args = ll_env_args(env, IO_NORMAL);
1460         args->u.normal.via_iter = from;
1461         args->u.normal.via_iocb = iocb;
1462
1463         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1464                                     &iocb->ki_pos, iov_iter_count(from));
1465         cl_env_put(env, &refcheck);
1466         return result;
1467 }
1468
1469 #ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1470 /*
1471  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
1472  */
1473 static int ll_file_get_iov_count(const struct iovec *iov,
1474                                  unsigned long *nr_segs, size_t *count)
1475 {
1476         size_t cnt = 0;
1477         unsigned long seg;
1478
1479         for (seg = 0; seg < *nr_segs; seg++) {
1480                 const struct iovec *iv = &iov[seg];
1481
1482                 /*
1483                  * If any segment has a negative length, or the cumulative
1484                  * length ever wraps negative then return -EINVAL.
1485                  */
1486                 cnt += iv->iov_len;
1487                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
1488                         return -EINVAL;
1489                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
1490                         continue;
1491                 if (seg == 0)
1492                         return -EFAULT;
1493                 *nr_segs = seg;
1494                 cnt -= iv->iov_len;     /* This segment is no good */
1495                 break;
1496         }
1497         *count = cnt;
1498         return 0;
1499 }
1500
1501 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1502                                 unsigned long nr_segs, loff_t pos)
1503 {
1504         struct iov_iter to;
1505         size_t iov_count;
1506         ssize_t result;
1507         ENTRY;
1508
1509         result = ll_file_get_iov_count(iov, &nr_segs, &iov_count);
1510         if (result)
1511                 RETURN(result);
1512
1513 # ifdef HAVE_IOV_ITER_INIT_DIRECTION
1514         iov_iter_init(&to, READ, iov, nr_segs, iov_count);
1515 # else /* !HAVE_IOV_ITER_INIT_DIRECTION */
1516         iov_iter_init(&to, iov, nr_segs, iov_count, 0);
1517 # endif /* HAVE_IOV_ITER_INIT_DIRECTION */
1518
1519         result = ll_file_read_iter(iocb, &to);
1520
1521         RETURN(result);
1522 }
1523
1524 static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
1525                             loff_t *ppos)
1526 {
1527         struct iovec   iov = { .iov_base = buf, .iov_len = count };
1528         struct kiocb   kiocb;
1529         ssize_t        result;
1530         ENTRY;
1531
1532         init_sync_kiocb(&kiocb, file);
1533         kiocb.ki_pos = *ppos;
1534 #ifdef HAVE_KIOCB_KI_LEFT
1535         kiocb.ki_left = count;
1536 #elif defined(HAVE_KI_NBYTES)
1537         kiocb.i_nbytes = count;
1538 #endif
1539
1540         result = ll_file_aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
1541         *ppos = kiocb.ki_pos;
1542
1543         RETURN(result);
1544 }
1545
1546 /*
1547  * Write to a file (through the page cache).
1548  * AIO stuff
1549  */
1550 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1551                                  unsigned long nr_segs, loff_t pos)
1552 {
1553         struct iov_iter from;
1554         size_t iov_count;
1555         ssize_t result;
1556         ENTRY;
1557
1558         result = ll_file_get_iov_count(iov, &nr_segs, &iov_count);
1559         if (result)
1560                 RETURN(result);
1561
1562 # ifdef HAVE_IOV_ITER_INIT_DIRECTION
1563         iov_iter_init(&from, WRITE, iov, nr_segs, iov_count);
1564 # else /* !HAVE_IOV_ITER_INIT_DIRECTION */
1565         iov_iter_init(&from, iov, nr_segs, iov_count, 0);
1566 # endif /* HAVE_IOV_ITER_INIT_DIRECTION */
1567
1568         result = ll_file_write_iter(iocb, &from);
1569
1570         RETURN(result);
1571 }
1572
1573 static ssize_t ll_file_write(struct file *file, const char __user *buf,
1574                              size_t count, loff_t *ppos)
1575 {
1576         struct lu_env *env;
1577         struct iovec   iov = { .iov_base = (void __user *)buf,
1578                                .iov_len = count };
1579         struct kiocb  *kiocb;
1580         ssize_t        result;
1581         __u16          refcheck;
1582         ENTRY;
1583
1584         env = cl_env_get(&refcheck);
1585         if (IS_ERR(env))
1586                 RETURN(PTR_ERR(env));
1587
1588         kiocb = &ll_env_info(env)->lti_kiocb;
1589         init_sync_kiocb(kiocb, file);
1590         kiocb->ki_pos = *ppos;
1591 #ifdef HAVE_KIOCB_KI_LEFT
1592         kiocb->ki_left = count;
1593 #elif defined(HAVE_KI_NBYTES)
1594         kiocb->ki_nbytes = count;
1595 #endif
1596
1597         result = ll_file_aio_write(kiocb, &iov, 1, kiocb->ki_pos);
1598         *ppos = kiocb->ki_pos;
1599
1600         cl_env_put(env, &refcheck);
1601         RETURN(result);
1602 }
1603 #endif /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
1604
1605 /*
1606  * Send file content (through pagecache) somewhere with helper
1607  */
1608 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1609                                    struct pipe_inode_info *pipe, size_t count,
1610                                    unsigned int flags)
1611 {
1612         struct lu_env      *env;
1613         struct vvp_io_args *args;
1614         ssize_t             result;
1615         __u16               refcheck;
1616         ENTRY;
1617
1618         env = cl_env_get(&refcheck);
1619         if (IS_ERR(env))
1620                 RETURN(PTR_ERR(env));
1621
1622         args = ll_env_args(env, IO_SPLICE);
1623         args->u.splice.via_pipe = pipe;
1624         args->u.splice.via_flags = flags;
1625
1626         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1627         cl_env_put(env, &refcheck);
1628         RETURN(result);
1629 }
1630
1631 int ll_lov_setstripe_ea_info(struct inode *inode, struct dentry *dentry,
1632                              __u64 flags, struct lov_user_md *lum, int lum_size)
1633 {
1634         struct lookup_intent oit = {
1635                 .it_op = IT_OPEN,
1636                 .it_flags = flags | MDS_OPEN_BY_FID,
1637         };
1638         int rc;
1639         ENTRY;
1640
1641         ll_inode_size_lock(inode);
1642         rc = ll_intent_file_open(dentry, lum, lum_size, &oit);
1643         if (rc < 0)
1644                 GOTO(out_unlock, rc);
1645
1646         ll_release_openhandle(dentry, &oit);
1647
1648 out_unlock:
1649         ll_inode_size_unlock(inode);
1650         ll_intent_release(&oit);
1651
1652         RETURN(rc);
1653 }
1654
1655 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1656                              struct lov_mds_md **lmmp, int *lmm_size,
1657                              struct ptlrpc_request **request)
1658 {
1659         struct ll_sb_info *sbi = ll_i2sbi(inode);
1660         struct mdt_body  *body;
1661         struct lov_mds_md *lmm = NULL;
1662         struct ptlrpc_request *req = NULL;
1663         struct md_op_data *op_data;
1664         int rc, lmmsize;
1665
1666         rc = ll_get_default_mdsize(sbi, &lmmsize);
1667         if (rc)
1668                 RETURN(rc);
1669
1670         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1671                                      strlen(filename), lmmsize,
1672                                      LUSTRE_OPC_ANY, NULL);
1673         if (IS_ERR(op_data))
1674                 RETURN(PTR_ERR(op_data));
1675
1676         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1677         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1678         ll_finish_md_op_data(op_data);
1679         if (rc < 0) {
1680                 CDEBUG(D_INFO, "md_getattr_name failed "
1681                        "on %s: rc %d\n", filename, rc);
1682                 GOTO(out, rc);
1683         }
1684
1685         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1686         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1687
1688         lmmsize = body->mbo_eadatasize;
1689
1690         if (!(body->mbo_valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1691                         lmmsize == 0) {
1692                 GOTO(out, rc = -ENODATA);
1693         }
1694
1695         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1696         LASSERT(lmm != NULL);
1697
1698         if (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1) &&
1699             lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3) &&
1700             lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_COMP_V1))
1701                 GOTO(out, rc = -EPROTO);
1702
1703         /*
1704          * This is coming from the MDS, so is probably in
1705          * little endian.  We convert it to host endian before
1706          * passing it to userspace.
1707          */
1708         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1709                 int stripe_count;
1710
1711                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1) ||
1712                     lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1713                         stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
1714                         if (le32_to_cpu(lmm->lmm_pattern) &
1715                             LOV_PATTERN_F_RELEASED)
1716                                 stripe_count = 0;
1717                 }
1718
1719                 /* if function called for directory - we should
1720                  * avoid swab not existent lsm objects */
1721                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1722                         lustre_swab_lov_user_md_v1(
1723                                         (struct lov_user_md_v1 *)lmm);
1724                         if (S_ISREG(body->mbo_mode))
1725                                 lustre_swab_lov_user_md_objects(
1726                                     ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1727                                     stripe_count);
1728                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1729                         lustre_swab_lov_user_md_v3(
1730                                         (struct lov_user_md_v3 *)lmm);
1731                         if (S_ISREG(body->mbo_mode))
1732                                 lustre_swab_lov_user_md_objects(
1733                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1734                                  stripe_count);
1735                 } else if (lmm->lmm_magic ==
1736                            cpu_to_le32(LOV_MAGIC_COMP_V1)) {
1737                         lustre_swab_lov_comp_md_v1(
1738                                         (struct lov_comp_md_v1 *)lmm);
1739                 }
1740         }
1741
1742 out:
1743         *lmmp = lmm;
1744         *lmm_size = lmmsize;
1745         *request = req;
1746         return rc;
1747 }
1748
1749 static int ll_lov_setea(struct inode *inode, struct file *file,
1750                         void __user *arg)
1751 {
1752         __u64                    flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1753         struct lov_user_md      *lump;
1754         int                      lum_size = sizeof(struct lov_user_md) +
1755                                             sizeof(struct lov_user_ost_data);
1756         int                      rc;
1757         ENTRY;
1758
1759         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1760                 RETURN(-EPERM);
1761
1762         OBD_ALLOC_LARGE(lump, lum_size);
1763         if (lump == NULL)
1764                 RETURN(-ENOMEM);
1765
1766         if (copy_from_user(lump, arg, lum_size))
1767                 GOTO(out_lump, rc = -EFAULT);
1768
1769         rc = ll_lov_setstripe_ea_info(inode, file_dentry(file), flags, lump,
1770                                       lum_size);
1771         cl_lov_delay_create_clear(&file->f_flags);
1772
1773 out_lump:
1774         OBD_FREE_LARGE(lump, lum_size);
1775         RETURN(rc);
1776 }
1777
1778 static int ll_file_getstripe(struct inode *inode, void __user *lum, size_t size)
1779 {
1780         struct lu_env   *env;
1781         __u16           refcheck;
1782         int             rc;
1783         ENTRY;
1784
1785         env = cl_env_get(&refcheck);
1786         if (IS_ERR(env))
1787                 RETURN(PTR_ERR(env));
1788
1789         rc = cl_object_getstripe(env, ll_i2info(inode)->lli_clob, lum, size);
1790         cl_env_put(env, &refcheck);
1791         RETURN(rc);
1792 }
1793
1794 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1795                             void __user *arg)
1796 {
1797         struct lov_user_md __user *lum = (struct lov_user_md __user *)arg;
1798         struct lov_user_md        *klum;
1799         int                        lum_size, rc;
1800         __u64                      flags = FMODE_WRITE;
1801         ENTRY;
1802
1803         rc = ll_copy_user_md(lum, &klum);
1804         if (rc < 0)
1805                 RETURN(rc);
1806
1807         lum_size = rc;
1808         rc = ll_lov_setstripe_ea_info(inode, file_dentry(file), flags, klum,
1809                                       lum_size);
1810         if (!rc) {
1811                 __u32 gen;
1812
1813                 rc = put_user(0, &lum->lmm_stripe_count);
1814                 if (rc)
1815                         GOTO(out, rc);
1816
1817                 rc = ll_layout_refresh(inode, &gen);
1818                 if (rc)
1819                         GOTO(out, rc);
1820
1821                 rc = ll_file_getstripe(inode, arg, lum_size);
1822         }
1823         cl_lov_delay_create_clear(&file->f_flags);
1824
1825 out:
1826         OBD_FREE(klum, lum_size);
1827         RETURN(rc);
1828 }
1829
1830 static int
1831 ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1832 {
1833         struct ll_inode_info *lli = ll_i2info(inode);
1834         struct cl_object *obj = lli->lli_clob;
1835         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1836         struct ll_grouplock grouplock;
1837         int rc;
1838         ENTRY;
1839
1840         if (arg == 0) {
1841                 CWARN("group id for group lock must not be 0\n");
1842                 RETURN(-EINVAL);
1843         }
1844
1845         if (ll_file_nolock(file))
1846                 RETURN(-EOPNOTSUPP);
1847
1848         spin_lock(&lli->lli_lock);
1849         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1850                 CWARN("group lock already existed with gid %lu\n",
1851                       fd->fd_grouplock.lg_gid);
1852                 spin_unlock(&lli->lli_lock);
1853                 RETURN(-EINVAL);
1854         }
1855         LASSERT(fd->fd_grouplock.lg_lock == NULL);
1856         spin_unlock(&lli->lli_lock);
1857
1858         /**
1859          * XXX: group lock needs to protect all OST objects while PFL
1860          * can add new OST objects during the IO, so we'd instantiate
1861          * all OST objects before getting its group lock.
1862          */
1863         if (obj) {
1864                 struct lu_env *env;
1865                 __u16 refcheck;
1866                 struct cl_layout cl = {
1867                         .cl_is_composite = false,
1868                 };
1869
1870                 env = cl_env_get(&refcheck);
1871                 if (IS_ERR(env))
1872                         RETURN(PTR_ERR(env));
1873
1874                 rc = cl_object_layout_get(env, obj, &cl);
1875                 if (!rc && cl.cl_is_composite)
1876                         rc = ll_layout_write_intent(inode, 0, OBD_OBJECT_EOF);
1877
1878                 cl_env_put(env, &refcheck);
1879                 if (rc)
1880                         RETURN(rc);
1881         }
1882
1883         rc = cl_get_grouplock(ll_i2info(inode)->lli_clob,
1884                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1885         if (rc)
1886                 RETURN(rc);
1887
1888         spin_lock(&lli->lli_lock);
1889         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1890                 spin_unlock(&lli->lli_lock);
1891                 CERROR("another thread just won the race\n");
1892                 cl_put_grouplock(&grouplock);
1893                 RETURN(-EINVAL);
1894         }
1895
1896         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1897         fd->fd_grouplock = grouplock;
1898         spin_unlock(&lli->lli_lock);
1899
1900         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1901         RETURN(0);
1902 }
1903
1904 static int ll_put_grouplock(struct inode *inode, struct file *file,
1905                             unsigned long arg)
1906 {
1907         struct ll_inode_info   *lli = ll_i2info(inode);
1908         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1909         struct ll_grouplock     grouplock;
1910         ENTRY;
1911
1912         spin_lock(&lli->lli_lock);
1913         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1914                 spin_unlock(&lli->lli_lock);
1915                 CWARN("no group lock held\n");
1916                 RETURN(-EINVAL);
1917         }
1918
1919         LASSERT(fd->fd_grouplock.lg_lock != NULL);
1920
1921         if (fd->fd_grouplock.lg_gid != arg) {
1922                 CWARN("group lock %lu doesn't match current id %lu\n",
1923                       arg, fd->fd_grouplock.lg_gid);
1924                 spin_unlock(&lli->lli_lock);
1925                 RETURN(-EINVAL);
1926         }
1927
1928         grouplock = fd->fd_grouplock;
1929         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1930         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1931         spin_unlock(&lli->lli_lock);
1932
1933         cl_put_grouplock(&grouplock);
1934         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1935         RETURN(0);
1936 }
1937
1938 /**
1939  * Close inode open handle
1940  *
1941  * \param dentry [in]     dentry which contains the inode
1942  * \param it     [in,out] intent which contains open info and result
1943  *
1944  * \retval 0     success
1945  * \retval <0    failure
1946  */
1947 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1948 {
1949         struct inode *inode = dentry->d_inode;
1950         struct obd_client_handle *och;
1951         int rc;
1952         ENTRY;
1953
1954         LASSERT(inode);
1955
1956         /* Root ? Do nothing. */
1957         if (dentry->d_inode->i_sb->s_root == dentry)
1958                 RETURN(0);
1959
1960         /* No open handle to close? Move away */
1961         if (!it_disposition(it, DISP_OPEN_OPEN))
1962                 RETURN(0);
1963
1964         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1965
1966         OBD_ALLOC(och, sizeof(*och));
1967         if (!och)
1968                 GOTO(out, rc = -ENOMEM);
1969
1970         ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
1971
1972         rc = ll_close_inode_openhandle(inode, och, 0, NULL);
1973 out:
1974         /* this one is in place of ll_file_open */
1975         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1976                 ptlrpc_req_finished(it->it_request);
1977                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1978         }
1979         RETURN(rc);
1980 }
1981
1982 /**
1983  * Get size for inode for which FIEMAP mapping is requested.
1984  * Make the FIEMAP get_info call and returns the result.
1985  * \param fiemap        kernel buffer to hold extens
1986  * \param num_bytes     kernel buffer size
1987  */
1988 static int ll_do_fiemap(struct inode *inode, struct fiemap *fiemap,
1989                         size_t num_bytes)
1990 {
1991         struct lu_env                   *env;
1992         __u16                           refcheck;
1993         int                             rc = 0;
1994         struct ll_fiemap_info_key       fmkey = { .lfik_name = KEY_FIEMAP, };
1995         ENTRY;
1996
1997         /* Checks for fiemap flags */
1998         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1999                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2000                 return -EBADR;
2001         }
2002
2003         /* Check for FIEMAP_FLAG_SYNC */
2004         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
2005                 rc = filemap_fdatawrite(inode->i_mapping);
2006                 if (rc)
2007                         return rc;
2008         }
2009
2010         env = cl_env_get(&refcheck);
2011         if (IS_ERR(env))
2012                 RETURN(PTR_ERR(env));
2013
2014         if (i_size_read(inode) == 0) {
2015                 rc = ll_glimpse_size(inode);
2016                 if (rc)
2017                         GOTO(out, rc);
2018         }
2019
2020         fmkey.lfik_oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2021         obdo_from_inode(&fmkey.lfik_oa, inode, OBD_MD_FLSIZE);
2022         obdo_set_parent_fid(&fmkey.lfik_oa, &ll_i2info(inode)->lli_fid);
2023
2024         /* If filesize is 0, then there would be no objects for mapping */
2025         if (fmkey.lfik_oa.o_size == 0) {
2026                 fiemap->fm_mapped_extents = 0;
2027                 GOTO(out, rc = 0);
2028         }
2029
2030         fmkey.lfik_fiemap = *fiemap;
2031
2032         rc = cl_object_fiemap(env, ll_i2info(inode)->lli_clob,
2033                               &fmkey, fiemap, &num_bytes);
2034 out:
2035         cl_env_put(env, &refcheck);
2036         RETURN(rc);
2037 }
2038
2039 int ll_fid2path(struct inode *inode, void __user *arg)
2040 {
2041         struct obd_export       *exp = ll_i2mdexp(inode);
2042         const struct getinfo_fid2path __user *gfin = arg;
2043         __u32                    pathlen;
2044         struct getinfo_fid2path *gfout;
2045         size_t                   outsize;
2046         int                      rc;
2047
2048         ENTRY;
2049
2050         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
2051             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
2052                 RETURN(-EPERM);
2053
2054         /* Only need to get the buflen */
2055         if (get_user(pathlen, &gfin->gf_pathlen))
2056                 RETURN(-EFAULT);
2057
2058         if (pathlen > PATH_MAX)
2059                 RETURN(-EINVAL);
2060
2061         outsize = sizeof(*gfout) + pathlen;
2062         OBD_ALLOC(gfout, outsize);
2063         if (gfout == NULL)
2064                 RETURN(-ENOMEM);
2065
2066         if (copy_from_user(gfout, arg, sizeof(*gfout)))
2067                 GOTO(gf_free, rc = -EFAULT);
2068         /* append root FID after gfout to let MDT know the root FID so that it
2069          * can lookup the correct path, this is mainly for fileset.
2070          * old server without fileset mount support will ignore this. */
2071         *gfout->gf_u.gf_root_fid = *ll_inode2fid(inode);
2072
2073         /* Call mdc_iocontrol */
2074         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
2075         if (rc != 0)
2076                 GOTO(gf_free, rc);
2077
2078         if (copy_to_user(arg, gfout, outsize))
2079                 rc = -EFAULT;
2080
2081 gf_free:
2082         OBD_FREE(gfout, outsize);
2083         RETURN(rc);
2084 }
2085
2086 /*
2087  * Read the data_version for inode.
2088  *
2089  * This value is computed using stripe object version on OST.
2090  * Version is computed using server side locking.
2091  *
2092  * @param flags if do sync on the OST side;
2093  *              0: no sync
2094  *              LL_DV_RD_FLUSH: flush dirty pages, LCK_PR on OSTs
2095  *              LL_DV_WR_FLUSH: drop all caching pages, LCK_PW on OSTs
2096  */
2097 int ll_data_version(struct inode *inode, __u64 *data_version, int flags)
2098 {
2099         struct cl_object *obj = ll_i2info(inode)->lli_clob;
2100         struct lu_env *env;
2101         struct cl_io *io;
2102         __u16  refcheck;
2103         int result;
2104
2105         ENTRY;
2106
2107         /* If no file object initialized, we consider its version is 0. */
2108         if (obj == NULL) {
2109                 *data_version = 0;
2110                 RETURN(0);
2111         }
2112
2113         env = cl_env_get(&refcheck);
2114         if (IS_ERR(env))
2115                 RETURN(PTR_ERR(env));
2116
2117         io = vvp_env_thread_io(env);
2118         io->ci_obj = obj;
2119         io->u.ci_data_version.dv_data_version = 0;
2120         io->u.ci_data_version.dv_flags = flags;
2121
2122 restart:
2123         if (cl_io_init(env, io, CIT_DATA_VERSION, io->ci_obj) == 0)
2124                 result = cl_io_loop(env, io);
2125         else
2126                 result = io->ci_result;
2127
2128         *data_version = io->u.ci_data_version.dv_data_version;
2129
2130         cl_io_fini(env, io);
2131
2132         if (unlikely(io->ci_need_restart))
2133                 goto restart;
2134
2135         cl_env_put(env, &refcheck);
2136
2137         RETURN(result);
2138 }
2139
2140 /*
2141  * Trigger a HSM release request for the provided inode.
2142  */
2143 int ll_hsm_release(struct inode *inode)
2144 {
2145         struct lu_env *env;
2146         struct obd_client_handle *och = NULL;
2147         __u64 data_version = 0;
2148         int rc;
2149         __u16 refcheck;
2150         ENTRY;
2151
2152         CDEBUG(D_INODE, "%s: Releasing file "DFID".\n",
2153                ll_get_fsname(inode->i_sb, NULL, 0),
2154                PFID(&ll_i2info(inode)->lli_fid));
2155
2156         och = ll_lease_open(inode, NULL, FMODE_WRITE, MDS_OPEN_RELEASE);
2157         if (IS_ERR(och))
2158                 GOTO(out, rc = PTR_ERR(och));
2159
2160         /* Grab latest data_version and [am]time values */
2161         rc = ll_data_version(inode, &data_version, LL_DV_WR_FLUSH);
2162         if (rc != 0)
2163                 GOTO(out, rc);
2164
2165         env = cl_env_get(&refcheck);
2166         if (IS_ERR(env))
2167                 GOTO(out, rc = PTR_ERR(env));
2168
2169         ll_merge_attr(env, inode);
2170         cl_env_put(env, &refcheck);
2171
2172         /* Release the file.
2173          * NB: lease lock handle is released in mdc_hsm_release_pack() because
2174          * we still need it to pack l_remote_handle to MDT. */
2175         rc = ll_close_inode_openhandle(inode, och, MDS_HSM_RELEASE,
2176                                        &data_version);
2177         och = NULL;
2178
2179         EXIT;
2180 out:
2181         if (och != NULL && !IS_ERR(och)) /* close the file */
2182                 ll_lease_close(och, inode, NULL);
2183
2184         return rc;
2185 }
2186
2187 struct ll_swap_stack {
2188         __u64                    dv1;
2189         __u64                    dv2;
2190         struct inode            *inode1;
2191         struct inode            *inode2;
2192         bool                     check_dv1;
2193         bool                     check_dv2;
2194 };
2195
2196 static int ll_swap_layouts(struct file *file1, struct file *file2,
2197                            struct lustre_swap_layouts *lsl)
2198 {
2199         struct mdc_swap_layouts  msl;
2200         struct md_op_data       *op_data;
2201         __u32                    gid;
2202         __u64                    dv;
2203         struct ll_swap_stack    *llss = NULL;
2204         int                      rc;
2205
2206         OBD_ALLOC_PTR(llss);
2207         if (llss == NULL)
2208                 RETURN(-ENOMEM);
2209
2210         llss->inode1 = file_inode(file1);
2211         llss->inode2 = file_inode(file2);
2212
2213         rc = ll_check_swap_layouts_validity(llss->inode1, llss->inode2);
2214         if (rc < 0)
2215                 GOTO(free, rc);
2216
2217         /* we use 2 bool because it is easier to swap than 2 bits */
2218         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
2219                 llss->check_dv1 = true;
2220
2221         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
2222                 llss->check_dv2 = true;
2223
2224         /* we cannot use lsl->sl_dvX directly because we may swap them */
2225         llss->dv1 = lsl->sl_dv1;
2226         llss->dv2 = lsl->sl_dv2;
2227
2228         rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
2229         if (rc == 0) /* same file, done! */
2230                 GOTO(free, rc);
2231
2232         if (rc < 0) { /* sequentialize it */
2233                 swap(llss->inode1, llss->inode2);
2234                 swap(file1, file2);
2235                 swap(llss->dv1, llss->dv2);
2236                 swap(llss->check_dv1, llss->check_dv2);
2237         }
2238
2239         gid = lsl->sl_gid;
2240         if (gid != 0) { /* application asks to flush dirty cache */
2241                 rc = ll_get_grouplock(llss->inode1, file1, gid);
2242                 if (rc < 0)
2243                         GOTO(free, rc);
2244
2245                 rc = ll_get_grouplock(llss->inode2, file2, gid);
2246                 if (rc < 0) {
2247                         ll_put_grouplock(llss->inode1, file1, gid);
2248                         GOTO(free, rc);
2249                 }
2250         }
2251
2252         /* ultimate check, before swaping the layouts we check if
2253          * dataversion has changed (if requested) */
2254         if (llss->check_dv1) {
2255                 rc = ll_data_version(llss->inode1, &dv, 0);
2256                 if (rc)
2257                         GOTO(putgl, rc);
2258                 if (dv != llss->dv1)
2259                         GOTO(putgl, rc = -EAGAIN);
2260         }
2261
2262         if (llss->check_dv2) {
2263                 rc = ll_data_version(llss->inode2, &dv, 0);
2264                 if (rc)
2265                         GOTO(putgl, rc);
2266                 if (dv != llss->dv2)
2267                         GOTO(putgl, rc = -EAGAIN);
2268         }
2269
2270         /* struct md_op_data is used to send the swap args to the mdt
2271          * only flags is missing, so we use struct mdc_swap_layouts
2272          * through the md_op_data->op_data */
2273         /* flags from user space have to be converted before they are send to
2274          * server, no flag is sent today, they are only used on the client */
2275         msl.msl_flags = 0;
2276         rc = -ENOMEM;
2277         op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
2278                                      0, LUSTRE_OPC_ANY, &msl);
2279         if (IS_ERR(op_data))
2280                 GOTO(free, rc = PTR_ERR(op_data));
2281
2282         rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
2283                            sizeof(*op_data), op_data, NULL);
2284         ll_finish_md_op_data(op_data);
2285
2286         if (rc < 0)
2287                 GOTO(putgl, rc);
2288
2289 putgl:
2290         if (gid != 0) {
2291                 ll_put_grouplock(llss->inode2, file2, gid);
2292                 ll_put_grouplock(llss->inode1, file1, gid);
2293         }
2294
2295 free:
2296         if (llss != NULL)
2297                 OBD_FREE_PTR(llss);
2298
2299         RETURN(rc);
2300 }
2301
2302 int ll_hsm_state_set(struct inode *inode, struct hsm_state_set *hss)
2303 {
2304         struct md_op_data       *op_data;
2305         int                      rc;
2306         ENTRY;
2307
2308         /* Detect out-of range masks */
2309         if ((hss->hss_setmask | hss->hss_clearmask) & ~HSM_FLAGS_MASK)
2310                 RETURN(-EINVAL);
2311
2312         /* Non-root users are forbidden to set or clear flags which are
2313          * NOT defined in HSM_USER_MASK. */
2314         if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK) &&
2315             !cfs_capable(CFS_CAP_SYS_ADMIN))
2316                 RETURN(-EPERM);
2317
2318         /* Detect out-of range archive id */
2319         if ((hss->hss_valid & HSS_ARCHIVE_ID) &&
2320             (hss->hss_archive_id > LL_HSM_MAX_ARCHIVE))
2321                 RETURN(-EINVAL);
2322
2323         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2324                                      LUSTRE_OPC_ANY, hss);
2325         if (IS_ERR(op_data))
2326                 RETURN(PTR_ERR(op_data));
2327
2328         rc = obd_iocontrol(LL_IOC_HSM_STATE_SET, ll_i2mdexp(inode),
2329                            sizeof(*op_data), op_data, NULL);
2330
2331         ll_finish_md_op_data(op_data);
2332
2333         RETURN(rc);
2334 }
2335
2336 static int ll_hsm_import(struct inode *inode, struct file *file,
2337                          struct hsm_user_import *hui)
2338 {
2339         struct hsm_state_set    *hss = NULL;
2340         struct iattr            *attr = NULL;
2341         int                      rc;
2342         ENTRY;
2343
2344         if (!S_ISREG(inode->i_mode))
2345                 RETURN(-EINVAL);
2346
2347         /* set HSM flags */
2348         OBD_ALLOC_PTR(hss);
2349         if (hss == NULL)
2350                 GOTO(out, rc = -ENOMEM);
2351
2352         hss->hss_valid = HSS_SETMASK | HSS_ARCHIVE_ID;
2353         hss->hss_archive_id = hui->hui_archive_id;
2354         hss->hss_setmask = HS_ARCHIVED | HS_EXISTS | HS_RELEASED;
2355         rc = ll_hsm_state_set(inode, hss);
2356         if (rc != 0)
2357                 GOTO(out, rc);
2358
2359         OBD_ALLOC_PTR(attr);
2360         if (attr == NULL)
2361                 GOTO(out, rc = -ENOMEM);
2362
2363         attr->ia_mode = hui->hui_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
2364         attr->ia_mode |= S_IFREG;
2365         attr->ia_uid = make_kuid(&init_user_ns, hui->hui_uid);
2366         attr->ia_gid = make_kgid(&init_user_ns, hui->hui_gid);
2367         attr->ia_size = hui->hui_size;
2368         attr->ia_mtime.tv_sec = hui->hui_mtime;
2369         attr->ia_mtime.tv_nsec = hui->hui_mtime_ns;
2370         attr->ia_atime.tv_sec = hui->hui_atime;
2371         attr->ia_atime.tv_nsec = hui->hui_atime_ns;
2372
2373         attr->ia_valid = ATTR_SIZE | ATTR_MODE | ATTR_FORCE |
2374                          ATTR_UID | ATTR_GID |
2375                          ATTR_MTIME | ATTR_MTIME_SET |
2376                          ATTR_ATIME | ATTR_ATIME_SET;
2377
2378         inode_lock(inode);
2379
2380         rc = ll_setattr_raw(file_dentry(file), attr, true);
2381         if (rc == -ENODATA)
2382                 rc = 0;
2383
2384         inode_unlock(inode);
2385
2386 out:
2387         if (hss != NULL)
2388                 OBD_FREE_PTR(hss);
2389
2390         if (attr != NULL)
2391                 OBD_FREE_PTR(attr);
2392
2393         RETURN(rc);
2394 }
2395
2396 static inline long ll_lease_type_from_fmode(fmode_t fmode)
2397 {
2398         return ((fmode & FMODE_READ) ? LL_LEASE_RDLCK : 0) |
2399                ((fmode & FMODE_WRITE) ? LL_LEASE_WRLCK : 0);
2400 }
2401
2402 static int ll_file_futimes_3(struct file *file, const struct ll_futimes_3 *lfu)
2403 {
2404         struct inode *inode = file_inode(file);
2405         struct iattr ia = {
2406                 .ia_valid = ATTR_ATIME | ATTR_ATIME_SET |
2407                             ATTR_MTIME | ATTR_MTIME_SET |
2408                             ATTR_CTIME | ATTR_CTIME_SET,
2409                 .ia_atime = {
2410                         .tv_sec = lfu->lfu_atime_sec,
2411                         .tv_nsec = lfu->lfu_atime_nsec,
2412                 },
2413                 .ia_mtime = {
2414                         .tv_sec = lfu->lfu_mtime_sec,
2415                         .tv_nsec = lfu->lfu_mtime_nsec,
2416                 },
2417                 .ia_ctime = {
2418                         .tv_sec = lfu->lfu_ctime_sec,
2419                         .tv_nsec = lfu->lfu_ctime_nsec,
2420                 },
2421         };
2422         int rc;
2423         ENTRY;
2424
2425         if (!capable(CAP_SYS_ADMIN))
2426                 RETURN(-EPERM);
2427
2428         if (!S_ISREG(inode->i_mode))
2429                 RETURN(-EINVAL);
2430
2431         inode_lock(inode);
2432         rc = ll_setattr_raw(file_dentry(file), &ia, false);
2433         inode_unlock(inode);
2434
2435         RETURN(rc);
2436 }
2437
2438 /*
2439  * Give file access advices
2440  *
2441  * The ladvise interface is similar to Linux fadvise() system call, except it
2442  * forwards the advices directly from Lustre client to server. The server side
2443  * codes will apply appropriate read-ahead and caching techniques for the
2444  * corresponding files.
2445  *
2446  * A typical workload for ladvise is e.g. a bunch of different clients are
2447  * doing small random reads of a file, so prefetching pages into OSS cache
2448  * with big linear reads before the random IO is a net benefit. Fetching
2449  * all that data into each client cache with fadvise() may not be, due to
2450  * much more data being sent to the client.
2451  */
2452 static int ll_ladvise(struct inode *inode, struct file *file, __u64 flags,
2453                       struct llapi_lu_ladvise *ladvise)
2454 {
2455         struct lu_env *env;
2456         struct cl_io *io;
2457         struct cl_ladvise_io *lio;
2458         int rc;
2459         __u16 refcheck;
2460         ENTRY;
2461
2462         env = cl_env_get(&refcheck);
2463         if (IS_ERR(env))
2464                 RETURN(PTR_ERR(env));
2465
2466         io = vvp_env_thread_io(env);
2467         io->ci_obj = ll_i2info(inode)->lli_clob;
2468
2469         /* initialize parameters for ladvise */
2470         lio = &io->u.ci_ladvise;
2471         lio->li_start = ladvise->lla_start;
2472         lio->li_end = ladvise->lla_end;
2473         lio->li_fid = ll_inode2fid(inode);
2474         lio->li_advice = ladvise->lla_advice;
2475         lio->li_flags = flags;
2476
2477         if (cl_io_init(env, io, CIT_LADVISE, io->ci_obj) == 0)
2478                 rc = cl_io_loop(env, io);
2479         else
2480                 rc = io->ci_result;
2481
2482         cl_io_fini(env, io);
2483         cl_env_put(env, &refcheck);
2484         RETURN(rc);
2485 }
2486
2487 int ll_ioctl_fsgetxattr(struct inode *inode, unsigned int cmd,
2488                         unsigned long arg)
2489 {
2490         struct fsxattr fsxattr;
2491
2492         if (copy_from_user(&fsxattr,
2493                            (const struct fsxattr __user *)arg,
2494                            sizeof(fsxattr)))
2495                 RETURN(-EFAULT);
2496
2497         fsxattr.fsx_projid = ll_i2info(inode)->lli_projid;
2498         if (copy_to_user((struct fsxattr __user *)arg,
2499                          &fsxattr, sizeof(fsxattr)))
2500                 RETURN(-EFAULT);
2501
2502         RETURN(0);
2503 }
2504
2505 int ll_ioctl_fssetxattr(struct inode *inode, unsigned int cmd,
2506                         unsigned long arg)
2507 {
2508
2509         struct md_op_data *op_data;
2510         struct ptlrpc_request *req = NULL;
2511         int rc = 0;
2512         struct fsxattr fsxattr;
2513
2514         /* only root could change project ID */
2515         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2516                 RETURN(-EPERM);
2517
2518         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2519                                      LUSTRE_OPC_ANY, NULL);
2520         if (IS_ERR(op_data))
2521                 RETURN(PTR_ERR(op_data));
2522
2523         if (copy_from_user(&fsxattr,
2524                            (const struct fsxattr __user *)arg,
2525                            sizeof(fsxattr)))
2526                 GOTO(out_fsxattr1, rc = -EFAULT);
2527
2528         op_data->op_projid = fsxattr.fsx_projid;
2529         op_data->op_attr.ia_valid |= MDS_ATTR_PROJID;
2530         rc = md_setattr(ll_i2sbi(inode)->ll_md_exp, op_data, NULL,
2531                         0, &req);
2532         ptlrpc_req_finished(req);
2533
2534 out_fsxattr1:
2535         ll_finish_md_op_data(op_data);
2536         RETURN(rc);
2537
2538
2539 }
2540
2541 static long
2542 ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
2543 {
2544         struct inode            *inode = file_inode(file);
2545         struct ll_file_data     *fd = LUSTRE_FPRIVATE(file);
2546         int                      flags, rc;
2547         ENTRY;
2548
2549         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), cmd=%x\n",
2550                PFID(ll_inode2fid(inode)), inode, cmd);
2551         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2552
2553         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2554         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2555                 RETURN(-ENOTTY);
2556
2557         switch(cmd) {
2558         case LL_IOC_GETFLAGS:
2559                 /* Get the current value of the file flags */
2560                 return put_user(fd->fd_flags, (int __user *)arg);
2561         case LL_IOC_SETFLAGS:
2562         case LL_IOC_CLRFLAGS:
2563                 /* Set or clear specific file flags */
2564                 /* XXX This probably needs checks to ensure the flags are
2565                  *     not abused, and to handle any flag side effects.
2566                  */
2567                 if (get_user(flags, (int __user *) arg))
2568                         RETURN(-EFAULT);
2569
2570                 if (cmd == LL_IOC_SETFLAGS) {
2571                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2572                             !(file->f_flags & O_DIRECT)) {
2573                                 CERROR("%s: unable to disable locking on "
2574                                        "non-O_DIRECT file\n", current->comm);
2575                                 RETURN(-EINVAL);
2576                         }
2577
2578                         fd->fd_flags |= flags;
2579                 } else {
2580                         fd->fd_flags &= ~flags;
2581                 }
2582                 RETURN(0);
2583         case LL_IOC_LOV_SETSTRIPE:
2584         case LL_IOC_LOV_SETSTRIPE_NEW:
2585                 RETURN(ll_lov_setstripe(inode, file, (void __user *)arg));
2586         case LL_IOC_LOV_SETEA:
2587                 RETURN(ll_lov_setea(inode, file, (void __user *)arg));
2588         case LL_IOC_LOV_SWAP_LAYOUTS: {
2589                 struct file *file2;
2590                 struct lustre_swap_layouts lsl;
2591
2592                 if (copy_from_user(&lsl, (char __user *)arg,
2593                                    sizeof(struct lustre_swap_layouts)))
2594                         RETURN(-EFAULT);
2595
2596                 if ((file->f_flags & O_ACCMODE) == O_RDONLY)
2597                         RETURN(-EPERM);
2598
2599                 file2 = fget(lsl.sl_fd);
2600                 if (file2 == NULL)
2601                         RETURN(-EBADF);
2602
2603                 /* O_WRONLY or O_RDWR */
2604                 if ((file2->f_flags & O_ACCMODE) == O_RDONLY)
2605                         GOTO(out, rc = -EPERM);
2606
2607                 if (lsl.sl_flags & SWAP_LAYOUTS_CLOSE) {
2608                         struct inode                    *inode2;
2609                         struct ll_inode_info            *lli;
2610                         struct obd_client_handle        *och = NULL;
2611
2612                         if (lsl.sl_flags != SWAP_LAYOUTS_CLOSE)
2613                                 GOTO(out, rc = -EINVAL);
2614
2615                         lli = ll_i2info(inode);
2616                         mutex_lock(&lli->lli_och_mutex);
2617                         if (fd->fd_lease_och != NULL) {
2618                                 och = fd->fd_lease_och;
2619                                 fd->fd_lease_och = NULL;
2620                         }
2621                         mutex_unlock(&lli->lli_och_mutex);
2622                         if (och == NULL)
2623                                 GOTO(out, rc = -ENOLCK);
2624                         inode2 = file_inode(file2);
2625                         rc = ll_swap_layouts_close(och, inode, inode2);
2626                 } else {
2627                         rc = ll_swap_layouts(file, file2, &lsl);
2628                 }
2629 out:
2630                 fput(file2);
2631                 RETURN(rc);
2632         }
2633         case LL_IOC_LOV_GETSTRIPE:
2634         case LL_IOC_LOV_GETSTRIPE_NEW:
2635                 RETURN(ll_file_getstripe(inode, (void __user *)arg, 0));
2636         case FSFILT_IOC_GETFLAGS:
2637         case FSFILT_IOC_SETFLAGS:
2638                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2639         case FSFILT_IOC_GETVERSION_OLD:
2640         case FSFILT_IOC_GETVERSION:
2641                 RETURN(put_user(inode->i_generation, (int __user *)arg));
2642         case LL_IOC_GROUP_LOCK:
2643                 RETURN(ll_get_grouplock(inode, file, arg));
2644         case LL_IOC_GROUP_UNLOCK:
2645                 RETURN(ll_put_grouplock(inode, file, arg));
2646         case IOC_OBD_STATFS:
2647                 RETURN(ll_obd_statfs(inode, (void __user *)arg));
2648
2649         /* We need to special case any other ioctls we want to handle,
2650          * to send them to the MDS/OST as appropriate and to properly
2651          * network encode the arg field.
2652         case FSFILT_IOC_SETVERSION_OLD:
2653         case FSFILT_IOC_SETVERSION:
2654         */
2655         case LL_IOC_FLUSHCTX:
2656                 RETURN(ll_flush_ctx(inode));
2657         case LL_IOC_PATH2FID: {
2658                 if (copy_to_user((void __user *)arg, ll_inode2fid(inode),
2659                                  sizeof(struct lu_fid)))
2660                         RETURN(-EFAULT);
2661
2662                 RETURN(0);
2663         }
2664         case LL_IOC_GETPARENT:
2665                 RETURN(ll_getparent(file, (struct getparent __user *)arg));
2666
2667         case OBD_IOC_FID2PATH:
2668                 RETURN(ll_fid2path(inode, (void __user *)arg));
2669         case LL_IOC_DATA_VERSION: {
2670                 struct ioc_data_version idv;
2671                 int rc;
2672
2673                 if (copy_from_user(&idv, (char __user *)arg, sizeof(idv)))
2674                         RETURN(-EFAULT);
2675
2676                 idv.idv_flags &= LL_DV_RD_FLUSH | LL_DV_WR_FLUSH;
2677                 rc = ll_data_version(inode, &idv.idv_version, idv.idv_flags);
2678
2679                 if (rc == 0 &&
2680                     copy_to_user((char __user *)arg, &idv, sizeof(idv)))
2681                         RETURN(-EFAULT);
2682
2683                 RETURN(rc);
2684         }
2685
2686         case LL_IOC_GET_MDTIDX: {
2687                 int mdtidx;
2688
2689                 mdtidx = ll_get_mdt_idx(inode);
2690                 if (mdtidx < 0)
2691                         RETURN(mdtidx);
2692
2693                 if (put_user((int)mdtidx, (int __user *)arg))
2694                         RETURN(-EFAULT);
2695
2696                 RETURN(0);
2697         }
2698         case OBD_IOC_GETDTNAME:
2699         case OBD_IOC_GETMDNAME:
2700                 RETURN(ll_get_obd_name(inode, cmd, arg));
2701         case LL_IOC_HSM_STATE_GET: {
2702                 struct md_op_data       *op_data;
2703                 struct hsm_user_state   *hus;
2704                 int                      rc;
2705
2706                 OBD_ALLOC_PTR(hus);
2707                 if (hus == NULL)
2708                         RETURN(-ENOMEM);
2709
2710                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2711                                              LUSTRE_OPC_ANY, hus);
2712                 if (IS_ERR(op_data)) {
2713                         OBD_FREE_PTR(hus);
2714                         RETURN(PTR_ERR(op_data));
2715                 }
2716
2717                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2718                                    op_data, NULL);
2719
2720                 if (copy_to_user((void __user *)arg, hus, sizeof(*hus)))
2721                         rc = -EFAULT;
2722
2723                 ll_finish_md_op_data(op_data);
2724                 OBD_FREE_PTR(hus);
2725                 RETURN(rc);
2726         }
2727         case LL_IOC_HSM_STATE_SET: {
2728                 struct hsm_state_set    *hss;
2729                 int                      rc;
2730
2731                 OBD_ALLOC_PTR(hss);
2732                 if (hss == NULL)
2733                         RETURN(-ENOMEM);
2734
2735                 if (copy_from_user(hss, (char __user *)arg, sizeof(*hss))) {
2736                         OBD_FREE_PTR(hss);
2737                         RETURN(-EFAULT);
2738                 }
2739
2740                 rc = ll_hsm_state_set(inode, hss);
2741
2742                 OBD_FREE_PTR(hss);
2743                 RETURN(rc);
2744         }
2745         case LL_IOC_HSM_ACTION: {
2746                 struct md_op_data               *op_data;
2747                 struct hsm_current_action       *hca;
2748                 int                              rc;
2749
2750                 OBD_ALLOC_PTR(hca);
2751                 if (hca == NULL)
2752                         RETURN(-ENOMEM);
2753
2754                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2755                                              LUSTRE_OPC_ANY, hca);
2756                 if (IS_ERR(op_data)) {
2757                         OBD_FREE_PTR(hca);
2758                         RETURN(PTR_ERR(op_data));
2759                 }
2760
2761                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2762                                    op_data, NULL);
2763
2764                 if (copy_to_user((char __user *)arg, hca, sizeof(*hca)))
2765                         rc = -EFAULT;
2766
2767                 ll_finish_md_op_data(op_data);
2768                 OBD_FREE_PTR(hca);
2769                 RETURN(rc);
2770         }
2771         case LL_IOC_SET_LEASE: {
2772                 struct ll_inode_info *lli = ll_i2info(inode);
2773                 struct obd_client_handle *och = NULL;
2774                 bool lease_broken;
2775                 fmode_t fmode;
2776
2777                 switch (arg) {
2778                 case LL_LEASE_WRLCK:
2779                         if (!(file->f_mode & FMODE_WRITE))
2780                                 RETURN(-EPERM);
2781                         fmode = FMODE_WRITE;
2782                         break;
2783                 case LL_LEASE_RDLCK:
2784                         if (!(file->f_mode & FMODE_READ))
2785                                 RETURN(-EPERM);
2786                         fmode = FMODE_READ;
2787                         break;
2788                 case LL_LEASE_UNLCK:
2789                         mutex_lock(&lli->lli_och_mutex);
2790                         if (fd->fd_lease_och != NULL) {
2791                                 och = fd->fd_lease_och;
2792                                 fd->fd_lease_och = NULL;
2793                         }
2794                         mutex_unlock(&lli->lli_och_mutex);
2795
2796                         if (och == NULL)
2797                                 RETURN(-ENOLCK);
2798
2799                         fmode = och->och_flags;
2800                         rc = ll_lease_close(och, inode, &lease_broken);
2801                         if (rc < 0)
2802                                 RETURN(rc);
2803
2804                         rc = ll_lease_och_release(inode, file);
2805                         if (rc < 0)
2806                                 RETURN(rc);
2807
2808                         if (lease_broken)
2809                                 fmode = 0;
2810
2811                         RETURN(ll_lease_type_from_fmode(fmode));
2812                 default:
2813                         RETURN(-EINVAL);
2814                 }
2815
2816                 CDEBUG(D_INODE, "Set lease with mode %u\n", fmode);
2817
2818                 /* apply for lease */
2819                 och = ll_lease_open(inode, file, fmode, 0);
2820                 if (IS_ERR(och))
2821                         RETURN(PTR_ERR(och));
2822
2823                 rc = 0;
2824                 mutex_lock(&lli->lli_och_mutex);
2825                 if (fd->fd_lease_och == NULL) {
2826                         fd->fd_lease_och = och;
2827                         och = NULL;
2828                 }
2829                 mutex_unlock(&lli->lli_och_mutex);
2830                 if (och != NULL) {
2831                         /* impossible now that only excl is supported for now */
2832                         ll_lease_close(och, inode, &lease_broken);
2833                         rc = -EBUSY;
2834                 }
2835                 RETURN(rc);
2836         }
2837         case LL_IOC_GET_LEASE: {
2838                 struct ll_inode_info *lli = ll_i2info(inode);
2839                 struct ldlm_lock *lock = NULL;
2840                 fmode_t fmode = 0;
2841
2842                 mutex_lock(&lli->lli_och_mutex);
2843                 if (fd->fd_lease_och != NULL) {
2844                         struct obd_client_handle *och = fd->fd_lease_och;
2845
2846                         lock = ldlm_handle2lock(&och->och_lease_handle);
2847                         if (lock != NULL) {
2848                                 lock_res_and_lock(lock);
2849                                 if (!ldlm_is_cancel(lock))
2850                                         fmode = och->och_flags;
2851
2852                                 unlock_res_and_lock(lock);
2853                                 LDLM_LOCK_PUT(lock);
2854                         }
2855                 }
2856                 mutex_unlock(&lli->lli_och_mutex);
2857
2858                 RETURN(ll_lease_type_from_fmode(fmode));
2859         }
2860         case LL_IOC_HSM_IMPORT: {
2861                 struct hsm_user_import *hui;
2862
2863                 OBD_ALLOC_PTR(hui);
2864                 if (hui == NULL)
2865                         RETURN(-ENOMEM);
2866
2867                 if (copy_from_user(hui, (void __user *)arg, sizeof(*hui))) {
2868                         OBD_FREE_PTR(hui);
2869                         RETURN(-EFAULT);
2870                 }
2871
2872                 rc = ll_hsm_import(inode, file, hui);
2873
2874                 OBD_FREE_PTR(hui);
2875                 RETURN(rc);
2876         }
2877         case LL_IOC_FUTIMES_3: {
2878                 struct ll_futimes_3 lfu;
2879
2880                 if (copy_from_user(&lfu,
2881                                    (const struct ll_futimes_3 __user *)arg,
2882                                    sizeof(lfu)))
2883                         RETURN(-EFAULT);
2884
2885                 RETURN(ll_file_futimes_3(file, &lfu));
2886         }
2887         case LL_IOC_LADVISE: {
2888                 struct llapi_ladvise_hdr *ladvise_hdr;
2889                 int i;
2890                 int num_advise;
2891                 int alloc_size = sizeof(*ladvise_hdr);
2892
2893                 rc = 0;
2894                 OBD_ALLOC_PTR(ladvise_hdr);
2895                 if (ladvise_hdr == NULL)
2896                         RETURN(-ENOMEM);
2897
2898                 if (copy_from_user(ladvise_hdr,
2899                                    (const struct llapi_ladvise_hdr __user *)arg,
2900                                    alloc_size))
2901                         GOTO(out_ladvise, rc = -EFAULT);
2902
2903                 if (ladvise_hdr->lah_magic != LADVISE_MAGIC ||
2904                     ladvise_hdr->lah_count < 1)
2905                         GOTO(out_ladvise, rc = -EINVAL);
2906
2907                 num_advise = ladvise_hdr->lah_count;
2908                 if (num_advise >= LAH_COUNT_MAX)
2909                         GOTO(out_ladvise, rc = -EFBIG);
2910
2911                 OBD_FREE_PTR(ladvise_hdr);
2912                 alloc_size = offsetof(typeof(*ladvise_hdr),
2913                                       lah_advise[num_advise]);
2914                 OBD_ALLOC(ladvise_hdr, alloc_size);
2915                 if (ladvise_hdr == NULL)
2916                         RETURN(-ENOMEM);
2917
2918                 /*
2919                  * TODO: submit multiple advices to one server in a single RPC
2920                  */
2921                 if (copy_from_user(ladvise_hdr,
2922                                    (const struct llapi_ladvise_hdr __user *)arg,
2923                                    alloc_size))
2924                         GOTO(out_ladvise, rc = -EFAULT);
2925
2926                 for (i = 0; i < num_advise; i++) {
2927                         rc = ll_ladvise(inode, file, ladvise_hdr->lah_flags,
2928                                         &ladvise_hdr->lah_advise[i]);
2929                         if (rc)
2930                                 break;
2931                 }
2932
2933 out_ladvise:
2934                 OBD_FREE(ladvise_hdr, alloc_size);
2935                 RETURN(rc);
2936         }
2937         case LL_IOC_FSGETXATTR:
2938                 RETURN(ll_ioctl_fsgetxattr(inode, cmd, arg));
2939         case LL_IOC_FSSETXATTR:
2940                 RETURN(ll_ioctl_fssetxattr(inode, cmd, arg));
2941         default:
2942                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2943                                      (void __user *)arg));
2944         }
2945 }
2946
2947 #ifndef HAVE_FILE_LLSEEK_SIZE
2948 static inline loff_t
2949 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
2950 {
2951         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
2952                 return -EINVAL;
2953         if (offset > maxsize)
2954                 return -EINVAL;
2955
2956         if (offset != file->f_pos) {
2957                 file->f_pos = offset;
2958                 file->f_version = 0;
2959         }
2960         return offset;
2961 }
2962
2963 static loff_t
2964 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
2965                 loff_t maxsize, loff_t eof)
2966 {
2967         struct inode *inode = file_inode(file);
2968
2969         switch (origin) {
2970         case SEEK_END:
2971                 offset += eof;
2972                 break;
2973         case SEEK_CUR:
2974                 /*
2975                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
2976                  * position-querying operation.  Avoid rewriting the "same"
2977                  * f_pos value back to the file because a concurrent read(),
2978                  * write() or lseek() might have altered it
2979                  */
2980                 if (offset == 0)
2981                         return file->f_pos;
2982                 /*
2983                  * f_lock protects against read/modify/write race with other
2984                  * SEEK_CURs. Note that parallel writes and reads behave
2985                  * like SEEK_SET.
2986                  */
2987                 inode_lock(inode);
2988                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
2989                 inode_unlock(inode);
2990                 return offset;
2991         case SEEK_DATA:
2992                 /*
2993                  * In the generic case the entire file is data, so as long as
2994                  * offset isn't at the end of the file then the offset is data.
2995                  */
2996                 if (offset >= eof)
2997                         return -ENXIO;
2998                 break;
2999         case SEEK_HOLE:
3000                 /*
3001                  * There is a virtual hole at the end of the file, so as long as
3002                  * offset isn't i_size or larger, return i_size.
3003                  */
3004                 if (offset >= eof)
3005                         return -ENXIO;
3006                 offset = eof;
3007                 break;
3008         }
3009
3010         return llseek_execute(file, offset, maxsize);
3011 }
3012 #endif
3013
3014 static loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
3015 {
3016         struct inode *inode = file_inode(file);
3017         loff_t retval, eof = 0;
3018
3019         ENTRY;
3020         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
3021                            (origin == SEEK_CUR) ? file->f_pos : 0);
3022         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), to=%llu=%#llx(%d)\n",
3023                PFID(ll_inode2fid(inode)), inode, retval, retval,
3024                origin);
3025         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
3026
3027         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
3028                 retval = ll_glimpse_size(inode);
3029                 if (retval != 0)
3030                         RETURN(retval);
3031                 eof = i_size_read(inode);
3032         }
3033
3034         retval = ll_generic_file_llseek_size(file, offset, origin,
3035                                           ll_file_maxbytes(inode), eof);
3036         RETURN(retval);
3037 }
3038
3039 static int ll_flush(struct file *file, fl_owner_t id)
3040 {
3041         struct inode *inode = file_inode(file);
3042         struct ll_inode_info *lli = ll_i2info(inode);
3043         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
3044         int rc, err;
3045
3046         LASSERT(!S_ISDIR(inode->i_mode));
3047
3048         /* catch async errors that were recorded back when async writeback
3049          * failed for pages in this mapping. */
3050         rc = lli->lli_async_rc;
3051         lli->lli_async_rc = 0;
3052         if (lli->lli_clob != NULL) {
3053                 err = lov_read_and_clear_async_rc(lli->lli_clob);
3054                 if (rc == 0)
3055                         rc = err;
3056         }
3057
3058         /* The application has been told write failure already.
3059          * Do not report failure again. */
3060         if (fd->fd_write_failed)
3061                 return 0;
3062         return rc ? -EIO : 0;
3063 }
3064
3065 /**
3066  * Called to make sure a portion of file has been written out.
3067  * if @mode is not CL_FSYNC_LOCAL, it will send OST_SYNC RPCs to OST.
3068  *
3069  * Return how many pages have been written.
3070  */
3071 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
3072                        enum cl_fsync_mode mode, int ignore_layout)
3073 {
3074         struct lu_env *env;
3075         struct cl_io *io;
3076         struct cl_fsync_io *fio;
3077         int result;
3078         __u16 refcheck;
3079         ENTRY;
3080
3081         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
3082             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
3083                 RETURN(-EINVAL);
3084
3085         env = cl_env_get(&refcheck);
3086         if (IS_ERR(env))
3087                 RETURN(PTR_ERR(env));
3088
3089         io = vvp_env_thread_io(env);
3090         io->ci_obj = ll_i2info(inode)->lli_clob;
3091         io->ci_ignore_layout = ignore_layout;
3092
3093         /* initialize parameters for sync */
3094         fio = &io->u.ci_fsync;
3095         fio->fi_start = start;
3096         fio->fi_end = end;
3097         fio->fi_fid = ll_inode2fid(inode);
3098         fio->fi_mode = mode;
3099         fio->fi_nr_written = 0;
3100
3101         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
3102                 result = cl_io_loop(env, io);
3103         else
3104                 result = io->ci_result;
3105         if (result == 0)
3106                 result = fio->fi_nr_written;
3107         cl_io_fini(env, io);
3108         cl_env_put(env, &refcheck);
3109
3110         RETURN(result);
3111 }
3112
3113 /*
3114  * When dentry is provided (the 'else' case), file_dentry() may be
3115  * null and dentry must be used directly rather than pulled from
3116  * file_dentry() as is done otherwise.
3117  */
3118
3119 #ifdef HAVE_FILE_FSYNC_4ARGS
3120 int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
3121 {
3122         struct dentry *dentry = file_dentry(file);
3123         bool lock_inode;
3124 #elif defined(HAVE_FILE_FSYNC_2ARGS)
3125 int ll_fsync(struct file *file, int datasync)
3126 {
3127         struct dentry *dentry = file_dentry(file);
3128         loff_t start = 0;
3129         loff_t end = LLONG_MAX;
3130 #else
3131 int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
3132 {
3133         loff_t start = 0;
3134         loff_t end = LLONG_MAX;
3135 #endif
3136         struct inode *inode = dentry->d_inode;
3137         struct ll_inode_info *lli = ll_i2info(inode);
3138         struct ptlrpc_request *req;
3139         int rc, err;
3140         ENTRY;
3141
3142         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
3143                PFID(ll_inode2fid(inode)), inode);
3144         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
3145
3146 #ifdef HAVE_FILE_FSYNC_4ARGS
3147         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
3148         lock_inode = !lli->lli_inode_locked;
3149         if (lock_inode)
3150                 inode_lock(inode);
3151 #else
3152         /* fsync's caller has already called _fdata{sync,write}, we want
3153          * that IO to finish before calling the osc and mdc sync methods */
3154         rc = filemap_fdatawait(inode->i_mapping);
3155 #endif
3156
3157         /* catch async errors that were recorded back when async writeback
3158          * failed for pages in this mapping. */
3159         if (!S_ISDIR(inode->i_mode)) {
3160                 err = lli->lli_async_rc;
3161                 lli->lli_async_rc = 0;
3162                 if (rc == 0)
3163                         rc = err;
3164                 if (lli->lli_clob != NULL) {
3165                         err = lov_read_and_clear_async_rc(lli->lli_clob);
3166                         if (rc == 0)
3167                                 rc = err;
3168                 }
3169         }
3170
3171         err = md_fsync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), &req);
3172         if (!rc)
3173                 rc = err;
3174         if (!err)
3175                 ptlrpc_req_finished(req);
3176
3177         if (S_ISREG(inode->i_mode)) {
3178                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
3179
3180                 err = cl_sync_file_range(inode, start, end, CL_FSYNC_ALL, 0);
3181                 if (rc == 0 && err < 0)
3182                         rc = err;
3183                 if (rc < 0)
3184                         fd->fd_write_failed = true;
3185                 else
3186                         fd->fd_write_failed = false;
3187         }
3188
3189 #ifdef HAVE_FILE_FSYNC_4ARGS
3190         if (lock_inode)
3191                 inode_unlock(inode);
3192 #endif
3193         RETURN(rc);
3194 }
3195
3196 static int
3197 ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
3198 {
3199         struct inode *inode = file_inode(file);
3200         struct ll_sb_info *sbi = ll_i2sbi(inode);
3201         struct ldlm_enqueue_info einfo = {
3202                 .ei_type        = LDLM_FLOCK,
3203                 .ei_cb_cp       = ldlm_flock_completion_ast,
3204                 .ei_cbdata      = file_lock,
3205         };
3206         struct md_op_data *op_data;
3207         struct lustre_handle lockh = { 0 };
3208         union ldlm_policy_data flock = { { 0 } };
3209         int fl_type = file_lock->fl_type;
3210         __u64 flags = 0;
3211         int rc;
3212         int rc2 = 0;
3213         ENTRY;
3214
3215         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
3216                PFID(ll_inode2fid(inode)), file_lock);
3217
3218         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
3219
3220         if (file_lock->fl_flags & FL_FLOCK) {
3221                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
3222                 /* flocks are whole-file locks */
3223                 flock.l_flock.end = OFFSET_MAX;
3224                 /* For flocks owner is determined by the local file desctiptor*/
3225                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
3226         } else if (file_lock->fl_flags & FL_POSIX) {
3227                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
3228                 flock.l_flock.start = file_lock->fl_start;
3229                 flock.l_flock.end = file_lock->fl_end;
3230         } else {
3231                 RETURN(-EINVAL);
3232         }
3233         flock.l_flock.pid = file_lock->fl_pid;
3234
3235         /* Somewhat ugly workaround for svc lockd.
3236          * lockd installs custom fl_lmops->lm_compare_owner that checks
3237          * for the fl_owner to be the same (which it always is on local node
3238          * I guess between lockd processes) and then compares pid.
3239          * As such we assign pid to the owner field to make it all work,
3240          * conflict with normal locks is unlikely since pid space and
3241          * pointer space for current->files are not intersecting */
3242         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
3243                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
3244
3245         switch (fl_type) {
3246         case F_RDLCK:
3247                 einfo.ei_mode = LCK_PR;
3248                 break;
3249         case F_UNLCK:
3250                 /* An unlock request may or may not have any relation to
3251                  * existing locks so we may not be able to pass a lock handle
3252                  * via a normal ldlm_lock_cancel() request. The request may even
3253                  * unlock a byte range in the middle of an existing lock. In
3254                  * order to process an unlock request we need all of the same
3255                  * information that is given with a normal read or write record
3256                  * lock request. To avoid creating another ldlm unlock (cancel)
3257                  * message we'll treat a LCK_NL flock request as an unlock. */
3258                 einfo.ei_mode = LCK_NL;
3259                 break;
3260         case F_WRLCK:
3261                 einfo.ei_mode = LCK_PW;
3262                 break;
3263         default:
3264                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n", fl_type);
3265                 RETURN (-ENOTSUPP);
3266         }
3267
3268         switch (cmd) {
3269         case F_SETLKW:
3270 #ifdef F_SETLKW64
3271         case F_SETLKW64:
3272 #endif
3273                 flags = 0;
3274                 break;
3275         case F_SETLK:
3276 #ifdef F_SETLK64
3277         case F_SETLK64:
3278 #endif
3279                 flags = LDLM_FL_BLOCK_NOWAIT;
3280                 break;
3281         case F_GETLK:
3282 #ifdef F_GETLK64
3283         case F_GETLK64:
3284 #endif
3285                 flags = LDLM_FL_TEST_LOCK;
3286                 break;
3287         default:
3288                 CERROR("unknown fcntl lock command: %d\n", cmd);
3289                 RETURN (-EINVAL);
3290         }
3291
3292         /* Save the old mode so that if the mode in the lock changes we
3293          * can decrement the appropriate reader or writer refcount. */
3294         file_lock->fl_type = einfo.ei_mode;
3295
3296         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
3297                                      LUSTRE_OPC_ANY, NULL);
3298         if (IS_ERR(op_data))
3299                 RETURN(PTR_ERR(op_data));
3300
3301         CDEBUG(D_DLMTRACE, "inode="DFID", pid=%u, flags=%#llx, mode=%u, "
3302                "start=%llu, end=%llu\n", PFID(ll_inode2fid(inode)),
3303                flock.l_flock.pid, flags, einfo.ei_mode,
3304                flock.l_flock.start, flock.l_flock.end);
3305
3306         rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data, &lockh,
3307                         flags);
3308
3309         /* Restore the file lock type if not TEST lock. */
3310         if (!(flags & LDLM_FL_TEST_LOCK))
3311                 file_lock->fl_type = fl_type;
3312
3313 #ifdef HAVE_LOCKS_LOCK_FILE_WAIT
3314         if ((rc == 0 || file_lock->fl_type == F_UNLCK) &&
3315             !(flags & LDLM_FL_TEST_LOCK))
3316                 rc2  = locks_lock_file_wait(file, file_lock);
3317 #else
3318         if ((file_lock->fl_flags & FL_FLOCK) &&
3319             (rc == 0 || file_lock->fl_type == F_UNLCK))
3320                 rc2  = flock_lock_file_wait(file, file_lock);
3321         if ((file_lock->fl_flags & FL_POSIX) &&
3322             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
3323             !(flags & LDLM_FL_TEST_LOCK))
3324                 rc2  = posix_lock_file_wait(file, file_lock);
3325 #endif /* HAVE_LOCKS_LOCK_FILE_WAIT */
3326
3327         if (rc2 && file_lock->fl_type != F_UNLCK) {
3328                 einfo.ei_mode = LCK_NL;
3329                 md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data,
3330                            &lockh, flags);
3331                 rc = rc2;
3332         }
3333
3334         ll_finish_md_op_data(op_data);
3335
3336         RETURN(rc);
3337 }
3338
3339 int ll_get_fid_by_name(struct inode *parent, const char *name,
3340                        int namelen, struct lu_fid *fid,
3341                        struct inode **inode)
3342 {
3343         struct md_op_data       *op_data = NULL;
3344         struct mdt_body         *body;
3345         struct ptlrpc_request   *req;
3346         int                     rc;
3347         ENTRY;
3348
3349         op_data = ll_prep_md_op_data(NULL, parent, NULL, name, namelen, 0,
3350                                      LUSTRE_OPC_ANY, NULL);
3351         if (IS_ERR(op_data))
3352                 RETURN(PTR_ERR(op_data));
3353
3354         op_data->op_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
3355         rc = md_getattr_name(ll_i2sbi(parent)->ll_md_exp, op_data, &req);
3356         ll_finish_md_op_data(op_data);
3357         if (rc < 0)
3358                 RETURN(rc);
3359
3360         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
3361         if (body == NULL)
3362                 GOTO(out_req, rc = -EFAULT);
3363         if (fid != NULL)
3364                 *fid = body->mbo_fid1;
3365
3366         if (inode != NULL)
3367                 rc = ll_prep_inode(inode, req, parent->i_sb, NULL);
3368 out_req:
3369         ptlrpc_req_finished(req);
3370         RETURN(rc);
3371 }
3372
3373 int ll_migrate(struct inode *parent, struct file *file, int mdtidx,
3374                const char *name, int namelen)
3375 {
3376         struct dentry         *dchild = NULL;
3377         struct inode          *child_inode = NULL;
3378         struct md_op_data     *op_data;
3379         struct ptlrpc_request *request = NULL;
3380         struct obd_client_handle *och = NULL;
3381         struct qstr           qstr;
3382         struct mdt_body         *body;
3383         int                    rc;
3384         __u64                   data_version = 0;
3385         ENTRY;
3386
3387         CDEBUG(D_VFSTRACE, "migrate %s under "DFID" to MDT%04x\n",
3388                name, PFID(ll_inode2fid(parent)), mdtidx);
3389
3390         op_data = ll_prep_md_op_data(NULL, parent, NULL, name, namelen,
3391                                      0, LUSTRE_OPC_ANY, NULL);
3392         if (IS_ERR(op_data))
3393                 RETURN(PTR_ERR(op_data));
3394
3395         /* Get child FID first */
3396         qstr.hash = ll_full_name_hash(file_dentry(file), name, namelen);
3397         qstr.name = name;
3398         qstr.len = namelen;
3399         dchild = d_lookup(file_dentry(file), &qstr);
3400         if (dchild != NULL) {
3401                 if (dchild->d_inode != NULL)
3402                         child_inode = igrab(dchild->d_inode);
3403                 dput(dchild);
3404         }
3405
3406         if (child_inode == NULL) {
3407                 rc = ll_get_fid_by_name(parent, name, namelen,
3408                                         &op_data->op_fid3, &child_inode);
3409                 if (rc != 0)
3410                         GOTO(out_free, rc);
3411         }
3412
3413         if (child_inode == NULL)
3414                 GOTO(out_free, rc = -EINVAL);
3415
3416         /*
3417          * lfs migrate command needs to be blocked on the client
3418          * by checking the migrate FID against the FID of the
3419          * filesystem root.
3420          */
3421         if (child_inode == parent->i_sb->s_root->d_inode)
3422                 GOTO(out_iput, rc = -EINVAL);
3423
3424         inode_lock(child_inode);
3425         op_data->op_fid3 = *ll_inode2fid(child_inode);
3426         if (!fid_is_sane(&op_data->op_fid3)) {
3427                 CERROR("%s: migrate %s, but FID "DFID" is insane\n",
3428                        ll_get_fsname(parent->i_sb, NULL, 0), name,
3429                        PFID(&op_data->op_fid3));
3430                 GOTO(out_unlock, rc = -EINVAL);
3431         }
3432
3433         rc = ll_get_mdt_idx_by_fid(ll_i2sbi(parent), &op_data->op_fid3);
3434         if (rc < 0)
3435                 GOTO(out_unlock, rc);
3436
3437         if (rc == mdtidx) {
3438                 CDEBUG(D_INFO, "%s: "DFID" is already on MDT%04x\n", name,
3439                        PFID(&op_data->op_fid3), mdtidx);
3440                 GOTO(out_unlock, rc = 0);
3441         }
3442 again:
3443         if (S_ISREG(child_inode->i_mode)) {
3444                 och = ll_lease_open(child_inode, NULL, FMODE_WRITE, 0);
3445                 if (IS_ERR(och)) {
3446                         rc = PTR_ERR(och);
3447                         och = NULL;
3448                         GOTO(out_unlock, rc);
3449                 }
3450
3451                 rc = ll_data_version(child_inode, &data_version,
3452                                      LL_DV_WR_FLUSH);
3453                 if (rc != 0)
3454                         GOTO(out_close, rc);
3455
3456                 op_data->op_handle = och->och_fh;
3457                 op_data->op_data = och->och_mod;
3458                 op_data->op_data_version = data_version;
3459                 op_data->op_lease_handle = och->och_lease_handle;
3460                 op_data->op_bias |= MDS_RENAME_MIGRATE;
3461         }
3462
3463         op_data->op_mds = mdtidx;
3464         op_data->op_cli_flags = CLI_MIGRATE;
3465         rc = md_rename(ll_i2sbi(parent)->ll_md_exp, op_data, name,
3466                        namelen, name, namelen, &request);
3467         if (rc == 0) {
3468                 LASSERT(request != NULL);
3469                 ll_update_times(request, parent);
3470
3471                 body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
3472                 LASSERT(body != NULL);
3473
3474                 /* If the server does release layout lock, then we cleanup
3475                  * the client och here, otherwise release it in out_close: */
3476                 if (och != NULL &&
3477                     body->mbo_valid & OBD_MD_CLOSE_INTENT_EXECED) {
3478                         obd_mod_put(och->och_mod);
3479                         md_clear_open_replay_data(ll_i2sbi(parent)->ll_md_exp,
3480                                                   och);
3481                         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
3482                         OBD_FREE_PTR(och);
3483                         och = NULL;
3484                 }
3485         }
3486
3487         if (request != NULL) {
3488                 ptlrpc_req_finished(request);
3489                 request = NULL;
3490         }
3491
3492         /* Try again if the file layout has changed. */
3493         if (rc == -EAGAIN && S_ISREG(child_inode->i_mode))
3494                 goto again;
3495
3496 out_close:
3497         if (och != NULL) /* close the file */
3498                 ll_lease_close(och, child_inode, NULL);
3499         if (rc == 0)
3500                 clear_nlink(child_inode);
3501 out_unlock:
3502         inode_unlock(child_inode);
3503 out_iput:
3504         iput(child_inode);
3505 out_free:
3506         ll_finish_md_op_data(op_data);
3507         RETURN(rc);
3508 }
3509
3510 static int
3511 ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
3512 {
3513         ENTRY;
3514
3515         RETURN(-ENOSYS);
3516 }
3517
3518 /**
3519  * test if some locks matching bits and l_req_mode are acquired
3520  * - bits can be in different locks
3521  * - if found clear the common lock bits in *bits
3522  * - the bits not found, are kept in *bits
3523  * \param inode [IN]
3524  * \param bits [IN] searched lock bits [IN]
3525  * \param l_req_mode [IN] searched lock mode
3526  * \retval boolean, true iff all bits are found
3527  */
3528 int ll_have_md_lock(struct inode *inode, __u64 *bits, enum ldlm_mode l_req_mode)
3529 {
3530         struct lustre_handle lockh;
3531         union ldlm_policy_data policy;
3532         enum ldlm_mode mode = (l_req_mode == LCK_MINMODE) ?
3533                               (LCK_CR | LCK_CW | LCK_PR | LCK_PW) : l_req_mode;
3534         struct lu_fid *fid;
3535         __u64 flags;
3536         int i;
3537         ENTRY;
3538
3539         if (!inode)
3540                RETURN(0);
3541
3542         fid = &ll_i2info(inode)->lli_fid;
3543         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
3544                ldlm_lockname[mode]);
3545
3546         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
3547         for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
3548                 policy.l_inodebits.bits = *bits & (1 << i);
3549                 if (policy.l_inodebits.bits == 0)
3550                         continue;
3551
3552                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
3553                                   &policy, mode, &lockh)) {
3554                         struct ldlm_lock *lock;
3555
3556                         lock = ldlm_handle2lock(&lockh);
3557                         if (lock) {
3558                                 *bits &=
3559                                       ~(lock->l_policy_data.l_inodebits.bits);
3560                                 LDLM_LOCK_PUT(lock);
3561                         } else {
3562                                 *bits &= ~policy.l_inodebits.bits;
3563                         }
3564                 }
3565         }
3566         RETURN(*bits == 0);
3567 }
3568
3569 enum ldlm_mode ll_take_md_lock(struct inode *inode, __u64 bits,
3570                                struct lustre_handle *lockh, __u64 flags,
3571                                enum ldlm_mode mode)
3572 {
3573         union ldlm_policy_data policy = { .l_inodebits = { bits } };
3574         struct lu_fid *fid;
3575         enum ldlm_mode rc;
3576         ENTRY;
3577
3578         fid = &ll_i2info(inode)->lli_fid;
3579         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
3580
3581         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
3582                            fid, LDLM_IBITS, &policy, mode, lockh);
3583
3584         RETURN(rc);
3585 }
3586
3587 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
3588 {
3589         /* Already unlinked. Just update nlink and return success */
3590         if (rc == -ENOENT) {
3591                 clear_nlink(inode);
3592                 /* If it is striped directory, and there is bad stripe
3593                  * Let's revalidate the dentry again, instead of returning
3594                  * error */
3595                 if (S_ISDIR(inode->i_mode) &&
3596                     ll_i2info(inode)->lli_lsm_md != NULL)
3597                         return 0;
3598
3599                 /* This path cannot be hit for regular files unless in
3600                  * case of obscure races, so no need to to validate
3601                  * size. */
3602                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
3603                         return 0;
3604         } else if (rc != 0) {
3605                 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
3606                              "%s: revalidate FID "DFID" error: rc = %d\n",
3607                              ll_get_fsname(inode->i_sb, NULL, 0),
3608                              PFID(ll_inode2fid(inode)), rc);
3609         }
3610
3611         return rc;
3612 }
3613
3614 static int __ll_inode_revalidate(struct dentry *dentry, __u64 ibits)
3615 {
3616         struct inode *inode = dentry->d_inode;
3617         struct ptlrpc_request *req = NULL;
3618         struct obd_export *exp;
3619         int rc = 0;
3620         ENTRY;
3621
3622         LASSERT(inode != NULL);
3623
3624         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p),name=%s\n",
3625                PFID(ll_inode2fid(inode)), inode, dentry->d_name.name);
3626
3627         exp = ll_i2mdexp(inode);
3628
3629         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
3630          *      But under CMD case, it caused some lock issues, should be fixed
3631          *      with new CMD ibits lock. See bug 12718 */
3632         if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
3633                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3634                 struct md_op_data *op_data;
3635
3636                 if (ibits == MDS_INODELOCK_LOOKUP)
3637                         oit.it_op = IT_LOOKUP;
3638
3639                 /* Call getattr by fid, so do not provide name at all. */
3640                 op_data = ll_prep_md_op_data(NULL, dentry->d_inode,
3641                                              dentry->d_inode, NULL, 0, 0,
3642                                              LUSTRE_OPC_ANY, NULL);
3643                 if (IS_ERR(op_data))
3644                         RETURN(PTR_ERR(op_data));
3645
3646                 rc = md_intent_lock(exp, op_data, &oit, &req,
3647                                     &ll_md_blocking_ast, 0);
3648                 ll_finish_md_op_data(op_data);
3649                 if (rc < 0) {
3650                         rc = ll_inode_revalidate_fini(inode, rc);
3651                         GOTO (out, rc);
3652                 }
3653
3654                 rc = ll_revalidate_it_finish(req, &oit, dentry);
3655                 if (rc != 0) {
3656                         ll_intent_release(&oit);
3657                         GOTO(out, rc);
3658                 }
3659
3660                 /* Unlinked? Unhash dentry, so it is not picked up later by
3661                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3662                    here to preserve get_cwd functionality on 2.6.
3663                    Bug 10503 */
3664                 if (!dentry->d_inode->i_nlink) {
3665                         ll_lock_dcache(inode);
3666                         d_lustre_invalidate(dentry, 0);
3667                         ll_unlock_dcache(inode);
3668                 }
3669
3670                 ll_lookup_finish_locks(&oit, dentry);
3671         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
3672                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3673                 u64 valid = OBD_MD_FLGETATTR;
3674                 struct md_op_data *op_data;
3675                 int ealen = 0;
3676
3677                 if (S_ISREG(inode->i_mode)) {
3678                         rc = ll_get_default_mdsize(sbi, &ealen);
3679                         if (rc)
3680                                 RETURN(rc);
3681                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3682                 }
3683
3684                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
3685                                              0, ealen, LUSTRE_OPC_ANY,
3686                                              NULL);
3687                 if (IS_ERR(op_data))
3688                         RETURN(PTR_ERR(op_data));
3689
3690                 op_data->op_valid = valid;
3691                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
3692                 ll_finish_md_op_data(op_data);
3693                 if (rc) {
3694                         rc = ll_inode_revalidate_fini(inode, rc);
3695                         RETURN(rc);
3696                 }
3697
3698                 rc = ll_prep_inode(&inode, req, NULL, NULL);
3699         }
3700 out:
3701         ptlrpc_req_finished(req);
3702         return rc;
3703 }
3704
3705 static int ll_merge_md_attr(struct inode *inode)
3706 {
3707         struct cl_attr attr = { 0 };
3708         int rc;
3709
3710         LASSERT(ll_i2info(inode)->lli_lsm_md != NULL);
3711         rc = md_merge_attr(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
3712                            &attr, ll_md_blocking_ast);
3713         if (rc != 0)
3714                 RETURN(rc);
3715
3716         set_nlink(inode, attr.cat_nlink);
3717         inode->i_blocks = attr.cat_blocks;
3718         i_size_write(inode, attr.cat_size);
3719
3720         ll_i2info(inode)->lli_atime = attr.cat_atime;
3721         ll_i2info(inode)->lli_mtime = attr.cat_mtime;
3722         ll_i2info(inode)->lli_ctime = attr.cat_ctime;
3723
3724         RETURN(0);
3725 }
3726
3727 static int
3728 ll_inode_revalidate(struct dentry *dentry, __u64 ibits)
3729 {
3730         struct inode    *inode = dentry->d_inode;
3731         int              rc;
3732         ENTRY;
3733
3734         rc = __ll_inode_revalidate(dentry, ibits);
3735         if (rc != 0)
3736                 RETURN(rc);
3737
3738         /* if object isn't regular file, don't validate size */
3739         if (!S_ISREG(inode->i_mode)) {
3740                 if (S_ISDIR(inode->i_mode) &&
3741                     ll_i2info(inode)->lli_lsm_md != NULL) {
3742                         rc = ll_merge_md_attr(inode);
3743                         if (rc != 0)
3744                                 RETURN(rc);
3745                 }
3746
3747                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_atime;
3748                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_mtime;
3749                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_ctime;
3750         } else {
3751                 /* In case of restore, the MDT has the right size and has
3752                  * already send it back without granting the layout lock,
3753                  * inode is up-to-date so glimpse is useless.
3754                  * Also to glimpse we need the layout, in case of a running
3755                  * restore the MDT holds the layout lock so the glimpse will
3756                  * block up to the end of restore (getattr will block)
3757                  */
3758                 if (!ll_file_test_flag(ll_i2info(inode), LLIF_FILE_RESTORING))
3759                         rc = ll_glimpse_size(inode);
3760         }
3761         RETURN(rc);
3762 }
3763
3764 static inline dev_t ll_compat_encode_dev(dev_t dev)
3765 {
3766         /* The compat_sys_*stat*() syscalls will fail unless the
3767          * device majors and minors are both less than 256. Note that
3768          * the value returned here will be passed through
3769          * old_encode_dev() in cp_compat_stat(). And so we are not
3770          * trying to return a valid compat (u16) device number, just
3771          * one that will pass the old_valid_dev() check. */
3772
3773         return MKDEV(MAJOR(dev) & 0xff, MINOR(dev) & 0xff);
3774 }
3775
3776 #ifdef HAVE_INODEOPS_ENHANCED_GETATTR
3777 int ll_getattr(const struct path *path, struct kstat *stat,
3778                u32 request_mask, unsigned int flags)
3779
3780 {
3781         struct dentry *de = path->dentry;
3782 #else
3783 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3784 {
3785 #endif
3786         struct inode *inode = de->d_inode;
3787         struct ll_sb_info *sbi = ll_i2sbi(inode);
3788         struct ll_inode_info *lli = ll_i2info(inode);
3789         int res = 0;
3790
3791         res = ll_inode_revalidate(de, MDS_INODELOCK_UPDATE |
3792                                       MDS_INODELOCK_LOOKUP);
3793         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
3794
3795         if (res)
3796                 return res;
3797
3798         OBD_FAIL_TIMEOUT(OBD_FAIL_GETATTR_DELAY, 30);
3799
3800         if (ll_need_32bit_api(sbi)) {
3801                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
3802                 stat->dev = ll_compat_encode_dev(inode->i_sb->s_dev);
3803                 stat->rdev = ll_compat_encode_dev(inode->i_rdev);
3804         } else {
3805                 stat->ino = inode->i_ino;
3806                 stat->dev = inode->i_sb->s_dev;
3807                 stat->rdev = inode->i_rdev;
3808         }
3809
3810         stat->mode = inode->i_mode;
3811         stat->uid = inode->i_uid;
3812         stat->gid = inode->i_gid;
3813         stat->atime = inode->i_atime;
3814         stat->mtime = inode->i_mtime;
3815         stat->ctime = inode->i_ctime;
3816         stat->blksize = sbi->ll_stat_blksize ?: 1 << inode->i_blkbits;
3817
3818         stat->nlink = inode->i_nlink;
3819         stat->size = i_size_read(inode);
3820         stat->blocks = inode->i_blocks;
3821
3822         return 0;
3823 }
3824
3825 static int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
3826                      __u64 start, __u64 len)
3827 {
3828         int             rc;
3829         size_t          num_bytes;
3830         struct fiemap   *fiemap;
3831         unsigned int    extent_count = fieinfo->fi_extents_max;
3832
3833         num_bytes = sizeof(*fiemap) + (extent_count *
3834                                        sizeof(struct fiemap_extent));
3835         OBD_ALLOC_LARGE(fiemap, num_bytes);
3836
3837         if (fiemap == NULL)
3838                 RETURN(-ENOMEM);
3839
3840         fiemap->fm_flags = fieinfo->fi_flags;
3841         fiemap->fm_extent_count = fieinfo->fi_extents_max;
3842         fiemap->fm_start = start;
3843         fiemap->fm_length = len;
3844         if (extent_count > 0 &&
3845             copy_from_user(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
3846                            sizeof(struct fiemap_extent)) != 0)
3847                 GOTO(out, rc = -EFAULT);
3848
3849         rc = ll_do_fiemap(inode, fiemap, num_bytes);
3850
3851         fieinfo->fi_flags = fiemap->fm_flags;
3852         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
3853         if (extent_count > 0 &&
3854             copy_to_user(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
3855                          fiemap->fm_mapped_extents *
3856                          sizeof(struct fiemap_extent)) != 0)
3857                 GOTO(out, rc = -EFAULT);
3858 out:
3859         OBD_FREE_LARGE(fiemap, num_bytes);
3860         return rc;
3861 }
3862
3863 struct posix_acl *ll_get_acl(struct inode *inode, int type)
3864 {
3865         struct ll_inode_info *lli = ll_i2info(inode);
3866         struct posix_acl *acl = NULL;
3867         ENTRY;
3868
3869         spin_lock(&lli->lli_lock);
3870         /* VFS' acl_permission_check->check_acl will release the refcount */
3871         acl = posix_acl_dup(lli->lli_posix_acl);
3872         spin_unlock(&lli->lli_lock);
3873
3874         RETURN(acl);
3875 }
3876
3877 #ifdef HAVE_IOP_SET_ACL
3878 #ifdef CONFIG_FS_POSIX_ACL
3879 int ll_set_acl(struct inode *inode, struct posix_acl *acl, int type)
3880 {
3881         const char *name = NULL;
3882         char *value = NULL;
3883         size_t size = 0;
3884         int rc = 0;
3885         ENTRY;
3886
3887         switch (type) {
3888         case ACL_TYPE_ACCESS:
3889                 if (acl) {
3890                         rc = posix_acl_update_mode(inode, &inode->i_mode, &acl);
3891                         if (rc)
3892                                 GOTO(out, rc);
3893                 }
3894                 name = XATTR_NAME_POSIX_ACL_ACCESS;
3895                 break;
3896         case ACL_TYPE_DEFAULT:
3897                 if (!S_ISDIR(inode->i_mode))
3898                         GOTO(out, rc = acl ? -EACCES : 0);
3899                 name = XATTR_NAME_POSIX_ACL_DEFAULT;
3900                 break;
3901         default:
3902                 GOTO(out, rc = -EINVAL);
3903         }
3904
3905         if (acl) {
3906                 size = posix_acl_xattr_size(acl->a_count);
3907                 value = kmalloc(size, GFP_NOFS);
3908                 if (value == NULL)
3909                         GOTO(out, rc = -ENOMEM);
3910
3911                 rc = posix_acl_to_xattr(&init_user_ns, acl, value, size);
3912                 if (rc < 0)
3913                         GOTO(out_free, rc);
3914         }
3915
3916         /* dentry is only used for *.lov attributes so it's safe to be NULL */
3917         rc = __vfs_setxattr(NULL, inode, name, value, size, XATTR_CREATE);
3918 out_free:
3919         kfree(value);
3920 out:
3921         if (!rc)
3922                 set_cached_acl(inode, type, acl);
3923         else
3924                 forget_cached_acl(inode, type);
3925         RETURN(rc);
3926 }
3927 #endif /* CONFIG_FS_POSIX_ACL */
3928 #endif /* HAVE_IOP_SET_ACL */
3929
3930 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
3931 static int
3932 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
3933 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
3934 # else
3935 ll_check_acl(struct inode *inode, int mask)
3936 # endif
3937 {
3938 # ifdef CONFIG_FS_POSIX_ACL
3939         struct posix_acl *acl;
3940         int rc;
3941         ENTRY;
3942
3943 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
3944         if (flags & IPERM_FLAG_RCU)
3945                 return -ECHILD;
3946 #  endif
3947         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
3948
3949         if (!acl)
3950                 RETURN(-EAGAIN);
3951
3952         rc = posix_acl_permission(inode, acl, mask);
3953         posix_acl_release(acl);
3954
3955         RETURN(rc);
3956 # else /* !CONFIG_FS_POSIX_ACL */
3957         return -EAGAIN;
3958 # endif /* CONFIG_FS_POSIX_ACL */
3959 }
3960 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
3961
3962 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
3963 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
3964 #else
3965 # ifdef HAVE_INODE_PERMISION_2ARGS
3966 int ll_inode_permission(struct inode *inode, int mask)
3967 # else
3968 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3969 # endif
3970 #endif
3971 {
3972         int rc = 0;
3973         struct ll_sb_info *sbi;
3974         struct root_squash_info *squash;
3975         struct cred *cred = NULL;
3976         const struct cred *old_cred = NULL;
3977         cfs_cap_t cap;
3978         bool squash_id = false;
3979         ENTRY;
3980
3981 #ifdef MAY_NOT_BLOCK
3982         if (mask & MAY_NOT_BLOCK)
3983                 return -ECHILD;
3984 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
3985         if (flags & IPERM_FLAG_RCU)
3986                 return -ECHILD;
3987 #endif
3988
3989        /* as root inode are NOT getting validated in lookup operation,
3990         * need to do it before permission check. */
3991
3992         if (inode == inode->i_sb->s_root->d_inode) {
3993                 rc = __ll_inode_revalidate(inode->i_sb->s_root,
3994                                            MDS_INODELOCK_LOOKUP);
3995                 if (rc)
3996                         RETURN(rc);
3997         }
3998
3999         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), inode mode %x mask %o\n",
4000                PFID(ll_inode2fid(inode)), inode, inode->i_mode, mask);
4001
4002         /* squash fsuid/fsgid if needed */
4003         sbi = ll_i2sbi(inode);
4004         squash = &sbi->ll_squash;
4005         if (unlikely(squash->rsi_uid != 0 &&
4006                      uid_eq(current_fsuid(), GLOBAL_ROOT_UID) &&
4007                      !(sbi->ll_flags & LL_SBI_NOROOTSQUASH))) {
4008                         squash_id = true;
4009         }
4010         if (squash_id) {
4011                 CDEBUG(D_OTHER, "squash creds (%d:%d)=>(%d:%d)\n",
4012                        __kuid_val(current_fsuid()), __kgid_val(current_fsgid()),
4013                        squash->rsi_uid, squash->rsi_gid);
4014
4015                 /* update current process's credentials
4016                  * and FS capability */
4017                 cred = prepare_creds();
4018                 if (cred == NULL)
4019                         RETURN(-ENOMEM);
4020
4021                 cred->fsuid = make_kuid(&init_user_ns, squash->rsi_uid);
4022                 cred->fsgid = make_kgid(&init_user_ns, squash->rsi_gid);
4023                 for (cap = 0; cap < sizeof(cfs_cap_t) * 8; cap++) {
4024                         if ((1 << cap) & CFS_CAP_FS_MASK)
4025                                 cap_lower(cred->cap_effective, cap);
4026                 }
4027                 old_cred = override_creds(cred);
4028         }
4029
4030         ll_stats_ops_tally(sbi, LPROC_LL_INODE_PERM, 1);
4031         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
4032         /* restore current process's credentials and FS capability */
4033         if (squash_id) {
4034                 revert_creds(old_cred);
4035                 put_cred(cred);
4036         }
4037
4038         RETURN(rc);
4039 }
4040
4041 /* -o localflock - only provides locally consistent flock locks */
4042 struct file_operations ll_file_operations = {
4043 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
4044 # ifdef HAVE_SYNC_READ_WRITE
4045         .read           = new_sync_read,
4046         .write          = new_sync_write,
4047 # endif
4048         .read_iter      = ll_file_read_iter,
4049         .write_iter     = ll_file_write_iter,
4050 #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4051         .read           = ll_file_read,
4052         .aio_read       = ll_file_aio_read,
4053         .write          = ll_file_write,
4054         .aio_write      = ll_file_aio_write,
4055 #endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4056         .unlocked_ioctl = ll_file_ioctl,
4057         .open           = ll_file_open,
4058         .release        = ll_file_release,
4059         .mmap           = ll_file_mmap,
4060         .llseek         = ll_file_seek,
4061         .splice_read    = ll_file_splice_read,
4062         .fsync          = ll_fsync,
4063         .flush          = ll_flush
4064 };
4065
4066 struct file_operations ll_file_operations_flock = {
4067 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
4068 # ifdef HAVE_SYNC_READ_WRITE
4069         .read           = new_sync_read,
4070         .write          = new_sync_write,
4071 # endif /* HAVE_SYNC_READ_WRITE */
4072         .read_iter      = ll_file_read_iter,
4073         .write_iter     = ll_file_write_iter,
4074 #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4075         .read           = ll_file_read,
4076         .aio_read       = ll_file_aio_read,
4077         .write          = ll_file_write,
4078         .aio_write      = ll_file_aio_write,
4079 #endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4080         .unlocked_ioctl = ll_file_ioctl,
4081         .open           = ll_file_open,
4082         .release        = ll_file_release,
4083         .mmap           = ll_file_mmap,
4084         .llseek         = ll_file_seek,
4085         .splice_read    = ll_file_splice_read,
4086         .fsync          = ll_fsync,
4087         .flush          = ll_flush,
4088         .flock          = ll_file_flock,
4089         .lock           = ll_file_flock
4090 };
4091
4092 /* These are for -o noflock - to return ENOSYS on flock calls */
4093 struct file_operations ll_file_operations_noflock = {
4094 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
4095 # ifdef HAVE_SYNC_READ_WRITE
4096         .read           = new_sync_read,
4097         .write          = new_sync_write,
4098 # endif /* HAVE_SYNC_READ_WRITE */
4099         .read_iter      = ll_file_read_iter,
4100         .write_iter     = ll_file_write_iter,
4101 #else /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4102         .read           = ll_file_read,
4103         .aio_read       = ll_file_aio_read,
4104         .write          = ll_file_write,
4105         .aio_write      = ll_file_aio_write,
4106 #endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
4107         .unlocked_ioctl = ll_file_ioctl,
4108         .open           = ll_file_open,
4109         .release        = ll_file_release,
4110         .mmap           = ll_file_mmap,
4111         .llseek         = ll_file_seek,
4112         .splice_read    = ll_file_splice_read,
4113         .fsync          = ll_fsync,
4114         .flush          = ll_flush,
4115         .flock          = ll_file_noflock,
4116         .lock           = ll_file_noflock
4117 };
4118
4119 struct inode_operations ll_file_inode_operations = {
4120         .setattr        = ll_setattr,
4121         .getattr        = ll_getattr,
4122         .permission     = ll_inode_permission,
4123 #ifdef HAVE_IOP_XATTR
4124         .setxattr       = ll_setxattr,
4125         .getxattr       = ll_getxattr,
4126         .removexattr    = ll_removexattr,
4127 #endif
4128         .listxattr      = ll_listxattr,
4129         .fiemap         = ll_fiemap,
4130 #ifdef HAVE_IOP_GET_ACL
4131         .get_acl        = ll_get_acl,
4132 #endif
4133 #ifdef HAVE_IOP_SET_ACL
4134         .set_acl        = ll_set_acl,
4135 #endif
4136 };
4137
4138 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
4139 {
4140         struct ll_inode_info *lli = ll_i2info(inode);
4141         struct cl_object *obj = lli->lli_clob;
4142         struct lu_env *env;
4143         int rc;
4144         __u16 refcheck;
4145         ENTRY;
4146
4147         if (obj == NULL)
4148                 RETURN(0);
4149
4150         env = cl_env_get(&refcheck);
4151         if (IS_ERR(env))
4152                 RETURN(PTR_ERR(env));
4153
4154         rc = cl_conf_set(env, lli->lli_clob, conf);
4155         if (rc < 0)
4156                 GOTO(out, rc);
4157
4158         if (conf->coc_opc == OBJECT_CONF_SET) {
4159                 struct ldlm_lock *lock = conf->coc_lock;
4160                 struct cl_layout cl = {
4161                         .cl_layout_gen = 0,
4162                 };
4163
4164                 LASSERT(lock != NULL);
4165                 LASSERT(ldlm_has_layout(lock));
4166
4167                 /* it can only be allowed to match after layout is
4168                  * applied to inode otherwise false layout would be
4169                  * seen. Applying layout shoud happen before dropping
4170                  * the intent lock. */
4171                 ldlm_lock_allow_match(lock);
4172
4173                 rc = cl_object_layout_get(env, obj, &cl);
4174                 if (rc < 0)
4175                         GOTO(out, rc);
4176
4177                 CDEBUG(D_VFSTRACE,
4178                        DFID": layout version change: %u -> %u\n",
4179                        PFID(&lli->lli_fid), ll_layout_version_get(lli),
4180                        cl.cl_layout_gen);
4181                 ll_layout_version_set(lli, cl.cl_layout_gen);
4182         }
4183
4184 out:
4185         cl_env_put(env, &refcheck);
4186
4187         RETURN(rc);
4188 }
4189
4190 /* Fetch layout from MDT with getxattr request, if it's not ready yet */
4191 static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)
4192
4193 {
4194         struct ll_sb_info *sbi = ll_i2sbi(inode);
4195         struct ptlrpc_request *req;
4196         struct mdt_body *body;
4197         void *lvbdata;
4198         void *lmm;
4199         int lmmsize;
4200         int rc;
4201         ENTRY;
4202
4203         CDEBUG(D_INODE, DFID" LVB_READY=%d l_lvb_data=%p l_lvb_len=%d\n",
4204                PFID(ll_inode2fid(inode)), ldlm_is_lvb_ready(lock),
4205                lock->l_lvb_data, lock->l_lvb_len);
4206
4207         if (lock->l_lvb_data != NULL)
4208                 RETURN(0);
4209
4210         /* if layout lock was granted right away, the layout is returned
4211          * within DLM_LVB of dlm reply; otherwise if the lock was ever
4212          * blocked and then granted via completion ast, we have to fetch
4213          * layout here. Please note that we can't use the LVB buffer in
4214          * completion AST because it doesn't have a large enough buffer */
4215         rc = ll_get_default_mdsize(sbi, &lmmsize);
4216         if (rc == 0)
4217                 rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode),
4218                                 OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
4219                                 lmmsize, 0, &req);
4220         if (rc < 0)
4221                 RETURN(rc);
4222
4223         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
4224         if (body == NULL)
4225                 GOTO(out, rc = -EPROTO);
4226
4227         lmmsize = body->mbo_eadatasize;
4228         if (lmmsize == 0) /* empty layout */
4229                 GOTO(out, rc = 0);
4230
4231         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
4232         if (lmm == NULL)
4233                 GOTO(out, rc = -EFAULT);
4234
4235         OBD_ALLOC_LARGE(lvbdata, lmmsize);
4236         if (lvbdata == NULL)
4237                 GOTO(out, rc = -ENOMEM);
4238
4239         memcpy(lvbdata, lmm, lmmsize);
4240         lock_res_and_lock(lock);
4241         if (unlikely(lock->l_lvb_data == NULL)) {
4242                 lock->l_lvb_type = LVB_T_LAYOUT;
4243                 lock->l_lvb_data = lvbdata;
4244                 lock->l_lvb_len = lmmsize;
4245                 lvbdata = NULL;
4246         }
4247         unlock_res_and_lock(lock);
4248
4249         if (lvbdata)
4250                 OBD_FREE_LARGE(lvbdata, lmmsize);
4251
4252         EXIT;
4253
4254 out:
4255         ptlrpc_req_finished(req);
4256         return rc;
4257 }
4258
4259 /**
4260  * Apply the layout to the inode. Layout lock is held and will be released
4261  * in this function.
4262  */
4263 static int ll_layout_lock_set(struct lustre_handle *lockh, enum ldlm_mode mode,
4264                               struct inode *inode)
4265 {
4266         struct ll_inode_info *lli = ll_i2info(inode);
4267         struct ll_sb_info    *sbi = ll_i2sbi(inode);
4268         struct ldlm_lock *lock;
4269         struct cl_object_conf conf;
4270         int rc = 0;
4271         bool lvb_ready;
4272         bool wait_layout = false;
4273         ENTRY;
4274
4275         LASSERT(lustre_handle_is_used(lockh));
4276
4277         lock = ldlm_handle2lock(lockh);
4278         LASSERT(lock != NULL);
4279         LASSERT(ldlm_has_layout(lock));
4280
4281         LDLM_DEBUG(lock, "file "DFID"(%p) being reconfigured",
4282                    PFID(&lli->lli_fid), inode);
4283
4284         /* in case this is a caching lock and reinstate with new inode */
4285         md_set_lock_data(sbi->ll_md_exp, lockh, inode, NULL);
4286
4287         lock_res_and_lock(lock);
4288         lvb_ready = ldlm_is_lvb_ready(lock);
4289         unlock_res_and_lock(lock);
4290
4291         /* checking lvb_ready is racy but this is okay. The worst case is
4292          * that multi processes may configure the file on the same time. */
4293         if (lvb_ready)
4294                 GOTO(out, rc = 0);
4295
4296         rc = ll_layout_fetch(inode, lock);
4297         if (rc < 0)
4298                 GOTO(out, rc);
4299
4300         /* for layout lock, lmm is stored in lock's lvb.
4301          * lvb_data is immutable if the lock is held so it's safe to access it
4302          * without res lock.
4303          *
4304          * set layout to file. Unlikely this will fail as old layout was
4305          * surely eliminated */
4306         memset(&conf, 0, sizeof conf);
4307         conf.coc_opc = OBJECT_CONF_SET;
4308         conf.coc_inode = inode;
4309         conf.coc_lock = lock;
4310         conf.u.coc_layout.lb_buf = lock->l_lvb_data;
4311         conf.u.coc_layout.lb_len = lock->l_lvb_len;
4312         rc = ll_layout_conf(inode, &conf);
4313
4314         /* refresh layout failed, need to wait */
4315         wait_layout = rc == -EBUSY;
4316         EXIT;
4317 out:
4318         LDLM_LOCK_PUT(lock);
4319         ldlm_lock_decref(lockh, mode);
4320
4321         /* wait for IO to complete if it's still being used. */
4322         if (wait_layout) {
4323                 CDEBUG(D_INODE, "%s: "DFID"(%p) wait for layout reconf\n",
4324                        ll_get_fsname(inode->i_sb, NULL, 0),
4325                        PFID(&lli->lli_fid), inode);
4326
4327                 memset(&conf, 0, sizeof conf);
4328                 conf.coc_opc = OBJECT_CONF_WAIT;
4329                 conf.coc_inode = inode;
4330                 rc = ll_layout_conf(inode, &conf);
4331                 if (rc == 0)
4332                         rc = -EAGAIN;
4333
4334                 CDEBUG(D_INODE, "%s file="DFID" waiting layout return: %d\n",
4335                        ll_get_fsname(inode->i_sb, NULL, 0),
4336                        PFID(&lli->lli_fid), rc);
4337         }
4338         RETURN(rc);
4339 }
4340
4341 /**
4342  * Issue layout intent RPC to MDS.
4343  * \param inode [in]    file inode
4344  * \param intent [in]   layout intent
4345  *
4346  * \retval 0    on success
4347  * \retval < 0  error code
4348  */
4349 static int ll_layout_intent(struct inode *inode, struct layout_intent *intent)
4350 {
4351         struct ll_inode_info  *lli = ll_i2info(inode);
4352         struct ll_sb_info     *sbi = ll_i2sbi(inode);
4353         struct md_op_data     *op_data;
4354         struct lookup_intent it;
4355         struct ptlrpc_request *req;
4356         int rc;
4357         ENTRY;
4358
4359         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
4360                                      0, 0, LUSTRE_OPC_ANY, NULL);
4361         if (IS_ERR(op_data))
4362                 RETURN(PTR_ERR(op_data));
4363
4364         op_data->op_data = intent;
4365         op_data->op_data_size = sizeof(*intent);
4366
4367         memset(&it, 0, sizeof(it));
4368         it.it_op = IT_LAYOUT;
4369         if (intent->li_opc == LAYOUT_INTENT_WRITE ||
4370             intent->li_opc == LAYOUT_INTENT_TRUNC)
4371                 it.it_flags = FMODE_WRITE;
4372
4373         LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file "DFID"(%p)",
4374                           ll_get_fsname(inode->i_sb, NULL, 0),
4375                           PFID(&lli->lli_fid), inode);
4376
4377         rc = md_intent_lock(sbi->ll_md_exp, op_data, &it, &req,
4378                             &ll_md_blocking_ast, 0);
4379         if (it.it_request != NULL)
4380                 ptlrpc_req_finished(it.it_request);
4381         it.it_request = NULL;
4382
4383         ll_finish_md_op_data(op_data);
4384
4385         /* set lock data in case this is a new lock */
4386         if (!rc)
4387                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
4388
4389         ll_intent_drop_lock(&it);
4390
4391         RETURN(rc);
4392 }
4393
4394 /**
4395  * This function checks if there exists a LAYOUT lock on the client side,
4396  * or enqueues it if it doesn't have one in cache.
4397  *
4398  * This function will not hold layout lock so it may be revoked any time after
4399  * this function returns. Any operations depend on layout should be redone
4400  * in that case.
4401  *
4402  * This function should be called before lov_io_init() to get an uptodate
4403  * layout version, the caller should save the version number and after IO
4404  * is finished, this function should be called again to verify that layout
4405  * is not changed during IO time.
4406  */
4407 int ll_layout_refresh(struct inode *inode, __u32 *gen)
4408 {
4409         struct ll_inode_info    *lli = ll_i2info(inode);
4410         struct ll_sb_info       *sbi = ll_i2sbi(inode);
4411         struct lustre_handle lockh;
4412         struct layout_intent intent = {
4413                 .li_opc = LAYOUT_INTENT_ACCESS,
4414         };
4415         enum ldlm_mode mode;
4416         int rc;
4417         ENTRY;
4418
4419         *gen = ll_layout_version_get(lli);
4420         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK) || *gen != CL_LAYOUT_GEN_NONE)
4421                 RETURN(0);
4422
4423         /* sanity checks */
4424         LASSERT(fid_is_sane(ll_inode2fid(inode)));
4425         LASSERT(S_ISREG(inode->i_mode));
4426
4427         /* take layout lock mutex to enqueue layout lock exclusively. */
4428         mutex_lock(&lli->lli_layout_mutex);
4429
4430         while (1) {
4431                 /* mostly layout lock is caching on the local side, so try to
4432                  * match it before grabbing layout lock mutex. */
4433                 mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
4434                                        LCK_CR | LCK_CW | LCK_PR | LCK_PW);
4435                 if (mode != 0) { /* hit cached lock */
4436                         rc = ll_layout_lock_set(&lockh, mode, inode);
4437                         if (rc == -EAGAIN)
4438                                 continue;
4439                         break;
4440                 }
4441
4442                 rc = ll_layout_intent(inode, &intent);
4443                 if (rc != 0)
4444                         break;
4445         }
4446
4447         if (rc == 0)
4448                 *gen = ll_layout_version_get(lli);
4449         mutex_unlock(&lli->lli_layout_mutex);
4450
4451         RETURN(rc);
4452 }
4453
4454 /**
4455  * Issue layout intent RPC indicating where in a file an IO is about to write.
4456  *
4457  * \param[in] inode     file inode.
4458  * \param[in] start     start offset of fille in bytes where an IO is about to
4459  *                      write.
4460  * \param[in] end       exclusive end offset in bytes of the write range.
4461  *
4462  * \retval 0    on success
4463  * \retval < 0  error code
4464  */
4465 int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end)
4466 {
4467         struct layout_intent intent = {
4468                 .li_opc = LAYOUT_INTENT_WRITE,
4469                 .li_start = start,
4470                 .li_end = end,
4471         };
4472         int rc;
4473         ENTRY;
4474
4475         rc = ll_layout_intent(inode, &intent);
4476
4477         RETURN(rc);
4478 }
4479
4480 /**
4481  *  This function send a restore request to the MDT
4482  */
4483 int ll_layout_restore(struct inode *inode, loff_t offset, __u64 length)
4484 {
4485         struct hsm_user_request *hur;
4486         int                      len, rc;
4487         ENTRY;
4488
4489         len = sizeof(struct hsm_user_request) +
4490               sizeof(struct hsm_user_item);
4491         OBD_ALLOC(hur, len);
4492         if (hur == NULL)
4493                 RETURN(-ENOMEM);
4494
4495         hur->hur_request.hr_action = HUA_RESTORE;
4496         hur->hur_request.hr_archive_id = 0;
4497         hur->hur_request.hr_flags = 0;
4498         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
4499                sizeof(hur->hur_user_item[0].hui_fid));
4500         hur->hur_user_item[0].hui_extent.offset = offset;
4501         hur->hur_user_item[0].hui_extent.length = length;
4502         hur->hur_request.hr_itemcount = 1;
4503         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
4504                            len, hur, NULL);
4505         OBD_FREE(hur, len);
4506         RETURN(rc);
4507 }