Whamcloud - gitweb
ca7dd438a897f87e8b2dfe01f705ceb837c9faa4
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, __GFP_IO);
58         if (fd == NULL)
59                 return NULL;
60
61         fd->fd_write_failed = false;
62
63         return fd;
64 }
65
66 static void ll_file_data_put(struct ll_file_data *fd)
67 {
68         if (fd != NULL)
69                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
70 }
71
72 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
73                           struct lustre_handle *fh)
74 {
75         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
76         op_data->op_attr.ia_mode = inode->i_mode;
77         op_data->op_attr.ia_atime = inode->i_atime;
78         op_data->op_attr.ia_mtime = inode->i_mtime;
79         op_data->op_attr.ia_ctime = inode->i_ctime;
80         op_data->op_attr.ia_size = i_size_read(inode);
81         op_data->op_attr_blocks = inode->i_blocks;
82         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
83                                         ll_inode_to_ext_flags(inode->i_flags);
84         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
85         if (fh)
86                 op_data->op_handle = *fh;
87         op_data->op_capa1 = ll_mdscapa_get(inode);
88
89         if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
90                 op_data->op_bias |= MDS_DATA_MODIFIED;
91 }
92
93 /**
94  * Closes the IO epoch and packs all the attributes into @op_data for
95  * the CLOSE rpc.
96  */
97 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
98                              struct obd_client_handle *och)
99 {
100         ENTRY;
101
102         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
103                                         ATTR_MTIME | ATTR_MTIME_SET |
104                                         ATTR_CTIME | ATTR_CTIME_SET;
105
106         if (!(och->och_flags & FMODE_WRITE))
107                 goto out;
108
109         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
110                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
111         else
112                 ll_ioepoch_close(inode, op_data, &och, 0);
113
114 out:
115         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
116         ll_prep_md_op_data(op_data, inode, NULL, NULL,
117                            0, 0, LUSTRE_OPC_ANY, NULL);
118         EXIT;
119 }
120
121 static int ll_close_inode_openhandle(struct obd_export *md_exp,
122                                      struct inode *inode,
123                                      struct obd_client_handle *och)
124 {
125         struct obd_export *exp = ll_i2mdexp(inode);
126         struct md_op_data *op_data;
127         struct ptlrpc_request *req = NULL;
128         struct obd_device *obd = class_exp2obd(exp);
129         int epoch_close = 1;
130         int rc;
131         ENTRY;
132
133         if (obd == NULL) {
134                 /*
135                  * XXX: in case of LMV, is this correct to access
136                  * ->exp_handle?
137                  */
138                 CERROR("Invalid MDC connection handle "LPX64"\n",
139                        ll_i2mdexp(inode)->exp_handle.h_cookie);
140                 GOTO(out, rc = 0);
141         }
142
143         OBD_ALLOC_PTR(op_data);
144         if (op_data == NULL)
145                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
146
147         ll_prepare_close(inode, op_data, och);
148         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
149         rc = md_close(md_exp, op_data, och->och_mod, &req);
150         if (rc == -EAGAIN) {
151                 /* This close must have the epoch closed. */
152                 LASSERT(epoch_close);
153                 /* MDS has instructed us to obtain Size-on-MDS attribute from
154                  * OSTs and send setattr to back to MDS. */
155                 rc = ll_som_update(inode, op_data);
156                 if (rc) {
157                         CERROR("inode %lu mdc Size-on-MDS update failed: "
158                                "rc = %d\n", inode->i_ino, rc);
159                         rc = 0;
160                 }
161         } else if (rc) {
162                 CERROR("inode %lu mdc close failed: rc = %d\n",
163                        inode->i_ino, rc);
164         }
165
166         /* DATA_MODIFIED flag was successfully sent on close, cancel data
167          * modification flag. */
168         if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
169                 struct ll_inode_info *lli = ll_i2info(inode);
170
171                 spin_lock(&lli->lli_lock);
172                 lli->lli_flags &= ~LLIF_DATA_MODIFIED;
173                 spin_unlock(&lli->lli_lock);
174         }
175
176         ll_finish_md_op_data(op_data);
177
178         if (rc == 0) {
179                 rc = ll_objects_destroy(req, inode);
180                 if (rc)
181                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
182                                inode->i_ino, rc);
183         }
184
185         EXIT;
186 out:
187
188         if (exp_connect_som(exp) && !epoch_close &&
189             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
190                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
191         } else {
192                 md_clear_open_replay_data(md_exp, och);
193                 /* Free @och if it is not waiting for DONE_WRITING. */
194                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
195                 OBD_FREE_PTR(och);
196         }
197         if (req) /* This is close request */
198                 ptlrpc_req_finished(req);
199         return rc;
200 }
201
202 int ll_md_real_close(struct inode *inode, int flags)
203 {
204         struct ll_inode_info *lli = ll_i2info(inode);
205         struct obd_client_handle **och_p;
206         struct obd_client_handle *och;
207         __u64 *och_usecount;
208         int rc = 0;
209         ENTRY;
210
211         if (flags & FMODE_WRITE) {
212                 och_p = &lli->lli_mds_write_och;
213                 och_usecount = &lli->lli_open_fd_write_count;
214         } else if (flags & FMODE_EXEC) {
215                 och_p = &lli->lli_mds_exec_och;
216                 och_usecount = &lli->lli_open_fd_exec_count;
217         } else {
218                 LASSERT(flags & FMODE_READ);
219                 och_p = &lli->lli_mds_read_och;
220                 och_usecount = &lli->lli_open_fd_read_count;
221         }
222
223         mutex_lock(&lli->lli_och_mutex);
224         if (*och_usecount) { /* There are still users of this handle, so
225                                 skip freeing it. */
226                 mutex_unlock(&lli->lli_och_mutex);
227                 RETURN(0);
228         }
229         och=*och_p;
230         *och_p = NULL;
231         mutex_unlock(&lli->lli_och_mutex);
232
233         if (och) { /* There might be a race and somebody have freed this och
234                       already */
235                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
236                                                inode, och);
237         }
238
239         RETURN(rc);
240 }
241
242 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
243                 struct file *file)
244 {
245         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
246         struct ll_inode_info *lli = ll_i2info(inode);
247         int rc = 0;
248         ENTRY;
249
250         /* clear group lock, if present */
251         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
252                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
253
254         /* Let's see if we have good enough OPEN lock on the file and if
255            we can skip talking to MDS */
256         if (file->f_dentry->d_inode) { /* Can this ever be false? */
257                 int lockmode;
258                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
259                 struct lustre_handle lockh;
260                 struct inode *inode = file->f_dentry->d_inode;
261                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
262
263                 mutex_lock(&lli->lli_och_mutex);
264                 if (fd->fd_omode & FMODE_WRITE) {
265                         lockmode = LCK_CW;
266                         LASSERT(lli->lli_open_fd_write_count);
267                         lli->lli_open_fd_write_count--;
268                 } else if (fd->fd_omode & FMODE_EXEC) {
269                         lockmode = LCK_PR;
270                         LASSERT(lli->lli_open_fd_exec_count);
271                         lli->lli_open_fd_exec_count--;
272                 } else {
273                         lockmode = LCK_CR;
274                         LASSERT(lli->lli_open_fd_read_count);
275                         lli->lli_open_fd_read_count--;
276                 }
277                 mutex_unlock(&lli->lli_och_mutex);
278
279                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
280                                    LDLM_IBITS, &policy, lockmode,
281                                    &lockh)) {
282                         rc = ll_md_real_close(file->f_dentry->d_inode,
283                                               fd->fd_omode);
284                 }
285         } else {
286                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
287                        file, file->f_dentry, file->f_dentry->d_name.name);
288         }
289
290         LUSTRE_FPRIVATE(file) = NULL;
291         ll_file_data_put(fd);
292         ll_capa_close(inode);
293
294         RETURN(rc);
295 }
296
297 /* While this returns an error code, fput() the caller does not, so we need
298  * to make every effort to clean up all of our state here.  Also, applications
299  * rarely check close errors and even if an error is returned they will not
300  * re-try the close call.
301  */
302 int ll_file_release(struct inode *inode, struct file *file)
303 {
304         struct ll_file_data *fd;
305         struct ll_sb_info *sbi = ll_i2sbi(inode);
306         struct ll_inode_info *lli = ll_i2info(inode);
307         int rc;
308         ENTRY;
309
310         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
311                inode->i_generation, inode);
312
313 #ifdef CONFIG_FS_POSIX_ACL
314         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
315             inode == inode->i_sb->s_root->d_inode) {
316                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
317
318                 LASSERT(fd != NULL);
319                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
320                         fd->fd_flags &= ~LL_FILE_RMTACL;
321                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
322                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
323                 }
324         }
325 #endif
326
327         if (inode->i_sb->s_root != file->f_dentry)
328                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
329         fd = LUSTRE_FPRIVATE(file);
330         LASSERT(fd != NULL);
331
332         /* The last ref on @file, maybe not the the owner pid of statahead.
333          * Different processes can open the same dir, "ll_opendir_key" means:
334          * it is me that should stop the statahead thread. */
335         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
336             lli->lli_opendir_pid != 0)
337                 ll_stop_statahead(inode, lli->lli_opendir_key);
338
339         if (inode->i_sb->s_root == file->f_dentry) {
340                 LUSTRE_FPRIVATE(file) = NULL;
341                 ll_file_data_put(fd);
342                 RETURN(0);
343         }
344
345         if (!S_ISDIR(inode->i_mode)) {
346                 lov_read_and_clear_async_rc(lli->lli_clob);
347                 lli->lli_async_rc = 0;
348         }
349
350         rc = ll_md_close(sbi->ll_md_exp, inode, file);
351
352         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
353                 libcfs_debug_dumplog();
354
355         RETURN(rc);
356 }
357
358 static int ll_intent_file_open(struct file *file, void *lmm,
359                                int lmmsize, struct lookup_intent *itp)
360 {
361         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
362         struct dentry *parent = file->f_dentry->d_parent;
363         const char *name = file->f_dentry->d_name.name;
364         const int len = file->f_dentry->d_name.len;
365         struct md_op_data *op_data;
366         struct ptlrpc_request *req;
367         __u32 opc = LUSTRE_OPC_ANY;
368         int rc;
369         ENTRY;
370
371         if (!parent)
372                 RETURN(-ENOENT);
373
374         /* Usually we come here only for NFSD, and we want open lock.
375            But we can also get here with pre 2.6.15 patchless kernels, and in
376            that case that lock is also ok */
377         /* We can also get here if there was cached open handle in revalidate_it
378          * but it disappeared while we were getting from there to ll_file_open.
379          * But this means this file was closed and immediatelly opened which
380          * makes a good candidate for using OPEN lock */
381         /* If lmmsize & lmm are not 0, we are just setting stripe info
382          * parameters. No need for the open lock */
383         if (lmm == NULL && lmmsize == 0) {
384                 itp->it_flags |= MDS_OPEN_LOCK;
385                 if (itp->it_flags & FMODE_WRITE)
386                         opc = LUSTRE_OPC_CREATE;
387         }
388
389         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
390                                       file->f_dentry->d_inode, name, len,
391                                       O_RDWR, opc, NULL);
392         if (IS_ERR(op_data))
393                 RETURN(PTR_ERR(op_data));
394
395         itp->it_flags |= MDS_OPEN_BY_FID;
396         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
397                             0 /*unused */, &req, ll_md_blocking_ast, 0);
398         ll_finish_md_op_data(op_data);
399         if (rc == -ESTALE) {
400                 /* reason for keep own exit path - don`t flood log
401                 * with messages with -ESTALE errors.
402                 */
403                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
404                      it_open_error(DISP_OPEN_OPEN, itp))
405                         GOTO(out, rc);
406                 ll_release_openhandle(file->f_dentry, itp);
407                 GOTO(out, rc);
408         }
409
410         if (it_disposition(itp, DISP_LOOKUP_NEG))
411                 GOTO(out, rc = -ENOENT);
412
413         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
414                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
415                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
416                 GOTO(out, rc);
417         }
418
419         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
420         if (!rc && itp->d.lustre.it_lock_mode)
421                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
422                                  itp, NULL);
423
424 out:
425         ptlrpc_req_finished(itp->d.lustre.it_data);
426         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
427         ll_intent_drop_lock(itp);
428
429         RETURN(rc);
430 }
431
432 /**
433  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
434  * not believe attributes if a few ioepoch holders exist. Attributes for
435  * previous ioepoch if new one is opened are also skipped by MDS.
436  */
437 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
438 {
439         if (ioepoch && lli->lli_ioepoch != ioepoch) {
440                 lli->lli_ioepoch = ioepoch;
441                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
442                        ioepoch, PFID(&lli->lli_fid));
443         }
444 }
445
446 static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it,
447                        struct obd_client_handle *och)
448 {
449         struct ptlrpc_request *req = it->d.lustre.it_data;
450         struct mdt_body *body;
451
452         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
453         och->och_fh = body->handle;
454         och->och_fid = body->fid1;
455         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
456         och->och_flags = it->it_flags;
457
458         return md_set_open_replay_data(md_exp, och, req);
459 }
460
461 int ll_local_open(struct file *file, struct lookup_intent *it,
462                   struct ll_file_data *fd, struct obd_client_handle *och)
463 {
464         struct inode *inode = file->f_dentry->d_inode;
465         struct ll_inode_info *lli = ll_i2info(inode);
466         ENTRY;
467
468         LASSERT(!LUSTRE_FPRIVATE(file));
469
470         LASSERT(fd != NULL);
471
472         if (och) {
473                 struct ptlrpc_request *req = it->d.lustre.it_data;
474                 struct mdt_body *body;
475                 int rc;
476
477                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
478                 if (rc != 0)
479                         RETURN(rc);
480
481                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
482                 ll_ioepoch_open(lli, body->ioepoch);
483         }
484
485         LUSTRE_FPRIVATE(file) = fd;
486         ll_readahead_init(inode, &fd->fd_ras);
487         fd->fd_omode = it->it_flags;
488
489         RETURN(0);
490 }
491
492 /* Open a file, and (for the very first open) create objects on the OSTs at
493  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
494  * creation or open until ll_lov_setstripe() ioctl is called.
495  *
496  * If we already have the stripe MD locally then we don't request it in
497  * md_open(), by passing a lmm_size = 0.
498  *
499  * It is up to the application to ensure no other processes open this file
500  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
501  * used.  We might be able to avoid races of that sort by getting lli_open_sem
502  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
503  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
504  */
505 int ll_file_open(struct inode *inode, struct file *file)
506 {
507         struct ll_inode_info *lli = ll_i2info(inode);
508         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
509                                           .it_flags = file->f_flags };
510         struct obd_client_handle **och_p = NULL;
511         __u64 *och_usecount = NULL;
512         struct ll_file_data *fd;
513         int rc = 0, opendir_set = 0;
514         ENTRY;
515
516         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
517                inode->i_generation, inode, file->f_flags);
518
519         it = file->private_data; /* XXX: compat macro */
520         file->private_data = NULL; /* prevent ll_local_open assertion */
521
522         fd = ll_file_data_get();
523         if (fd == NULL)
524                 GOTO(out_openerr, rc = -ENOMEM);
525
526         fd->fd_file = file;
527         if (S_ISDIR(inode->i_mode)) {
528                 spin_lock(&lli->lli_sa_lock);
529                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
530                     lli->lli_opendir_pid == 0) {
531                         lli->lli_opendir_key = fd;
532                         lli->lli_opendir_pid = cfs_curproc_pid();
533                         opendir_set = 1;
534                 }
535                 spin_unlock(&lli->lli_sa_lock);
536         }
537
538         if (inode->i_sb->s_root == file->f_dentry) {
539                 LUSTRE_FPRIVATE(file) = fd;
540                 RETURN(0);
541         }
542
543         if (!it || !it->d.lustre.it_disposition) {
544                 /* Convert f_flags into access mode. We cannot use file->f_mode,
545                  * because everything but O_ACCMODE mask was stripped from
546                  * there */
547                 if ((oit.it_flags + 1) & O_ACCMODE)
548                         oit.it_flags++;
549                 if (file->f_flags & O_TRUNC)
550                         oit.it_flags |= FMODE_WRITE;
551
552                 /* kernel only call f_op->open in dentry_open.  filp_open calls
553                  * dentry_open after call to open_namei that checks permissions.
554                  * Only nfsd_open call dentry_open directly without checking
555                  * permissions and because of that this code below is safe. */
556                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
557                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
558
559                 /* We do not want O_EXCL here, presumably we opened the file
560                  * already? XXX - NFS implications? */
561                 oit.it_flags &= ~O_EXCL;
562
563                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
564                  * created if necessary, then "IT_CREAT" should be set to keep
565                  * consistent with it */
566                 if (oit.it_flags & O_CREAT)
567                         oit.it_op |= IT_CREAT;
568
569                 it = &oit;
570         }
571
572 restart:
573         /* Let's see if we have file open on MDS already. */
574         if (it->it_flags & FMODE_WRITE) {
575                 och_p = &lli->lli_mds_write_och;
576                 och_usecount = &lli->lli_open_fd_write_count;
577         } else if (it->it_flags & FMODE_EXEC) {
578                 och_p = &lli->lli_mds_exec_och;
579                 och_usecount = &lli->lli_open_fd_exec_count;
580          } else {
581                 och_p = &lli->lli_mds_read_och;
582                 och_usecount = &lli->lli_open_fd_read_count;
583         }
584
585         mutex_lock(&lli->lli_och_mutex);
586         if (*och_p) { /* Open handle is present */
587                 if (it_disposition(it, DISP_OPEN_OPEN)) {
588                         /* Well, there's extra open request that we do not need,
589                            let's close it somehow. This will decref request. */
590                         rc = it_open_error(DISP_OPEN_OPEN, it);
591                         if (rc) {
592                                 mutex_unlock(&lli->lli_och_mutex);
593                                 GOTO(out_openerr, rc);
594                         }
595
596                         ll_release_openhandle(file->f_dentry, it);
597                 }
598                 (*och_usecount)++;
599
600                 rc = ll_local_open(file, it, fd, NULL);
601                 if (rc) {
602                         (*och_usecount)--;
603                         mutex_unlock(&lli->lli_och_mutex);
604                         GOTO(out_openerr, rc);
605                 }
606         } else {
607                 LASSERT(*och_usecount == 0);
608                 if (!it->d.lustre.it_disposition) {
609                         /* We cannot just request lock handle now, new ELC code
610                            means that one of other OPEN locks for this file
611                            could be cancelled, and since blocking ast handler
612                            would attempt to grab och_mutex as well, that would
613                            result in a deadlock */
614                         mutex_unlock(&lli->lli_och_mutex);
615                         it->it_create_mode |= M_CHECK_STALE;
616                         rc = ll_intent_file_open(file, NULL, 0, it);
617                         it->it_create_mode &= ~M_CHECK_STALE;
618                         if (rc)
619                                 GOTO(out_openerr, rc);
620
621                         goto restart;
622                 }
623                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
624                 if (!*och_p)
625                         GOTO(out_och_free, rc = -ENOMEM);
626
627                 (*och_usecount)++;
628
629                 /* md_intent_lock() didn't get a request ref if there was an
630                  * open error, so don't do cleanup on the request here
631                  * (bug 3430) */
632                 /* XXX (green): Should not we bail out on any error here, not
633                  * just open error? */
634                 rc = it_open_error(DISP_OPEN_OPEN, it);
635                 if (rc)
636                         GOTO(out_och_free, rc);
637
638                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
639
640                 rc = ll_local_open(file, it, fd, *och_p);
641                 if (rc)
642                         GOTO(out_och_free, rc);
643         }
644         mutex_unlock(&lli->lli_och_mutex);
645         fd = NULL;
646
647         /* Must do this outside lli_och_mutex lock to prevent deadlock where
648            different kind of OPEN lock for this same inode gets cancelled
649            by ldlm_cancel_lru */
650         if (!S_ISREG(inode->i_mode))
651                 GOTO(out_och_free, rc);
652
653         ll_capa_open(inode);
654
655         if (!lli->lli_has_smd) {
656                 if (file->f_flags & O_LOV_DELAY_CREATE ||
657                     !(file->f_mode & FMODE_WRITE)) {
658                         CDEBUG(D_INODE, "object creation was delayed\n");
659                         GOTO(out_och_free, rc);
660                 }
661         }
662         file->f_flags &= ~O_LOV_DELAY_CREATE;
663         GOTO(out_och_free, rc);
664
665 out_och_free:
666         if (rc) {
667                 if (och_p && *och_p) {
668                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
669                         *och_p = NULL; /* OBD_FREE writes some magic there */
670                         (*och_usecount)--;
671                 }
672                 mutex_unlock(&lli->lli_och_mutex);
673
674 out_openerr:
675                 if (opendir_set != 0)
676                         ll_stop_statahead(inode, lli->lli_opendir_key);
677                 if (fd != NULL)
678                         ll_file_data_put(fd);
679         } else {
680                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
681         }
682
683         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
684                 ptlrpc_req_finished(it->d.lustre.it_data);
685                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
686         }
687
688         return rc;
689 }
690
691 /* Fills the obdo with the attributes for the lsm */
692 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
693                           struct obd_capa *capa, struct obdo *obdo,
694                           __u64 ioepoch, int sync)
695 {
696         struct ptlrpc_request_set *set;
697         struct obd_info            oinfo = { { { 0 } } };
698         int                        rc;
699
700         ENTRY;
701
702         LASSERT(lsm != NULL);
703
704         oinfo.oi_md = lsm;
705         oinfo.oi_oa = obdo;
706         oinfo.oi_oa->o_oi = lsm->lsm_oi;
707         oinfo.oi_oa->o_mode = S_IFREG;
708         oinfo.oi_oa->o_ioepoch = ioepoch;
709         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
710                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
711                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
712                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
713                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
714                                OBD_MD_FLDATAVERSION;
715         oinfo.oi_capa = capa;
716         if (sync) {
717                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
718                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
719         }
720
721         set = ptlrpc_prep_set();
722         if (set == NULL) {
723                 CERROR("can't allocate ptlrpc set\n");
724                 rc = -ENOMEM;
725         } else {
726                 rc = obd_getattr_async(exp, &oinfo, set);
727                 if (rc == 0)
728                         rc = ptlrpc_set_wait(set);
729                 ptlrpc_set_destroy(set);
730         }
731         if (rc == 0)
732                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
733                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
734                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
735                                          OBD_MD_FLDATAVERSION);
736         RETURN(rc);
737 }
738
739 /**
740   * Performs the getattr on the inode and updates its fields.
741   * If @sync != 0, perform the getattr under the server-side lock.
742   */
743 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
744                      __u64 ioepoch, int sync)
745 {
746         struct obd_capa      *capa = ll_mdscapa_get(inode);
747         struct lov_stripe_md *lsm;
748         int rc;
749         ENTRY;
750
751         lsm = ccc_inode_lsm_get(inode);
752         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
753                             capa, obdo, ioepoch, sync);
754         capa_put(capa);
755         if (rc == 0) {
756                 struct ost_id *oi = lsm ? &lsm->lsm_oi : &obdo->o_oi;
757
758                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
759                 CDEBUG(D_INODE, "objid "DOSTID" size %llu, blocks %llu,"
760                        " blksize %lu\n", POSTID(oi), i_size_read(inode),
761                        (unsigned long long)inode->i_blocks,
762                        (unsigned long)ll_inode_blksize(inode));
763         }
764         ccc_inode_lsm_put(inode, lsm);
765         RETURN(rc);
766 }
767
768 int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
769 {
770         struct ll_inode_info *lli = ll_i2info(inode);
771         struct cl_object *obj = lli->lli_clob;
772         struct cl_attr *attr = ccc_env_thread_attr(env);
773         struct ost_lvb lvb;
774         int rc = 0;
775
776         ENTRY;
777
778         ll_inode_size_lock(inode);
779         /* merge timestamps the most recently obtained from mds with
780            timestamps obtained from osts */
781         LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
782         LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
783         LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
784         inode_init_lvb(inode, &lvb);
785
786         cl_object_attr_lock(obj);
787         rc = cl_object_attr_get(env, obj, attr);
788         cl_object_attr_unlock(obj);
789
790         if (rc == 0) {
791                 if (lvb.lvb_atime < attr->cat_atime)
792                         lvb.lvb_atime = attr->cat_atime;
793                 if (lvb.lvb_ctime < attr->cat_ctime)
794                         lvb.lvb_ctime = attr->cat_ctime;
795                 if (lvb.lvb_mtime < attr->cat_mtime)
796                         lvb.lvb_mtime = attr->cat_mtime;
797
798                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
799                                 PFID(&lli->lli_fid), attr->cat_size);
800                 cl_isize_write_nolock(inode, attr->cat_size);
801
802                 inode->i_blocks = attr->cat_blocks;
803
804                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
805                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
806                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
807         }
808         ll_inode_size_unlock(inode);
809
810         RETURN(rc);
811 }
812
813 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
814                      lstat_t *st)
815 {
816         struct obdo obdo = { 0 };
817         int rc;
818
819         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
820         if (rc == 0) {
821                 st->st_size   = obdo.o_size;
822                 st->st_blocks = obdo.o_blocks;
823                 st->st_mtime  = obdo.o_mtime;
824                 st->st_atime  = obdo.o_atime;
825                 st->st_ctime  = obdo.o_ctime;
826         }
827         return rc;
828 }
829
830 void ll_io_init(struct cl_io *io, const struct file *file, int write)
831 {
832         struct inode *inode = file->f_dentry->d_inode;
833
834         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
835         if (write) {
836                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
837                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
838                                       file->f_flags & O_DIRECT ||
839                                       IS_SYNC(inode);
840         }
841         io->ci_obj     = ll_i2info(inode)->lli_clob;
842         io->ci_lockreq = CILR_MAYBE;
843         if (ll_file_nolock(file)) {
844                 io->ci_lockreq = CILR_NEVER;
845                 io->ci_no_srvlock = 1;
846         } else if (file->f_flags & O_APPEND) {
847                 io->ci_lockreq = CILR_MANDATORY;
848         }
849 }
850
851 static ssize_t
852 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
853                    struct file *file, enum cl_io_type iot,
854                    loff_t *ppos, size_t count)
855 {
856         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
857         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
858         struct cl_io         *io;
859         ssize_t               result;
860         ENTRY;
861
862 restart:
863         io = ccc_env_thread_io(env);
864         ll_io_init(io, file, iot == CIT_WRITE);
865
866         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
867                 struct vvp_io *vio = vvp_env_io(env);
868                 struct ccc_io *cio = ccc_env_io(env);
869                 int write_mutex_locked = 0;
870
871                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
872                 vio->cui_io_subtype = args->via_io_subtype;
873
874                 switch (vio->cui_io_subtype) {
875                 case IO_NORMAL:
876                         cio->cui_iov = args->u.normal.via_iov;
877                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
878                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
879 #ifndef HAVE_FILE_WRITEV
880                         cio->cui_iocb = args->u.normal.via_iocb;
881 #endif
882                         if ((iot == CIT_WRITE) &&
883                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
884                                 if (mutex_lock_interruptible(&lli->
885                                                                lli_write_mutex))
886                                         GOTO(out, result = -ERESTARTSYS);
887                                 write_mutex_locked = 1;
888                         } else if (iot == CIT_READ) {
889                                 down_read(&lli->lli_trunc_sem);
890                         }
891                         break;
892                 case IO_SENDFILE:
893                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
894                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
895                         break;
896                 case IO_SPLICE:
897                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
898                         vio->u.splice.cui_flags = args->u.splice.via_flags;
899                         break;
900                 default:
901                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
902                         LBUG();
903                 }
904                 result = cl_io_loop(env, io);
905                 if (write_mutex_locked)
906                         mutex_unlock(&lli->lli_write_mutex);
907                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
908                         up_read(&lli->lli_trunc_sem);
909         } else {
910                 /* cl_io_rw_init() handled IO */
911                 result = io->ci_result;
912         }
913
914         if (io->ci_nob > 0) {
915                 result = io->ci_nob;
916                 *ppos = io->u.ci_wr.wr.crw_pos;
917         }
918         GOTO(out, result);
919 out:
920         cl_io_fini(env, io);
921         /* If any bit been read/written (result != 0), we just return
922          * short read/write instead of restart io. */
923         if ((result == 0 || result == -ENODATA) && io->ci_need_restart) {
924                 CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n",
925                        iot == CIT_READ ? "read" : "write",
926                        file->f_dentry->d_name.name, *ppos, count);
927                 LASSERTF(io->ci_nob == 0, "%zd", io->ci_nob);
928                 goto restart;
929         }
930
931         if (iot == CIT_READ) {
932                 if (result >= 0)
933                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
934                                            LPROC_LL_READ_BYTES, result);
935         } else if (iot == CIT_WRITE) {
936                 if (result >= 0) {
937                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
938                                            LPROC_LL_WRITE_BYTES, result);
939                         fd->fd_write_failed = false;
940                 } else if (result != -ERESTARTSYS) {
941                         fd->fd_write_failed = true;
942                 }
943         }
944
945         return result;
946 }
947
948
949 /*
950  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
951  */
952 static int ll_file_get_iov_count(const struct iovec *iov,
953                                  unsigned long *nr_segs, size_t *count)
954 {
955         size_t cnt = 0;
956         unsigned long seg;
957
958         for (seg = 0; seg < *nr_segs; seg++) {
959                 const struct iovec *iv = &iov[seg];
960
961                 /*
962                  * If any segment has a negative length, or the cumulative
963                  * length ever wraps negative then return -EINVAL.
964                  */
965                 cnt += iv->iov_len;
966                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
967                         return -EINVAL;
968                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
969                         continue;
970                 if (seg == 0)
971                         return -EFAULT;
972                 *nr_segs = seg;
973                 cnt -= iv->iov_len;   /* This segment is no good */
974                 break;
975         }
976         *count = cnt;
977         return 0;
978 }
979
980 #ifdef HAVE_FILE_READV
981 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
982                               unsigned long nr_segs, loff_t *ppos)
983 {
984         struct lu_env      *env;
985         struct vvp_io_args *args;
986         size_t              count;
987         ssize_t             result;
988         int                 refcheck;
989         ENTRY;
990
991         result = ll_file_get_iov_count(iov, &nr_segs, &count);
992         if (result)
993                 RETURN(result);
994
995         env = cl_env_get(&refcheck);
996         if (IS_ERR(env))
997                 RETURN(PTR_ERR(env));
998
999         args = vvp_env_args(env, IO_NORMAL);
1000         args->u.normal.via_iov = (struct iovec *)iov;
1001         args->u.normal.via_nrsegs = nr_segs;
1002
1003         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
1004         cl_env_put(env, &refcheck);
1005         RETURN(result);
1006 }
1007
1008 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1009                             loff_t *ppos)
1010 {
1011         struct lu_env *env;
1012         struct iovec  *local_iov;
1013         ssize_t        result;
1014         int            refcheck;
1015         ENTRY;
1016
1017         env = cl_env_get(&refcheck);
1018         if (IS_ERR(env))
1019                 RETURN(PTR_ERR(env));
1020
1021         local_iov = &vvp_env_info(env)->vti_local_iov;
1022         local_iov->iov_base = (void __user *)buf;
1023         local_iov->iov_len = count;
1024         result = ll_file_readv(file, local_iov, 1, ppos);
1025         cl_env_put(env, &refcheck);
1026         RETURN(result);
1027 }
1028
1029 #else
1030 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1031                                 unsigned long nr_segs, loff_t pos)
1032 {
1033         struct lu_env      *env;
1034         struct vvp_io_args *args;
1035         size_t              count;
1036         ssize_t             result;
1037         int                 refcheck;
1038         ENTRY;
1039
1040         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1041         if (result)
1042                 RETURN(result);
1043
1044         env = cl_env_get(&refcheck);
1045         if (IS_ERR(env))
1046                 RETURN(PTR_ERR(env));
1047
1048         args = vvp_env_args(env, IO_NORMAL);
1049         args->u.normal.via_iov = (struct iovec *)iov;
1050         args->u.normal.via_nrsegs = nr_segs;
1051         args->u.normal.via_iocb = iocb;
1052
1053         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1054                                     &iocb->ki_pos, count);
1055         cl_env_put(env, &refcheck);
1056         RETURN(result);
1057 }
1058
1059 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1060                             loff_t *ppos)
1061 {
1062         struct lu_env *env;
1063         struct iovec  *local_iov;
1064         struct kiocb  *kiocb;
1065         ssize_t        result;
1066         int            refcheck;
1067         ENTRY;
1068
1069         env = cl_env_get(&refcheck);
1070         if (IS_ERR(env))
1071                 RETURN(PTR_ERR(env));
1072
1073         local_iov = &vvp_env_info(env)->vti_local_iov;
1074         kiocb = &vvp_env_info(env)->vti_kiocb;
1075         local_iov->iov_base = (void __user *)buf;
1076         local_iov->iov_len = count;
1077         init_sync_kiocb(kiocb, file);
1078         kiocb->ki_pos = *ppos;
1079         kiocb->ki_left = count;
1080
1081         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1082         *ppos = kiocb->ki_pos;
1083
1084         cl_env_put(env, &refcheck);
1085         RETURN(result);
1086 }
1087 #endif
1088
1089 /*
1090  * Write to a file (through the page cache).
1091  */
1092 #ifdef HAVE_FILE_WRITEV
1093 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1094                               unsigned long nr_segs, loff_t *ppos)
1095 {
1096         struct lu_env      *env;
1097         struct vvp_io_args *args;
1098         size_t              count;
1099         ssize_t             result;
1100         int                 refcheck;
1101         ENTRY;
1102
1103         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1104         if (result)
1105                 RETURN(result);
1106
1107         env = cl_env_get(&refcheck);
1108         if (IS_ERR(env))
1109                 RETURN(PTR_ERR(env));
1110
1111         args = vvp_env_args(env, IO_NORMAL);
1112         args->u.normal.via_iov = (struct iovec *)iov;
1113         args->u.normal.via_nrsegs = nr_segs;
1114
1115         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1116         cl_env_put(env, &refcheck);
1117         RETURN(result);
1118 }
1119
1120 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1121                              loff_t *ppos)
1122 {
1123         struct lu_env    *env;
1124         struct iovec     *local_iov;
1125         ssize_t           result;
1126         int               refcheck;
1127         ENTRY;
1128
1129         env = cl_env_get(&refcheck);
1130         if (IS_ERR(env))
1131                 RETURN(PTR_ERR(env));
1132
1133         local_iov = &vvp_env_info(env)->vti_local_iov;
1134         local_iov->iov_base = (void __user *)buf;
1135         local_iov->iov_len = count;
1136
1137         result = ll_file_writev(file, local_iov, 1, ppos);
1138         cl_env_put(env, &refcheck);
1139         RETURN(result);
1140 }
1141
1142 #else /* AIO stuff */
1143 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1144                                  unsigned long nr_segs, loff_t pos)
1145 {
1146         struct lu_env      *env;
1147         struct vvp_io_args *args;
1148         size_t              count;
1149         ssize_t             result;
1150         int                 refcheck;
1151         ENTRY;
1152
1153         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1154         if (result)
1155                 RETURN(result);
1156
1157         env = cl_env_get(&refcheck);
1158         if (IS_ERR(env))
1159                 RETURN(PTR_ERR(env));
1160
1161         args = vvp_env_args(env, IO_NORMAL);
1162         args->u.normal.via_iov = (struct iovec *)iov;
1163         args->u.normal.via_nrsegs = nr_segs;
1164         args->u.normal.via_iocb = iocb;
1165
1166         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1167                                   &iocb->ki_pos, count);
1168         cl_env_put(env, &refcheck);
1169         RETURN(result);
1170 }
1171
1172 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1173                              loff_t *ppos)
1174 {
1175         struct lu_env *env;
1176         struct iovec  *local_iov;
1177         struct kiocb  *kiocb;
1178         ssize_t        result;
1179         int            refcheck;
1180         ENTRY;
1181
1182         env = cl_env_get(&refcheck);
1183         if (IS_ERR(env))
1184                 RETURN(PTR_ERR(env));
1185
1186         local_iov = &vvp_env_info(env)->vti_local_iov;
1187         kiocb = &vvp_env_info(env)->vti_kiocb;
1188         local_iov->iov_base = (void __user *)buf;
1189         local_iov->iov_len = count;
1190         init_sync_kiocb(kiocb, file);
1191         kiocb->ki_pos = *ppos;
1192         kiocb->ki_left = count;
1193
1194         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1195         *ppos = kiocb->ki_pos;
1196
1197         cl_env_put(env, &refcheck);
1198         RETURN(result);
1199 }
1200 #endif
1201
1202 /*
1203  * Send file content (through pagecache) somewhere with helper
1204  */
1205 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1206                                    struct pipe_inode_info *pipe, size_t count,
1207                                    unsigned int flags)
1208 {
1209         struct lu_env      *env;
1210         struct vvp_io_args *args;
1211         ssize_t             result;
1212         int                 refcheck;
1213         ENTRY;
1214
1215         env = cl_env_get(&refcheck);
1216         if (IS_ERR(env))
1217                 RETURN(PTR_ERR(env));
1218
1219         args = vvp_env_args(env, IO_SPLICE);
1220         args->u.splice.via_pipe = pipe;
1221         args->u.splice.via_flags = flags;
1222
1223         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1224         cl_env_put(env, &refcheck);
1225         RETURN(result);
1226 }
1227
1228 static int ll_lov_recreate(struct inode *inode, struct ost_id *oi,
1229                            obd_count ost_idx)
1230 {
1231         struct obd_export *exp = ll_i2dtexp(inode);
1232         struct obd_trans_info oti = { 0 };
1233         struct obdo *oa = NULL;
1234         int lsm_size;
1235         int rc = 0;
1236         struct lov_stripe_md *lsm = NULL, *lsm2;
1237         ENTRY;
1238
1239         OBDO_ALLOC(oa);
1240         if (oa == NULL)
1241                 RETURN(-ENOMEM);
1242
1243         lsm = ccc_inode_lsm_get(inode);
1244         if (!lsm_has_objects(lsm))
1245                 GOTO(out, rc = -ENOENT);
1246
1247         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1248                    (lsm->lsm_stripe_count));
1249
1250         OBD_ALLOC_LARGE(lsm2, lsm_size);
1251         if (lsm2 == NULL)
1252                 GOTO(out, rc = -ENOMEM);
1253
1254         oa->o_oi = *oi;
1255         oa->o_nlink = ost_idx;
1256         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1257         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1258         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1259                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1260         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1261         memcpy(lsm2, lsm, lsm_size);
1262         ll_inode_size_lock(inode);
1263         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1264         ll_inode_size_unlock(inode);
1265
1266         OBD_FREE_LARGE(lsm2, lsm_size);
1267         GOTO(out, rc);
1268 out:
1269         ccc_inode_lsm_put(inode, lsm);
1270         OBDO_FREE(oa);
1271         return rc;
1272 }
1273
1274 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1275 {
1276         struct ll_recreate_obj ucreat;
1277         struct ost_id           oi;
1278         ENTRY;
1279
1280         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1281                 RETURN(-EPERM);
1282
1283         if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1284                            sizeof(ucreat)))
1285                 RETURN(-EFAULT);
1286
1287         ostid_set_seq_mdt0(&oi);
1288         ostid_set_id(&oi, ucreat.lrc_id);
1289         RETURN(ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx));
1290 }
1291
1292 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1293 {
1294         struct lu_fid   fid;
1295         struct ost_id   oi;
1296         obd_count       ost_idx;
1297         ENTRY;
1298
1299         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1300                 RETURN(-EPERM);
1301
1302         if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1303                 RETURN(-EFAULT);
1304
1305         fid_to_ostid(&fid, &oi);
1306         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1307         RETURN(ll_lov_recreate(inode, &oi, ost_idx));
1308 }
1309
1310 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1311                              int flags, struct lov_user_md *lum, int lum_size)
1312 {
1313         struct lov_stripe_md *lsm = NULL;
1314         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1315         int rc = 0;
1316         ENTRY;
1317
1318         lsm = ccc_inode_lsm_get(inode);
1319         if (lsm != NULL) {
1320                 ccc_inode_lsm_put(inode, lsm);
1321                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1322                        inode->i_ino);
1323                 RETURN(-EEXIST);
1324         }
1325
1326         ll_inode_size_lock(inode);
1327         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1328         if (rc)
1329                 GOTO(out, rc);
1330         rc = oit.d.lustre.it_status;
1331         if (rc < 0)
1332                 GOTO(out_req_free, rc);
1333
1334         ll_release_openhandle(file->f_dentry, &oit);
1335
1336  out:
1337         ll_inode_size_unlock(inode);
1338         ll_intent_release(&oit);
1339         ccc_inode_lsm_put(inode, lsm);
1340         RETURN(rc);
1341 out_req_free:
1342         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1343         goto out;
1344 }
1345
1346 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1347                              struct lov_mds_md **lmmp, int *lmm_size,
1348                              struct ptlrpc_request **request)
1349 {
1350         struct ll_sb_info *sbi = ll_i2sbi(inode);
1351         struct mdt_body  *body;
1352         struct lov_mds_md *lmm = NULL;
1353         struct ptlrpc_request *req = NULL;
1354         struct md_op_data *op_data;
1355         int rc, lmmsize;
1356
1357         rc = ll_get_max_mdsize(sbi, &lmmsize);
1358         if (rc)
1359                 RETURN(rc);
1360
1361         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1362                                      strlen(filename), lmmsize,
1363                                      LUSTRE_OPC_ANY, NULL);
1364         if (IS_ERR(op_data))
1365                 RETURN(PTR_ERR(op_data));
1366
1367         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1368         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1369         ll_finish_md_op_data(op_data);
1370         if (rc < 0) {
1371                 CDEBUG(D_INFO, "md_getattr_name failed "
1372                        "on %s: rc %d\n", filename, rc);
1373                 GOTO(out, rc);
1374         }
1375
1376         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1377         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1378
1379         lmmsize = body->eadatasize;
1380
1381         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1382                         lmmsize == 0) {
1383                 GOTO(out, rc = -ENODATA);
1384         }
1385
1386         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1387         LASSERT(lmm != NULL);
1388
1389         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1390             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1391                 GOTO(out, rc = -EPROTO);
1392         }
1393
1394         /*
1395          * This is coming from the MDS, so is probably in
1396          * little endian.  We convert it to host endian before
1397          * passing it to userspace.
1398          */
1399         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1400                 int stripe_count;
1401
1402                 stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
1403                 if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_RELEASED)
1404                         stripe_count = 0;
1405
1406                 /* if function called for directory - we should
1407                  * avoid swab not existent lsm objects */
1408                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1409                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1410                         if (S_ISREG(body->mode))
1411                                 lustre_swab_lov_user_md_objects(
1412                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1413                                  stripe_count);
1414                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1415                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1416                         if (S_ISREG(body->mode))
1417                                 lustre_swab_lov_user_md_objects(
1418                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1419                                  stripe_count);
1420                 }
1421         }
1422
1423 out:
1424         *lmmp = lmm;
1425         *lmm_size = lmmsize;
1426         *request = req;
1427         return rc;
1428 }
1429
1430 static int ll_lov_setea(struct inode *inode, struct file *file,
1431                             unsigned long arg)
1432 {
1433         int                      flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1434         struct lov_user_md      *lump;
1435         int                      lum_size = sizeof(struct lov_user_md) +
1436                                             sizeof(struct lov_user_ost_data);
1437         int                      rc;
1438         ENTRY;
1439
1440         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1441                 RETURN(-EPERM);
1442
1443         OBD_ALLOC_LARGE(lump, lum_size);
1444         if (lump == NULL)
1445                 RETURN(-ENOMEM);
1446
1447         if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1448                 OBD_FREE_LARGE(lump, lum_size);
1449                 RETURN(-EFAULT);
1450         }
1451
1452         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1453
1454         OBD_FREE_LARGE(lump, lum_size);
1455         RETURN(rc);
1456 }
1457
1458 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1459                             unsigned long arg)
1460 {
1461         struct lov_user_md_v3    lumv3;
1462         struct lov_user_md_v1   *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1463         struct lov_user_md_v1   *lumv1p = (struct lov_user_md_v1 *)arg;
1464         struct lov_user_md_v3   *lumv3p = (struct lov_user_md_v3 *)arg;
1465         int                      lum_size, rc;
1466         int                      flags = FMODE_WRITE;
1467         ENTRY;
1468
1469         /* first try with v1 which is smaller than v3 */
1470         lum_size = sizeof(struct lov_user_md_v1);
1471         if (copy_from_user(lumv1, lumv1p, lum_size))
1472                 RETURN(-EFAULT);
1473
1474         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1475                 lum_size = sizeof(struct lov_user_md_v3);
1476                 if (copy_from_user(&lumv3, lumv3p, lum_size))
1477                         RETURN(-EFAULT);
1478         }
1479
1480         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1481         if (rc == 0) {
1482                 struct lov_stripe_md *lsm;
1483                 __u32 gen;
1484
1485                 put_user(0, &lumv1p->lmm_stripe_count);
1486
1487                 ll_layout_refresh(inode, &gen);
1488                 lsm = ccc_inode_lsm_get(inode);
1489                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1490                                    0, lsm, (void *)arg);
1491                 ccc_inode_lsm_put(inode, lsm);
1492         }
1493         RETURN(rc);
1494 }
1495
1496 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1497 {
1498         struct lov_stripe_md *lsm;
1499         int rc = -ENODATA;
1500         ENTRY;
1501
1502         lsm = ccc_inode_lsm_get(inode);
1503         if (lsm != NULL)
1504                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1505                                    lsm, (void *)arg);
1506         ccc_inode_lsm_put(inode, lsm);
1507         RETURN(rc);
1508 }
1509
1510 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1511 {
1512         struct ll_inode_info   *lli = ll_i2info(inode);
1513         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1514         struct ccc_grouplock    grouplock;
1515         int                     rc;
1516         ENTRY;
1517
1518         if (ll_file_nolock(file))
1519                 RETURN(-EOPNOTSUPP);
1520
1521         spin_lock(&lli->lli_lock);
1522         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1523                 CWARN("group lock already existed with gid %lu\n",
1524                       fd->fd_grouplock.cg_gid);
1525                 spin_unlock(&lli->lli_lock);
1526                 RETURN(-EINVAL);
1527         }
1528         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1529         spin_unlock(&lli->lli_lock);
1530
1531         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1532                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1533         if (rc)
1534                 RETURN(rc);
1535
1536         spin_lock(&lli->lli_lock);
1537         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1538                 spin_unlock(&lli->lli_lock);
1539                 CERROR("another thread just won the race\n");
1540                 cl_put_grouplock(&grouplock);
1541                 RETURN(-EINVAL);
1542         }
1543
1544         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1545         fd->fd_grouplock = grouplock;
1546         spin_unlock(&lli->lli_lock);
1547
1548         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1549         RETURN(0);
1550 }
1551
1552 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1553 {
1554         struct ll_inode_info   *lli = ll_i2info(inode);
1555         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1556         struct ccc_grouplock    grouplock;
1557         ENTRY;
1558
1559         spin_lock(&lli->lli_lock);
1560         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1561                 spin_unlock(&lli->lli_lock);
1562                 CWARN("no group lock held\n");
1563                 RETURN(-EINVAL);
1564         }
1565         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1566
1567         if (fd->fd_grouplock.cg_gid != arg) {
1568                 CWARN("group lock %lu doesn't match current id %lu\n",
1569                        arg, fd->fd_grouplock.cg_gid);
1570                 spin_unlock(&lli->lli_lock);
1571                 RETURN(-EINVAL);
1572         }
1573
1574         grouplock = fd->fd_grouplock;
1575         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1576         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1577         spin_unlock(&lli->lli_lock);
1578
1579         cl_put_grouplock(&grouplock);
1580         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1581         RETURN(0);
1582 }
1583
1584 /**
1585  * Close inode open handle
1586  *
1587  * \param dentry [in]     dentry which contains the inode
1588  * \param it     [in,out] intent which contains open info and result
1589  *
1590  * \retval 0     success
1591  * \retval <0    failure
1592  */
1593 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1594 {
1595         struct inode *inode = dentry->d_inode;
1596         struct obd_client_handle *och;
1597         int rc;
1598         ENTRY;
1599
1600         LASSERT(inode);
1601
1602         /* Root ? Do nothing. */
1603         if (dentry->d_inode->i_sb->s_root == dentry)
1604                 RETURN(0);
1605
1606         /* No open handle to close? Move away */
1607         if (!it_disposition(it, DISP_OPEN_OPEN))
1608                 RETURN(0);
1609
1610         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1611
1612         OBD_ALLOC(och, sizeof(*och));
1613         if (!och)
1614                 GOTO(out, rc = -ENOMEM);
1615
1616         ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
1617
1618         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1619                                        inode, och);
1620  out:
1621         /* this one is in place of ll_file_open */
1622         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1623                 ptlrpc_req_finished(it->d.lustre.it_data);
1624                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1625         }
1626         RETURN(rc);
1627 }
1628
1629 /**
1630  * Get size for inode for which FIEMAP mapping is requested.
1631  * Make the FIEMAP get_info call and returns the result.
1632  */
1633 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1634               int num_bytes)
1635 {
1636         struct obd_export *exp = ll_i2dtexp(inode);
1637         struct lov_stripe_md *lsm = NULL;
1638         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1639         int vallen = num_bytes;
1640         int rc;
1641         ENTRY;
1642
1643         /* Checks for fiemap flags */
1644         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1645                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1646                 return -EBADR;
1647         }
1648
1649         /* Check for FIEMAP_FLAG_SYNC */
1650         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1651                 rc = filemap_fdatawrite(inode->i_mapping);
1652                 if (rc)
1653                         return rc;
1654         }
1655
1656         lsm = ccc_inode_lsm_get(inode);
1657         if (lsm == NULL)
1658                 return -ENOENT;
1659
1660         /* If the stripe_count > 1 and the application does not understand
1661          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1662          */
1663         if (lsm->lsm_stripe_count > 1 &&
1664             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1665                 GOTO(out, rc = -EOPNOTSUPP);
1666
1667         fm_key.oa.o_oi = lsm->lsm_oi;
1668         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1669
1670         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1671         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1672         /* If filesize is 0, then there would be no objects for mapping */
1673         if (fm_key.oa.o_size == 0) {
1674                 fiemap->fm_mapped_extents = 0;
1675                 GOTO(out, rc = 0);
1676         }
1677
1678         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1679
1680         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1681                           fiemap, lsm);
1682         if (rc)
1683                 CERROR("obd_get_info failed: rc = %d\n", rc);
1684
1685 out:
1686         ccc_inode_lsm_put(inode, lsm);
1687         RETURN(rc);
1688 }
1689
1690 int ll_fid2path(struct inode *inode, void *arg)
1691 {
1692         struct obd_export       *exp = ll_i2mdexp(inode);
1693         struct getinfo_fid2path *gfout, *gfin;
1694         int                      outsize, rc;
1695         ENTRY;
1696
1697         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1698             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1699                 RETURN(-EPERM);
1700
1701         /* Need to get the buflen */
1702         OBD_ALLOC_PTR(gfin);
1703         if (gfin == NULL)
1704                 RETURN(-ENOMEM);
1705         if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1706                 OBD_FREE_PTR(gfin);
1707                 RETURN(-EFAULT);
1708         }
1709
1710         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1711         OBD_ALLOC(gfout, outsize);
1712         if (gfout == NULL) {
1713                 OBD_FREE_PTR(gfin);
1714                 RETURN(-ENOMEM);
1715         }
1716         memcpy(gfout, gfin, sizeof(*gfout));
1717         OBD_FREE_PTR(gfin);
1718
1719         /* Call mdc_iocontrol */
1720         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1721         if (rc)
1722                 GOTO(gf_free, rc);
1723
1724         if (copy_to_user(arg, gfout, outsize))
1725                 rc = -EFAULT;
1726
1727 gf_free:
1728         OBD_FREE(gfout, outsize);
1729         RETURN(rc);
1730 }
1731
1732 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1733 {
1734         struct ll_user_fiemap *fiemap_s;
1735         size_t num_bytes, ret_bytes;
1736         unsigned int extent_count;
1737         int rc = 0;
1738
1739         /* Get the extent count so we can calculate the size of
1740          * required fiemap buffer */
1741         if (get_user(extent_count,
1742             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1743                 RETURN(-EFAULT);
1744         num_bytes = sizeof(*fiemap_s) + (extent_count *
1745                                          sizeof(struct ll_fiemap_extent));
1746
1747         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1748         if (fiemap_s == NULL)
1749                 RETURN(-ENOMEM);
1750
1751         /* get the fiemap value */
1752         if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1753                            sizeof(*fiemap_s)))
1754                 GOTO(error, rc = -EFAULT);
1755
1756         /* If fm_extent_count is non-zero, read the first extent since
1757          * it is used to calculate end_offset and device from previous
1758          * fiemap call. */
1759         if (extent_count) {
1760                 if (copy_from_user(&fiemap_s->fm_extents[0],
1761                     (char __user *)arg + sizeof(*fiemap_s),
1762                     sizeof(struct ll_fiemap_extent)))
1763                         GOTO(error, rc = -EFAULT);
1764         }
1765
1766         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1767         if (rc)
1768                 GOTO(error, rc);
1769
1770         ret_bytes = sizeof(struct ll_user_fiemap);
1771
1772         if (extent_count != 0)
1773                 ret_bytes += (fiemap_s->fm_mapped_extents *
1774                                  sizeof(struct ll_fiemap_extent));
1775
1776         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1777                 rc = -EFAULT;
1778
1779 error:
1780         OBD_FREE_LARGE(fiemap_s, num_bytes);
1781         RETURN(rc);
1782 }
1783
1784 /*
1785  * Read the data_version for inode.
1786  *
1787  * This value is computed using stripe object version on OST.
1788  * Version is computed using server side locking.
1789  *
1790  * @param extent_lock  Take extent lock. Not needed if a process is already
1791  *                     holding the OST object group locks.
1792  */
1793 int ll_data_version(struct inode *inode, __u64 *data_version,
1794                     int extent_lock)
1795 {
1796         struct lov_stripe_md    *lsm = NULL;
1797         struct ll_sb_info       *sbi = ll_i2sbi(inode);
1798         struct obdo             *obdo = NULL;
1799         int                      rc;
1800         ENTRY;
1801
1802         /* If no stripe, we consider version is 0. */
1803         lsm = ccc_inode_lsm_get(inode);
1804         if (!lsm_has_objects(lsm)) {
1805                 *data_version = 0;
1806                 CDEBUG(D_INODE, "No object for inode\n");
1807                 GOTO(out, rc = 0);
1808         }
1809
1810         OBD_ALLOC_PTR(obdo);
1811         if (obdo == NULL)
1812                 GOTO(out, rc = -ENOMEM);
1813
1814         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1815         if (rc == 0) {
1816                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1817                         rc = -EOPNOTSUPP;
1818                 else
1819                         *data_version = obdo->o_data_version;
1820         }
1821
1822         OBD_FREE_PTR(obdo);
1823         EXIT;
1824 out:
1825         ccc_inode_lsm_put(inode, lsm);
1826         RETURN(rc);
1827 }
1828
1829 struct ll_swap_stack {
1830         struct iattr             ia1, ia2;
1831         __u64                    dv1, dv2;
1832         struct inode            *inode1, *inode2;
1833         bool                     check_dv1, check_dv2;
1834 };
1835
1836 static int ll_swap_layouts(struct file *file1, struct file *file2,
1837                            struct lustre_swap_layouts *lsl)
1838 {
1839         struct mdc_swap_layouts  msl;
1840         struct md_op_data       *op_data;
1841         __u32                    gid;
1842         __u64                    dv;
1843         struct ll_swap_stack    *llss = NULL;
1844         int                      rc;
1845
1846         OBD_ALLOC_PTR(llss);
1847         if (llss == NULL)
1848                 RETURN(-ENOMEM);
1849
1850         llss->inode1 = file1->f_dentry->d_inode;
1851         llss->inode2 = file2->f_dentry->d_inode;
1852
1853         if (!S_ISREG(llss->inode2->i_mode))
1854                 GOTO(free, rc = -EINVAL);
1855
1856         if (inode_permission(llss->inode1, MAY_WRITE) ||
1857             inode_permission(llss->inode2, MAY_WRITE))
1858                 GOTO(free, rc = -EPERM);
1859
1860         if (llss->inode2->i_sb != llss->inode1->i_sb)
1861                 GOTO(free, rc = -EXDEV);
1862
1863         /* we use 2 bool because it is easier to swap than 2 bits */
1864         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
1865                 llss->check_dv1 = true;
1866
1867         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
1868                 llss->check_dv2 = true;
1869
1870         /* we cannot use lsl->sl_dvX directly because we may swap them */
1871         llss->dv1 = lsl->sl_dv1;
1872         llss->dv2 = lsl->sl_dv2;
1873
1874         rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
1875         if (rc == 0) /* same file, done! */
1876                 GOTO(free, rc = 0);
1877
1878         if (rc < 0) { /* sequentialize it */
1879                 swap(llss->inode1, llss->inode2);
1880                 swap(file1, file2);
1881                 swap(llss->dv1, llss->dv2);
1882                 swap(llss->check_dv1, llss->check_dv2);
1883         }
1884
1885         gid = lsl->sl_gid;
1886         if (gid != 0) { /* application asks to flush dirty cache */
1887                 rc = ll_get_grouplock(llss->inode1, file1, gid);
1888                 if (rc < 0)
1889                         GOTO(free, rc);
1890
1891                 rc = ll_get_grouplock(llss->inode2, file2, gid);
1892                 if (rc < 0) {
1893                         ll_put_grouplock(llss->inode1, file1, gid);
1894                         GOTO(free, rc);
1895                 }
1896         }
1897
1898         /* to be able to restore mtime and atime after swap
1899          * we need to first save them */
1900         if (lsl->sl_flags &
1901             (SWAP_LAYOUTS_KEEP_MTIME | SWAP_LAYOUTS_KEEP_ATIME)) {
1902                 llss->ia1.ia_mtime = llss->inode1->i_mtime;
1903                 llss->ia1.ia_atime = llss->inode1->i_atime;
1904                 llss->ia1.ia_valid = ATTR_MTIME | ATTR_ATIME;
1905                 llss->ia2.ia_mtime = llss->inode2->i_mtime;
1906                 llss->ia2.ia_atime = llss->inode2->i_atime;
1907                 llss->ia2.ia_valid = ATTR_MTIME | ATTR_ATIME;
1908         }
1909
1910         /* ultimate check, before swaping the layouts we check if
1911          * dataversion has changed (if requested) */
1912         if (llss->check_dv1) {
1913                 rc = ll_data_version(llss->inode1, &dv, 0);
1914                 if (rc)
1915                         GOTO(putgl, rc);
1916                 if (dv != llss->dv1)
1917                         GOTO(putgl, rc = -EAGAIN);
1918         }
1919
1920         if (llss->check_dv2) {
1921                 rc = ll_data_version(llss->inode2, &dv, 0);
1922                 if (rc)
1923                         GOTO(putgl, rc);
1924                 if (dv != llss->dv2)
1925                         GOTO(putgl, rc = -EAGAIN);
1926         }
1927
1928         /* struct md_op_data is used to send the swap args to the mdt
1929          * only flags is missing, so we use struct mdc_swap_layouts
1930          * through the md_op_data->op_data */
1931         /* flags from user space have to be converted before they are send to
1932          * server, no flag is sent today, they are only used on the client */
1933         msl.msl_flags = 0;
1934         rc = -ENOMEM;
1935         op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
1936                                      0, LUSTRE_OPC_ANY, &msl);
1937         if (IS_ERR(op_data))
1938                 GOTO(free, rc = PTR_ERR(op_data));
1939
1940         rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
1941                            sizeof(*op_data), op_data, NULL);
1942         ll_finish_md_op_data(op_data);
1943
1944 putgl:
1945         if (gid != 0) {
1946                 ll_put_grouplock(llss->inode2, file2, gid);
1947                 ll_put_grouplock(llss->inode1, file1, gid);
1948         }
1949
1950         /* rc can be set from obd_iocontrol() or from a GOTO(putgl, ...) */
1951         if (rc != 0)
1952                 GOTO(free, rc);
1953
1954         /* clear useless flags */
1955         if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_MTIME)) {
1956                 llss->ia1.ia_valid &= ~ATTR_MTIME;
1957                 llss->ia2.ia_valid &= ~ATTR_MTIME;
1958         }
1959
1960         if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_ATIME)) {
1961                 llss->ia1.ia_valid &= ~ATTR_ATIME;
1962                 llss->ia2.ia_valid &= ~ATTR_ATIME;
1963         }
1964
1965         /* update time if requested */
1966         rc = 0;
1967         if (llss->ia2.ia_valid != 0) {
1968                 mutex_lock(&llss->inode1->i_mutex);
1969                 rc = ll_setattr(file1->f_dentry, &llss->ia2);
1970                 mutex_unlock(&llss->inode1->i_mutex);
1971         }
1972
1973         if (llss->ia1.ia_valid != 0) {
1974                 int rc1;
1975
1976                 mutex_lock(&llss->inode2->i_mutex);
1977                 rc1 = ll_setattr(file2->f_dentry, &llss->ia1);
1978                 mutex_unlock(&llss->inode2->i_mutex);
1979                 if (rc == 0)
1980                         rc = rc1;
1981         }
1982
1983 free:
1984         if (llss != NULL)
1985                 OBD_FREE_PTR(llss);
1986
1987         RETURN(rc);
1988 }
1989
1990 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1991 {
1992         struct inode            *inode = file->f_dentry->d_inode;
1993         struct ll_file_data     *fd = LUSTRE_FPRIVATE(file);
1994         int                      flags, rc;
1995         ENTRY;
1996
1997         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1998                inode->i_generation, inode, cmd);
1999         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2000
2001         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2002         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2003                 RETURN(-ENOTTY);
2004
2005         switch(cmd) {
2006         case LL_IOC_GETFLAGS:
2007                 /* Get the current value of the file flags */
2008                 return put_user(fd->fd_flags, (int *)arg);
2009         case LL_IOC_SETFLAGS:
2010         case LL_IOC_CLRFLAGS:
2011                 /* Set or clear specific file flags */
2012                 /* XXX This probably needs checks to ensure the flags are
2013                  *     not abused, and to handle any flag side effects.
2014                  */
2015                 if (get_user(flags, (int *) arg))
2016                         RETURN(-EFAULT);
2017
2018                 if (cmd == LL_IOC_SETFLAGS) {
2019                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2020                             !(file->f_flags & O_DIRECT)) {
2021                                 CERROR("%s: unable to disable locking on "
2022                                        "non-O_DIRECT file\n", current->comm);
2023                                 RETURN(-EINVAL);
2024                         }
2025
2026                         fd->fd_flags |= flags;
2027                 } else {
2028                         fd->fd_flags &= ~flags;
2029                 }
2030                 RETURN(0);
2031         case LL_IOC_LOV_SETSTRIPE:
2032                 RETURN(ll_lov_setstripe(inode, file, arg));
2033         case LL_IOC_LOV_SETEA:
2034                 RETURN(ll_lov_setea(inode, file, arg));
2035         case LL_IOC_LOV_SWAP_LAYOUTS: {
2036                 struct file *file2;
2037                 struct lustre_swap_layouts lsl;
2038
2039                 if (copy_from_user(&lsl, (char *)arg,
2040                                        sizeof(struct lustre_swap_layouts)))
2041                         RETURN(-EFAULT);
2042
2043                 if ((file->f_flags & O_ACCMODE) == 0) /* O_RDONLY */
2044                         RETURN(-EPERM);
2045
2046                 file2 = fget(lsl.sl_fd);
2047                 if (file2 == NULL)
2048                         RETURN(-EBADF);
2049
2050                 rc = -EPERM;
2051                 if ((file2->f_flags & O_ACCMODE) != 0) /* O_WRONLY or O_RDWR */
2052                         rc = ll_swap_layouts(file, file2, &lsl);
2053                 fput(file2);
2054                 RETURN(rc);
2055         }
2056         case LL_IOC_LOV_GETSTRIPE:
2057                 RETURN(ll_lov_getstripe(inode, arg));
2058         case LL_IOC_RECREATE_OBJ:
2059                 RETURN(ll_lov_recreate_obj(inode, arg));
2060         case LL_IOC_RECREATE_FID:
2061                 RETURN(ll_lov_recreate_fid(inode, arg));
2062         case FSFILT_IOC_FIEMAP:
2063                 RETURN(ll_ioctl_fiemap(inode, arg));
2064         case FSFILT_IOC_GETFLAGS:
2065         case FSFILT_IOC_SETFLAGS:
2066                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2067         case FSFILT_IOC_GETVERSION_OLD:
2068         case FSFILT_IOC_GETVERSION:
2069                 RETURN(put_user(inode->i_generation, (int *)arg));
2070         case LL_IOC_GROUP_LOCK:
2071                 RETURN(ll_get_grouplock(inode, file, arg));
2072         case LL_IOC_GROUP_UNLOCK:
2073                 RETURN(ll_put_grouplock(inode, file, arg));
2074         case IOC_OBD_STATFS:
2075                 RETURN(ll_obd_statfs(inode, (void *)arg));
2076
2077         /* We need to special case any other ioctls we want to handle,
2078          * to send them to the MDS/OST as appropriate and to properly
2079          * network encode the arg field.
2080         case FSFILT_IOC_SETVERSION_OLD:
2081         case FSFILT_IOC_SETVERSION:
2082         */
2083         case LL_IOC_FLUSHCTX:
2084                 RETURN(ll_flush_ctx(inode));
2085         case LL_IOC_PATH2FID: {
2086                 if (copy_to_user((void *)arg, ll_inode2fid(inode),
2087                                  sizeof(struct lu_fid)))
2088                         RETURN(-EFAULT);
2089
2090                 RETURN(0);
2091         }
2092         case OBD_IOC_FID2PATH:
2093                 RETURN(ll_fid2path(inode, (void *)arg));
2094         case LL_IOC_DATA_VERSION: {
2095                 struct ioc_data_version idv;
2096                 int                     rc;
2097
2098                 if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
2099                         RETURN(-EFAULT);
2100
2101                 rc = ll_data_version(inode, &idv.idv_version,
2102                                 !(idv.idv_flags & LL_DV_NOFLUSH));
2103
2104                 if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
2105                         RETURN(-EFAULT);
2106
2107                 RETURN(rc);
2108         }
2109
2110         case LL_IOC_GET_MDTIDX: {
2111                 int mdtidx;
2112
2113                 mdtidx = ll_get_mdt_idx(inode);
2114                 if (mdtidx < 0)
2115                         RETURN(mdtidx);
2116
2117                 if (put_user((int)mdtidx, (int*)arg))
2118                         RETURN(-EFAULT);
2119
2120                 RETURN(0);
2121         }
2122         case OBD_IOC_GETDTNAME:
2123         case OBD_IOC_GETMDNAME:
2124                 RETURN(ll_get_obd_name(inode, cmd, arg));
2125         case LL_IOC_HSM_STATE_GET: {
2126                 struct md_op_data       *op_data;
2127                 struct hsm_user_state   *hus;
2128                 int                      rc;
2129
2130                 OBD_ALLOC_PTR(hus);
2131                 if (hus == NULL)
2132                         RETURN(-ENOMEM);
2133
2134                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2135                                              LUSTRE_OPC_ANY, hus);
2136                 if (IS_ERR(op_data)) {
2137                         OBD_FREE_PTR(hus);
2138                         RETURN(PTR_ERR(op_data));
2139                 }
2140
2141                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2142                                    op_data, NULL);
2143
2144                 if (copy_to_user((void *)arg, hus, sizeof(*hus)))
2145                         rc = -EFAULT;
2146
2147                 ll_finish_md_op_data(op_data);
2148                 OBD_FREE_PTR(hus);
2149                 RETURN(rc);
2150         }
2151         case LL_IOC_HSM_STATE_SET: {
2152                 struct md_op_data       *op_data;
2153                 struct hsm_state_set    *hss;
2154                 int                      rc;
2155
2156                 OBD_ALLOC_PTR(hss);
2157                 if (hss == NULL)
2158                         RETURN(-ENOMEM);
2159                 if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
2160                         OBD_FREE_PTR(hss);
2161                         RETURN(-EFAULT);
2162                 }
2163
2164                 /* Non-root users are forbidden to set or clear flags which are
2165                  * NOT defined in HSM_USER_MASK. */
2166                 if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
2167                     && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
2168                         OBD_FREE_PTR(hss);
2169                         RETURN(-EPERM);
2170                 }
2171
2172                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2173                                              LUSTRE_OPC_ANY, hss);
2174                 if (IS_ERR(op_data)) {
2175                         OBD_FREE_PTR(hss);
2176                         RETURN(PTR_ERR(op_data));
2177                 }
2178
2179                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2180                                    op_data, NULL);
2181
2182                 ll_finish_md_op_data(op_data);
2183
2184                 OBD_FREE_PTR(hss);
2185                 RETURN(rc);
2186         }
2187         case LL_IOC_HSM_ACTION: {
2188                 struct md_op_data               *op_data;
2189                 struct hsm_current_action       *hca;
2190                 int                              rc;
2191
2192                 OBD_ALLOC_PTR(hca);
2193                 if (hca == NULL)
2194                         RETURN(-ENOMEM);
2195
2196                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2197                                              LUSTRE_OPC_ANY, hca);
2198                 if (IS_ERR(op_data)) {
2199                         OBD_FREE_PTR(hca);
2200                         RETURN(PTR_ERR(op_data));
2201                 }
2202
2203                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2204                                    op_data, NULL);
2205
2206                 if (copy_to_user((char *)arg, hca, sizeof(*hca)))
2207                         rc = -EFAULT;
2208
2209                 ll_finish_md_op_data(op_data);
2210                 OBD_FREE_PTR(hca);
2211                 RETURN(rc);
2212         }
2213         default: {
2214                 int err;
2215
2216                 if (LLIOC_STOP ==
2217                      ll_iocontrol_call(inode, file, cmd, arg, &err))
2218                         RETURN(err);
2219
2220                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2221                                      (void *)arg));
2222         }
2223         }
2224 }
2225
2226 #ifndef HAVE_FILE_LLSEEK_SIZE
2227 static inline loff_t
2228 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
2229 {
2230         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
2231                 return -EINVAL;
2232         if (offset > maxsize)
2233                 return -EINVAL;
2234
2235         if (offset != file->f_pos) {
2236                 file->f_pos = offset;
2237                 file->f_version = 0;
2238         }
2239         return offset;
2240 }
2241
2242 static loff_t
2243 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
2244                 loff_t maxsize, loff_t eof)
2245 {
2246         struct inode *inode = file->f_dentry->d_inode;
2247
2248         switch (origin) {
2249         case SEEK_END:
2250                 offset += eof;
2251                 break;
2252         case SEEK_CUR:
2253                 /*
2254                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
2255                  * position-querying operation.  Avoid rewriting the "same"
2256                  * f_pos value back to the file because a concurrent read(),
2257                  * write() or lseek() might have altered it
2258                  */
2259                 if (offset == 0)
2260                         return file->f_pos;
2261                 /*
2262                  * f_lock protects against read/modify/write race with other
2263                  * SEEK_CURs. Note that parallel writes and reads behave
2264                  * like SEEK_SET.
2265                  */
2266                 mutex_lock(&inode->i_mutex);
2267                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
2268                 mutex_unlock(&inode->i_mutex);
2269                 return offset;
2270         case SEEK_DATA:
2271                 /*
2272                  * In the generic case the entire file is data, so as long as
2273                  * offset isn't at the end of the file then the offset is data.
2274                  */
2275                 if (offset >= eof)
2276                         return -ENXIO;
2277                 break;
2278         case SEEK_HOLE:
2279                 /*
2280                  * There is a virtual hole at the end of the file, so as long as
2281                  * offset isn't i_size or larger, return i_size.
2282                  */
2283                 if (offset >= eof)
2284                         return -ENXIO;
2285                 offset = eof;
2286                 break;
2287         }
2288
2289         return llseek_execute(file, offset, maxsize);
2290 }
2291 #endif
2292
2293 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2294 {
2295         struct inode *inode = file->f_dentry->d_inode;
2296         loff_t retval, eof = 0;
2297
2298         ENTRY;
2299         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2300                            (origin == SEEK_CUR) ? file->f_pos : 0);
2301         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2302                inode->i_ino, inode->i_generation, inode, retval, retval,
2303                origin);
2304         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2305
2306         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2307                 retval = ll_glimpse_size(inode);
2308                 if (retval != 0)
2309                         RETURN(retval);
2310                 eof = i_size_read(inode);
2311         }
2312
2313         retval = ll_generic_file_llseek_size(file, offset, origin,
2314                                           ll_file_maxbytes(inode), eof);
2315         RETURN(retval);
2316 }
2317
2318 int ll_flush(struct file *file, fl_owner_t id)
2319 {
2320         struct inode *inode = file->f_dentry->d_inode;
2321         struct ll_inode_info *lli = ll_i2info(inode);
2322         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2323         int rc, err;
2324
2325         LASSERT(!S_ISDIR(inode->i_mode));
2326
2327         /* catch async errors that were recorded back when async writeback
2328          * failed for pages in this mapping. */
2329         rc = lli->lli_async_rc;
2330         lli->lli_async_rc = 0;
2331         err = lov_read_and_clear_async_rc(lli->lli_clob);
2332         if (rc == 0)
2333                 rc = err;
2334
2335         /* The application has been told write failure already.
2336          * Do not report failure again. */
2337         if (fd->fd_write_failed)
2338                 return 0;
2339         return rc ? -EIO : 0;
2340 }
2341
2342 /**
2343  * Called to make sure a portion of file has been written out.
2344  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2345  *
2346  * Return how many pages have been written.
2347  */
2348 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2349                        enum cl_fsync_mode mode, int ignore_layout)
2350 {
2351         struct cl_env_nest nest;
2352         struct lu_env *env;
2353         struct cl_io *io;
2354         struct obd_capa *capa = NULL;
2355         struct cl_fsync_io *fio;
2356         int result;
2357         ENTRY;
2358
2359         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2360             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2361                 RETURN(-EINVAL);
2362
2363         env = cl_env_nested_get(&nest);
2364         if (IS_ERR(env))
2365                 RETURN(PTR_ERR(env));
2366
2367         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2368
2369         io = ccc_env_thread_io(env);
2370         io->ci_obj = cl_i2info(inode)->lli_clob;
2371         io->ci_ignore_layout = ignore_layout;
2372
2373         /* initialize parameters for sync */
2374         fio = &io->u.ci_fsync;
2375         fio->fi_capa = capa;
2376         fio->fi_start = start;
2377         fio->fi_end = end;
2378         fio->fi_fid = ll_inode2fid(inode);
2379         fio->fi_mode = mode;
2380         fio->fi_nr_written = 0;
2381
2382         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2383                 result = cl_io_loop(env, io);
2384         else
2385                 result = io->ci_result;
2386         if (result == 0)
2387                 result = fio->fi_nr_written;
2388         cl_io_fini(env, io);
2389         cl_env_nested_put(&nest, env);
2390
2391         capa_put(capa);
2392
2393         RETURN(result);
2394 }
2395
2396 /*
2397  * When dentry is provided (the 'else' case), *file->f_dentry may be
2398  * null and dentry must be used directly rather than pulled from
2399  * *file->f_dentry as is done otherwise.
2400  */
2401
2402 #ifdef HAVE_FILE_FSYNC_4ARGS
2403 int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2404 {
2405         struct dentry *dentry = file->f_dentry;
2406 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2407 int ll_fsync(struct file *file, int datasync)
2408 {
2409         struct dentry *dentry = file->f_dentry;
2410 #else
2411 int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
2412 {
2413 #endif
2414         struct inode *inode = dentry->d_inode;
2415         struct ll_inode_info *lli = ll_i2info(inode);
2416         struct ptlrpc_request *req;
2417         struct obd_capa *oc;
2418         int rc, err;
2419         ENTRY;
2420
2421         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2422                inode->i_generation, inode);
2423         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2424
2425 #ifdef HAVE_FILE_FSYNC_4ARGS
2426         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2427         mutex_lock(&inode->i_mutex);
2428 #else
2429         /* fsync's caller has already called _fdata{sync,write}, we want
2430          * that IO to finish before calling the osc and mdc sync methods */
2431         rc = filemap_fdatawait(inode->i_mapping);
2432 #endif
2433
2434         /* catch async errors that were recorded back when async writeback
2435          * failed for pages in this mapping. */
2436         if (!S_ISDIR(inode->i_mode)) {
2437                 err = lli->lli_async_rc;
2438                 lli->lli_async_rc = 0;
2439                 if (rc == 0)
2440                         rc = err;
2441                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2442                 if (rc == 0)
2443                         rc = err;
2444         }
2445
2446         oc = ll_mdscapa_get(inode);
2447         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2448                       &req);
2449         capa_put(oc);
2450         if (!rc)
2451                 rc = err;
2452         if (!err)
2453                 ptlrpc_req_finished(req);
2454
2455         if (datasync && S_ISREG(inode->i_mode)) {
2456                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2457
2458                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2459                                 CL_FSYNC_ALL, 0);
2460                 if (rc == 0 && err < 0)
2461                         rc = err;
2462                 if (rc < 0)
2463                         fd->fd_write_failed = true;
2464                 else
2465                         fd->fd_write_failed = false;
2466         }
2467
2468 #ifdef HAVE_FILE_FSYNC_4ARGS
2469         mutex_unlock(&inode->i_mutex);
2470 #endif
2471         RETURN(rc);
2472 }
2473
2474 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2475 {
2476         struct inode *inode = file->f_dentry->d_inode;
2477         struct ll_sb_info *sbi = ll_i2sbi(inode);
2478         struct ldlm_enqueue_info einfo = {
2479                 .ei_type        = LDLM_FLOCK,
2480                 .ei_cb_cp       = ldlm_flock_completion_ast,
2481                 .ei_cbdata      = file_lock,
2482         };
2483         struct md_op_data *op_data;
2484         struct lustre_handle lockh = {0};
2485         ldlm_policy_data_t flock = {{0}};
2486         int flags = 0;
2487         int rc;
2488         int rc2 = 0;
2489         ENTRY;
2490
2491         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2492                inode->i_ino, file_lock);
2493
2494         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2495
2496         if (file_lock->fl_flags & FL_FLOCK) {
2497                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2498                 /* flocks are whole-file locks */
2499                 flock.l_flock.end = OFFSET_MAX;
2500                 /* For flocks owner is determined by the local file desctiptor*/
2501                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2502         } else if (file_lock->fl_flags & FL_POSIX) {
2503                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2504                 flock.l_flock.start = file_lock->fl_start;
2505                 flock.l_flock.end = file_lock->fl_end;
2506         } else {
2507                 RETURN(-EINVAL);
2508         }
2509         flock.l_flock.pid = file_lock->fl_pid;
2510
2511         /* Somewhat ugly workaround for svc lockd.
2512          * lockd installs custom fl_lmops->lm_compare_owner that checks
2513          * for the fl_owner to be the same (which it always is on local node
2514          * I guess between lockd processes) and then compares pid.
2515          * As such we assign pid to the owner field to make it all work,
2516          * conflict with normal locks is unlikely since pid space and
2517          * pointer space for current->files are not intersecting */
2518         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2519                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2520
2521         switch (file_lock->fl_type) {
2522         case F_RDLCK:
2523                 einfo.ei_mode = LCK_PR;
2524                 break;
2525         case F_UNLCK:
2526                 /* An unlock request may or may not have any relation to
2527                  * existing locks so we may not be able to pass a lock handle
2528                  * via a normal ldlm_lock_cancel() request. The request may even
2529                  * unlock a byte range in the middle of an existing lock. In
2530                  * order to process an unlock request we need all of the same
2531                  * information that is given with a normal read or write record
2532                  * lock request. To avoid creating another ldlm unlock (cancel)
2533                  * message we'll treat a LCK_NL flock request as an unlock. */
2534                 einfo.ei_mode = LCK_NL;
2535                 break;
2536         case F_WRLCK:
2537                 einfo.ei_mode = LCK_PW;
2538                 break;
2539         default:
2540                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2541                         file_lock->fl_type);
2542                 RETURN (-ENOTSUPP);
2543         }
2544
2545         switch (cmd) {
2546         case F_SETLKW:
2547 #ifdef F_SETLKW64
2548         case F_SETLKW64:
2549 #endif
2550                 flags = 0;
2551                 break;
2552         case F_SETLK:
2553 #ifdef F_SETLK64
2554         case F_SETLK64:
2555 #endif
2556                 flags = LDLM_FL_BLOCK_NOWAIT;
2557                 break;
2558         case F_GETLK:
2559 #ifdef F_GETLK64
2560         case F_GETLK64:
2561 #endif
2562                 flags = LDLM_FL_TEST_LOCK;
2563                 /* Save the old mode so that if the mode in the lock changes we
2564                  * can decrement the appropriate reader or writer refcount. */
2565                 file_lock->fl_type = einfo.ei_mode;
2566                 break;
2567         default:
2568                 CERROR("unknown fcntl lock command: %d\n", cmd);
2569                 RETURN (-EINVAL);
2570         }
2571
2572         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2573                                      LUSTRE_OPC_ANY, NULL);
2574         if (IS_ERR(op_data))
2575                 RETURN(PTR_ERR(op_data));
2576
2577         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2578                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2579                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2580
2581         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2582                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2583
2584         if ((file_lock->fl_flags & FL_FLOCK) &&
2585             (rc == 0 || file_lock->fl_type == F_UNLCK))
2586                 rc2  = flock_lock_file_wait(file, file_lock);
2587         if ((file_lock->fl_flags & FL_POSIX) &&
2588             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2589             !(flags & LDLM_FL_TEST_LOCK))
2590                 rc2  = posix_lock_file_wait(file, file_lock);
2591
2592         if (rc2 && file_lock->fl_type != F_UNLCK) {
2593                 einfo.ei_mode = LCK_NL;
2594                 md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2595                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2596                 rc = rc2;
2597         }
2598
2599         ll_finish_md_op_data(op_data);
2600
2601         RETURN(rc);
2602 }
2603
2604 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2605 {
2606         ENTRY;
2607
2608         RETURN(-ENOSYS);
2609 }
2610
2611 /**
2612  * test if some locks matching bits and l_req_mode are acquired
2613  * - bits can be in different locks
2614  * - if found clear the common lock bits in *bits
2615  * - the bits not found, are kept in *bits
2616  * \param inode [IN]
2617  * \param bits [IN] searched lock bits [IN]
2618  * \param l_req_mode [IN] searched lock mode
2619  * \retval boolean, true iff all bits are found
2620  */
2621 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2622 {
2623         struct lustre_handle lockh;
2624         ldlm_policy_data_t policy;
2625         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2626                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2627         struct lu_fid *fid;
2628         __u64 flags;
2629         int i;
2630         ENTRY;
2631
2632         if (!inode)
2633                RETURN(0);
2634
2635         fid = &ll_i2info(inode)->lli_fid;
2636         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2637                ldlm_lockname[mode]);
2638
2639         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2640         for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2641                 policy.l_inodebits.bits = *bits & (1 << i);
2642                 if (policy.l_inodebits.bits == 0)
2643                         continue;
2644
2645                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2646                                   &policy, mode, &lockh)) {
2647                         struct ldlm_lock *lock;
2648
2649                         lock = ldlm_handle2lock(&lockh);
2650                         if (lock) {
2651                                 *bits &=
2652                                       ~(lock->l_policy_data.l_inodebits.bits);
2653                                 LDLM_LOCK_PUT(lock);
2654                         } else {
2655                                 *bits &= ~policy.l_inodebits.bits;
2656                         }
2657                 }
2658         }
2659         RETURN(*bits == 0);
2660 }
2661
2662 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2663                             struct lustre_handle *lockh, __u64 flags)
2664 {
2665         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2666         struct lu_fid *fid;
2667         ldlm_mode_t rc;
2668         ENTRY;
2669
2670         fid = &ll_i2info(inode)->lli_fid;
2671         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2672
2673         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2674                            fid, LDLM_IBITS, &policy,
2675                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2676         RETURN(rc);
2677 }
2678
2679 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2680 {
2681         /* Already unlinked. Just update nlink and return success */
2682         if (rc == -ENOENT) {
2683                 clear_nlink(inode);
2684                 /* This path cannot be hit for regular files unless in
2685                  * case of obscure races, so no need to to validate
2686                  * size. */
2687                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2688                         return 0;
2689         } else if (rc != 0) {
2690                 CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2691                        ll_get_fsname(inode->i_sb, NULL, 0),
2692                        PFID(ll_inode2fid(inode)), rc);
2693         }
2694
2695         return rc;
2696 }
2697
2698 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2699                              __u64 ibits)
2700 {
2701         struct inode *inode = dentry->d_inode;
2702         struct ptlrpc_request *req = NULL;
2703         struct obd_export *exp;
2704         int rc = 0;
2705         ENTRY;
2706
2707         LASSERT(inode != NULL);
2708
2709         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2710                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2711
2712         exp = ll_i2mdexp(inode);
2713
2714         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2715          *      But under CMD case, it caused some lock issues, should be fixed
2716          *      with new CMD ibits lock. See bug 12718 */
2717         if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
2718                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2719                 struct md_op_data *op_data;
2720
2721                 if (ibits == MDS_INODELOCK_LOOKUP)
2722                         oit.it_op = IT_LOOKUP;
2723
2724                 /* Call getattr by fid, so do not provide name at all. */
2725                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2726                                              dentry->d_inode, NULL, 0, 0,
2727                                              LUSTRE_OPC_ANY, NULL);
2728                 if (IS_ERR(op_data))
2729                         RETURN(PTR_ERR(op_data));
2730
2731                 oit.it_create_mode |= M_CHECK_STALE;
2732                 rc = md_intent_lock(exp, op_data, NULL, 0,
2733                                     /* we are not interested in name
2734                                        based lookup */
2735                                     &oit, 0, &req,
2736                                     ll_md_blocking_ast, 0);
2737                 ll_finish_md_op_data(op_data);
2738                 oit.it_create_mode &= ~M_CHECK_STALE;
2739                 if (rc < 0) {
2740                         rc = ll_inode_revalidate_fini(inode, rc);
2741                         GOTO (out, rc);
2742                 }
2743
2744                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2745                 if (rc != 0) {
2746                         ll_intent_release(&oit);
2747                         GOTO(out, rc);
2748                 }
2749
2750                 /* Unlinked? Unhash dentry, so it is not picked up later by
2751                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2752                    here to preserve get_cwd functionality on 2.6.
2753                    Bug 10503 */
2754                 if (!dentry->d_inode->i_nlink)
2755                         d_lustre_invalidate(dentry, 0);
2756
2757                 ll_lookup_finish_locks(&oit, dentry);
2758         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2759                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2760                 obd_valid valid = OBD_MD_FLGETATTR;
2761                 struct md_op_data *op_data;
2762                 int ealen = 0;
2763
2764                 if (S_ISREG(inode->i_mode)) {
2765                         rc = ll_get_max_mdsize(sbi, &ealen);
2766                         if (rc)
2767                                 RETURN(rc);
2768                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2769                 }
2770
2771                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2772                                              0, ealen, LUSTRE_OPC_ANY,
2773                                              NULL);
2774                 if (IS_ERR(op_data))
2775                         RETURN(PTR_ERR(op_data));
2776
2777                 op_data->op_valid = valid;
2778                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2779                  * capa for this inode. Because we only keep capas of dirs
2780                  * fresh. */
2781                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2782                 ll_finish_md_op_data(op_data);
2783                 if (rc) {
2784                         rc = ll_inode_revalidate_fini(inode, rc);
2785                         RETURN(rc);
2786                 }
2787
2788                 rc = ll_prep_inode(&inode, req, NULL, NULL);
2789         }
2790 out:
2791         ptlrpc_req_finished(req);
2792         return rc;
2793 }
2794
2795 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2796                            __u64 ibits)
2797 {
2798         struct inode    *inode = dentry->d_inode;
2799         int              rc;
2800         ENTRY;
2801
2802         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2803         if (rc != 0)
2804                 RETURN(rc);
2805
2806         /* if object isn't regular file, don't validate size */
2807         if (!S_ISREG(inode->i_mode)) {
2808                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2809                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2810                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2811         } else {
2812                 /* In case of restore, the MDT has the right size and has
2813                  * already send it back without granting the layout lock,
2814                  * inode is up-to-date so glimpse is useless.
2815                  * Also to glimpse we need the layout, in case of a running
2816                  * restore the MDT holds the layout lock so the glimpse will
2817                  * block up to the end of restore (getattr will block)
2818                  */
2819                 if (!(ll_i2info(inode)->lli_flags & LLIF_FILE_RESTORING))
2820                         rc = ll_glimpse_size(inode);
2821         }
2822         RETURN(rc);
2823 }
2824
2825 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2826                   struct lookup_intent *it, struct kstat *stat)
2827 {
2828         struct inode *inode = de->d_inode;
2829         struct ll_sb_info *sbi = ll_i2sbi(inode);
2830         struct ll_inode_info *lli = ll_i2info(inode);
2831         int res = 0;
2832
2833         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2834                                              MDS_INODELOCK_LOOKUP);
2835         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2836
2837         if (res)
2838                 return res;
2839
2840         stat->dev = inode->i_sb->s_dev;
2841         if (ll_need_32bit_api(sbi))
2842                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2843         else
2844                 stat->ino = inode->i_ino;
2845         stat->mode = inode->i_mode;
2846         stat->nlink = inode->i_nlink;
2847         stat->uid = inode->i_uid;
2848         stat->gid = inode->i_gid;
2849         stat->rdev = inode->i_rdev;
2850         stat->atime = inode->i_atime;
2851         stat->mtime = inode->i_mtime;
2852         stat->ctime = inode->i_ctime;
2853         stat->blksize = 1 << inode->i_blkbits;
2854
2855         stat->size = i_size_read(inode);
2856         stat->blocks = inode->i_blocks;
2857
2858         return 0;
2859 }
2860 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2861 {
2862         struct lookup_intent it = { .it_op = IT_GETATTR };
2863
2864         return ll_getattr_it(mnt, de, &it, stat);
2865 }
2866
2867 #ifdef HAVE_LINUX_FIEMAP_H
2868 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2869                 __u64 start, __u64 len)
2870 {
2871         int rc;
2872         size_t num_bytes;
2873         struct ll_user_fiemap *fiemap;
2874         unsigned int extent_count = fieinfo->fi_extents_max;
2875
2876         num_bytes = sizeof(*fiemap) + (extent_count *
2877                                        sizeof(struct ll_fiemap_extent));
2878         OBD_ALLOC_LARGE(fiemap, num_bytes);
2879
2880         if (fiemap == NULL)
2881                 RETURN(-ENOMEM);
2882
2883         fiemap->fm_flags = fieinfo->fi_flags;
2884         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2885         fiemap->fm_start = start;
2886         fiemap->fm_length = len;
2887         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2888                sizeof(struct ll_fiemap_extent));
2889
2890         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2891
2892         fieinfo->fi_flags = fiemap->fm_flags;
2893         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2894         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2895                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2896
2897         OBD_FREE_LARGE(fiemap, num_bytes);
2898         return rc;
2899 }
2900 #endif
2901
2902 struct posix_acl * ll_get_acl(struct inode *inode, int type)
2903 {
2904         struct ll_inode_info *lli = ll_i2info(inode);
2905         struct posix_acl *acl = NULL;
2906         ENTRY;
2907
2908         spin_lock(&lli->lli_lock);
2909         /* VFS' acl_permission_check->check_acl will release the refcount */
2910         acl = posix_acl_dup(lli->lli_posix_acl);
2911         spin_unlock(&lli->lli_lock);
2912
2913         RETURN(acl);
2914 }
2915
2916 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2917 static int
2918 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2919 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
2920 # else
2921 ll_check_acl(struct inode *inode, int mask)
2922 # endif
2923 {
2924 # ifdef CONFIG_FS_POSIX_ACL
2925         struct posix_acl *acl;
2926         int rc;
2927         ENTRY;
2928
2929 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2930         if (flags & IPERM_FLAG_RCU)
2931                 return -ECHILD;
2932 #  endif
2933         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
2934
2935         if (!acl)
2936                 RETURN(-EAGAIN);
2937
2938         rc = posix_acl_permission(inode, acl, mask);
2939         posix_acl_release(acl);
2940
2941         RETURN(rc);
2942 # else /* !CONFIG_FS_POSIX_ACL */
2943         return -EAGAIN;
2944 # endif /* CONFIG_FS_POSIX_ACL */
2945 }
2946 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2947
2948 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2949 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2950 #else
2951 # ifdef HAVE_INODE_PERMISION_2ARGS
2952 int ll_inode_permission(struct inode *inode, int mask)
2953 # else
2954 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2955 # endif
2956 #endif
2957 {
2958         int rc = 0;
2959         ENTRY;
2960
2961 #ifdef MAY_NOT_BLOCK
2962         if (mask & MAY_NOT_BLOCK)
2963                 return -ECHILD;
2964 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
2965         if (flags & IPERM_FLAG_RCU)
2966                 return -ECHILD;
2967 #endif
2968
2969        /* as root inode are NOT getting validated in lookup operation,
2970         * need to do it before permission check. */
2971
2972         if (inode == inode->i_sb->s_root->d_inode) {
2973                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2974
2975                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2976                                               MDS_INODELOCK_LOOKUP);
2977                 if (rc)
2978                         RETURN(rc);
2979         }
2980
2981         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2982                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2983
2984         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2985                 return lustre_check_remote_perm(inode, mask);
2986
2987         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2988         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
2989
2990         RETURN(rc);
2991 }
2992
2993 #ifdef HAVE_FILE_READV
2994 #define READ_METHOD readv
2995 #define READ_FUNCTION ll_file_readv
2996 #define WRITE_METHOD writev
2997 #define WRITE_FUNCTION ll_file_writev
2998 #else
2999 #define READ_METHOD aio_read
3000 #define READ_FUNCTION ll_file_aio_read
3001 #define WRITE_METHOD aio_write
3002 #define WRITE_FUNCTION ll_file_aio_write
3003 #endif
3004
3005 /* -o localflock - only provides locally consistent flock locks */
3006 struct file_operations ll_file_operations = {
3007         .read           = ll_file_read,
3008         .READ_METHOD    = READ_FUNCTION,
3009         .write          = ll_file_write,
3010         .WRITE_METHOD   = WRITE_FUNCTION,
3011         .unlocked_ioctl = ll_file_ioctl,
3012         .open           = ll_file_open,
3013         .release        = ll_file_release,
3014         .mmap           = ll_file_mmap,
3015         .llseek         = ll_file_seek,
3016         .splice_read    = ll_file_splice_read,
3017         .fsync          = ll_fsync,
3018         .flush          = ll_flush
3019 };
3020
3021 struct file_operations ll_file_operations_flock = {
3022         .read           = ll_file_read,
3023         .READ_METHOD    = READ_FUNCTION,
3024         .write          = ll_file_write,
3025         .WRITE_METHOD   = WRITE_FUNCTION,
3026         .unlocked_ioctl = ll_file_ioctl,
3027         .open           = ll_file_open,
3028         .release        = ll_file_release,
3029         .mmap           = ll_file_mmap,
3030         .llseek         = ll_file_seek,
3031         .splice_read    = ll_file_splice_read,
3032         .fsync          = ll_fsync,
3033         .flush          = ll_flush,
3034         .flock          = ll_file_flock,
3035         .lock           = ll_file_flock
3036 };
3037
3038 /* These are for -o noflock - to return ENOSYS on flock calls */
3039 struct file_operations ll_file_operations_noflock = {
3040         .read           = ll_file_read,
3041         .READ_METHOD    = READ_FUNCTION,
3042         .write          = ll_file_write,
3043         .WRITE_METHOD   = WRITE_FUNCTION,
3044         .unlocked_ioctl = ll_file_ioctl,
3045         .open           = ll_file_open,
3046         .release        = ll_file_release,
3047         .mmap           = ll_file_mmap,
3048         .llseek         = ll_file_seek,
3049         .splice_read    = ll_file_splice_read,
3050         .fsync          = ll_fsync,
3051         .flush          = ll_flush,
3052         .flock          = ll_file_noflock,
3053         .lock           = ll_file_noflock
3054 };
3055
3056 struct inode_operations ll_file_inode_operations = {
3057         .setattr        = ll_setattr,
3058         .getattr        = ll_getattr,
3059         .permission     = ll_inode_permission,
3060         .setxattr       = ll_setxattr,
3061         .getxattr       = ll_getxattr,
3062         .listxattr      = ll_listxattr,
3063         .removexattr    = ll_removexattr,
3064 #ifdef  HAVE_LINUX_FIEMAP_H
3065         .fiemap         = ll_fiemap,
3066 #endif
3067 #ifdef HAVE_IOP_GET_ACL
3068         .get_acl        = ll_get_acl,
3069 #endif
3070 };
3071
3072 /* dynamic ioctl number support routins */
3073 static struct llioc_ctl_data {
3074         struct rw_semaphore     ioc_sem;
3075         cfs_list_t              ioc_head;
3076 } llioc = {
3077         __RWSEM_INITIALIZER(llioc.ioc_sem),
3078         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3079 };
3080
3081
3082 struct llioc_data {
3083         cfs_list_t              iocd_list;
3084         unsigned int            iocd_size;
3085         llioc_callback_t        iocd_cb;
3086         unsigned int            iocd_count;
3087         unsigned int            iocd_cmd[0];
3088 };
3089
3090 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3091 {
3092         unsigned int size;
3093         struct llioc_data *in_data = NULL;
3094         ENTRY;
3095
3096         if (cb == NULL || cmd == NULL ||
3097             count > LLIOC_MAX_CMD || count < 0)
3098                 RETURN(NULL);
3099
3100         size = sizeof(*in_data) + count * sizeof(unsigned int);
3101         OBD_ALLOC(in_data, size);
3102         if (in_data == NULL)
3103                 RETURN(NULL);
3104
3105         memset(in_data, 0, sizeof(*in_data));
3106         in_data->iocd_size = size;
3107         in_data->iocd_cb = cb;
3108         in_data->iocd_count = count;
3109         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3110
3111         down_write(&llioc.ioc_sem);
3112         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3113         up_write(&llioc.ioc_sem);
3114
3115         RETURN(in_data);
3116 }
3117
3118 void ll_iocontrol_unregister(void *magic)
3119 {
3120         struct llioc_data *tmp;
3121
3122         if (magic == NULL)
3123                 return;
3124
3125         down_write(&llioc.ioc_sem);
3126         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3127                 if (tmp == magic) {
3128                         unsigned int size = tmp->iocd_size;
3129
3130                         cfs_list_del(&tmp->iocd_list);
3131                         up_write(&llioc.ioc_sem);
3132
3133                         OBD_FREE(tmp, size);
3134                         return;
3135                 }
3136         }
3137         up_write(&llioc.ioc_sem);
3138
3139         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3140 }
3141
3142 EXPORT_SYMBOL(ll_iocontrol_register);
3143 EXPORT_SYMBOL(ll_iocontrol_unregister);
3144
3145 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3146                         unsigned int cmd, unsigned long arg, int *rcp)
3147 {
3148         enum llioc_iter ret = LLIOC_CONT;
3149         struct llioc_data *data;
3150         int rc = -EINVAL, i;
3151
3152         down_read(&llioc.ioc_sem);
3153         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3154                 for (i = 0; i < data->iocd_count; i++) {
3155                         if (cmd != data->iocd_cmd[i])
3156                                 continue;
3157
3158                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3159                         break;
3160                 }
3161
3162                 if (ret == LLIOC_STOP)
3163                         break;
3164         }
3165         up_read(&llioc.ioc_sem);
3166
3167         if (rcp)
3168                 *rcp = rc;
3169         return ret;
3170 }
3171
3172 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
3173 {
3174         struct ll_inode_info *lli = ll_i2info(inode);
3175         struct cl_env_nest nest;
3176         struct lu_env *env;
3177         int result;
3178         ENTRY;
3179
3180         if (lli->lli_clob == NULL)
3181                 RETURN(0);
3182
3183         env = cl_env_nested_get(&nest);
3184         if (IS_ERR(env))
3185                 RETURN(PTR_ERR(env));
3186
3187         result = cl_conf_set(env, lli->lli_clob, conf);
3188         cl_env_nested_put(&nest, env);
3189
3190         if (conf->coc_opc == OBJECT_CONF_SET) {
3191                 struct ldlm_lock *lock = conf->coc_lock;
3192
3193                 LASSERT(lock != NULL);
3194                 LASSERT(ldlm_has_layout(lock));
3195                 if (result == 0) {
3196                         /* it can only be allowed to match after layout is
3197                          * applied to inode otherwise false layout would be
3198                          * seen. Applying layout shoud happen before dropping
3199                          * the intent lock. */
3200                         ldlm_lock_allow_match(lock);
3201                 }
3202         }
3203         RETURN(result);
3204 }
3205
3206 /* Fetch layout from MDT with getxattr request, if it's not ready yet */
3207 static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)
3208
3209 {
3210         struct ll_sb_info *sbi = ll_i2sbi(inode);
3211         struct obd_capa *oc;
3212         struct ptlrpc_request *req;
3213         struct mdt_body *body;
3214         void *lvbdata;
3215         void *lmm;
3216         int lmmsize;
3217         int rc;
3218         ENTRY;
3219
3220         CDEBUG(D_INODE, DFID" LVB_READY=%d l_lvb_data=%p l_lvb_len=%d\n",
3221                PFID(ll_inode2fid(inode)), !!(lock->l_flags & LDLM_FL_LVB_READY),
3222                lock->l_lvb_data, lock->l_lvb_len);
3223
3224         if ((lock->l_lvb_data != NULL) && (lock->l_flags & LDLM_FL_LVB_READY))
3225                 RETURN(0);
3226
3227         /* if layout lock was granted right away, the layout is returned
3228          * within DLM_LVB of dlm reply; otherwise if the lock was ever
3229          * blocked and then granted via completion ast, we have to fetch
3230          * layout here. Please note that we can't use the LVB buffer in
3231          * completion AST because it doesn't have a large enough buffer */
3232         oc = ll_mdscapa_get(inode);
3233         rc = ll_get_max_mdsize(sbi, &lmmsize);
3234         if (rc == 0)
3235                 rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
3236                                 OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
3237                                 lmmsize, 0, &req);
3238         capa_put(oc);
3239         if (rc < 0)
3240                 RETURN(rc);
3241
3242         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
3243         if (body == NULL || body->eadatasize > lmmsize)
3244                 GOTO(out, rc = -EPROTO);
3245
3246         lmmsize = body->eadatasize;
3247         if (lmmsize == 0) /* empty layout */
3248                 GOTO(out, rc = 0);
3249
3250         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
3251         if (lmm == NULL)
3252                 GOTO(out, rc = -EFAULT);
3253
3254         OBD_ALLOC_LARGE(lvbdata, lmmsize);
3255         if (lvbdata == NULL)
3256                 GOTO(out, rc = -ENOMEM);
3257
3258         memcpy(lvbdata, lmm, lmmsize);
3259         lock_res_and_lock(lock);
3260         if (lock->l_lvb_data != NULL)
3261                 OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
3262
3263         lock->l_lvb_data = lvbdata;
3264         lock->l_lvb_len = lmmsize;
3265         unlock_res_and_lock(lock);
3266
3267         EXIT;
3268
3269 out:
3270         ptlrpc_req_finished(req);
3271         return rc;
3272 }
3273
3274 /**
3275  * Apply the layout to the inode. Layout lock is held and will be released
3276  * in this function.
3277  */
3278 static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
3279                                 struct inode *inode, __u32 *gen, bool reconf)
3280 {
3281         struct ll_inode_info *lli = ll_i2info(inode);
3282         struct ll_sb_info    *sbi = ll_i2sbi(inode);
3283         struct ldlm_lock *lock;
3284         struct lustre_md md = { NULL };
3285         struct cl_object_conf conf;
3286         int rc = 0;
3287         bool lvb_ready;
3288         bool wait_layout = false;
3289         ENTRY;
3290
3291         LASSERT(lustre_handle_is_used(lockh));
3292
3293         lock = ldlm_handle2lock(lockh);
3294         LASSERT(lock != NULL);
3295         LASSERT(ldlm_has_layout(lock));
3296
3297         LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
3298                    inode, PFID(&lli->lli_fid), reconf);
3299
3300         /* in case this is a caching lock and reinstate with new inode */
3301         md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
3302
3303         lock_res_and_lock(lock);
3304         lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
3305         unlock_res_and_lock(lock);
3306         /* checking lvb_ready is racy but this is okay. The worst case is
3307          * that multi processes may configure the file on the same time. */
3308
3309         if (lvb_ready || !reconf) {
3310                 rc = -ENODATA;
3311                 if (lvb_ready) {
3312                         /* layout_gen must be valid if layout lock is not
3313                          * cancelled and stripe has already set */
3314                         *gen = lli->lli_layout_gen;
3315                         rc = 0;
3316                 }
3317                 GOTO(out, rc);
3318         }
3319
3320         rc = ll_layout_fetch(inode, lock);
3321         if (rc < 0)
3322                 GOTO(out, rc);
3323
3324         /* for layout lock, lmm is returned in lock's lvb.
3325          * lvb_data is immutable if the lock is held so it's safe to access it
3326          * without res lock. See the description in ldlm_lock_decref_internal()
3327          * for the condition to free lvb_data of layout lock */
3328         if (lock->l_lvb_data != NULL) {
3329                 rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3330                                   lock->l_lvb_data, lock->l_lvb_len);
3331                 if (rc >= 0) {
3332                         *gen = LL_LAYOUT_GEN_EMPTY;
3333                         if (md.lsm != NULL)
3334                                 *gen = md.lsm->lsm_layout_gen;
3335                         rc = 0;
3336                 } else {
3337                         CERROR("%s: file "DFID" unpackmd error: %d\n",
3338                                 ll_get_fsname(inode->i_sb, NULL, 0),
3339                                 PFID(&lli->lli_fid), rc);
3340                 }
3341         }
3342         if (rc < 0)
3343                 GOTO(out, rc);
3344
3345         /* set layout to file. Unlikely this will fail as old layout was
3346          * surely eliminated */
3347         memset(&conf, 0, sizeof conf);
3348         conf.coc_opc = OBJECT_CONF_SET;
3349         conf.coc_inode = inode;
3350         conf.coc_lock = lock;
3351         conf.u.coc_md = &md;
3352         rc = ll_layout_conf(inode, &conf);
3353
3354         if (md.lsm != NULL)
3355                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3356
3357         /* refresh layout failed, need to wait */
3358         wait_layout = rc == -EBUSY;
3359         EXIT;
3360
3361 out:
3362         LDLM_LOCK_PUT(lock);
3363         ldlm_lock_decref(lockh, mode);
3364
3365         /* wait for IO to complete if it's still being used. */
3366         if (wait_layout) {
3367                 CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3368                         ll_get_fsname(inode->i_sb, NULL, 0),
3369                         inode, PFID(&lli->lli_fid));
3370
3371                 memset(&conf, 0, sizeof conf);
3372                 conf.coc_opc = OBJECT_CONF_WAIT;
3373                 conf.coc_inode = inode;
3374                 rc = ll_layout_conf(inode, &conf);
3375                 if (rc == 0)
3376                         rc = -EAGAIN;
3377
3378                 CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3379                         PFID(&lli->lli_fid), rc);
3380         }
3381         RETURN(rc);
3382 }
3383
3384 /**
3385  * This function checks if there exists a LAYOUT lock on the client side,
3386  * or enqueues it if it doesn't have one in cache.
3387  *
3388  * This function will not hold layout lock so it may be revoked any time after
3389  * this function returns. Any operations depend on layout should be redone
3390  * in that case.
3391  *
3392  * This function should be called before lov_io_init() to get an uptodate
3393  * layout version, the caller should save the version number and after IO
3394  * is finished, this function should be called again to verify that layout
3395  * is not changed during IO time.
3396  */
3397 int ll_layout_refresh(struct inode *inode, __u32 *gen)
3398 {
3399         struct ll_inode_info  *lli = ll_i2info(inode);
3400         struct ll_sb_info     *sbi = ll_i2sbi(inode);
3401         struct md_op_data     *op_data;
3402         struct lookup_intent   it;
3403         struct lustre_handle   lockh;
3404         ldlm_mode_t            mode;
3405         struct ldlm_enqueue_info einfo = {
3406                 .ei_type = LDLM_IBITS,
3407                 .ei_mode = LCK_CR,
3408                 .ei_cb_bl = ll_md_blocking_ast,
3409                 .ei_cb_cp = ldlm_completion_ast,
3410         };
3411         int rc;
3412         ENTRY;
3413
3414         *gen = lli->lli_layout_gen;
3415         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
3416                 RETURN(0);
3417
3418         /* sanity checks */
3419         LASSERT(fid_is_sane(ll_inode2fid(inode)));
3420         LASSERT(S_ISREG(inode->i_mode));
3421
3422         /* mostly layout lock is caching on the local side, so try to match
3423          * it before grabbing layout lock mutex. */
3424         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3425         if (mode != 0) { /* hit cached lock */
3426                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
3427                 if (rc == 0)
3428                         RETURN(0);
3429
3430                 /* better hold lli_layout_mutex to try again otherwise
3431                  * it will have starvation problem. */
3432         }
3433
3434         /* take layout lock mutex to enqueue layout lock exclusively. */
3435         mutex_lock(&lli->lli_layout_mutex);
3436
3437 again:
3438         /* try again. Maybe somebody else has done this. */
3439         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3440         if (mode != 0) { /* hit cached lock */
3441                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3442                 if (rc == -EAGAIN)
3443                         goto again;
3444
3445                 mutex_unlock(&lli->lli_layout_mutex);
3446                 RETURN(rc);
3447         }
3448
3449         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3450                         0, 0, LUSTRE_OPC_ANY, NULL);
3451         if (IS_ERR(op_data)) {
3452                 mutex_unlock(&lli->lli_layout_mutex);
3453                 RETURN(PTR_ERR(op_data));
3454         }
3455
3456         /* have to enqueue one */
3457         memset(&it, 0, sizeof(it));
3458         it.it_op = IT_LAYOUT;
3459         lockh.cookie = 0ULL;
3460
3461         LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3462                         ll_get_fsname(inode->i_sb, NULL, 0), inode,
3463                         PFID(&lli->lli_fid));
3464
3465         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3466                         NULL, 0, NULL, 0);
3467         if (it.d.lustre.it_data != NULL)
3468                 ptlrpc_req_finished(it.d.lustre.it_data);
3469         it.d.lustre.it_data = NULL;
3470
3471         ll_finish_md_op_data(op_data);
3472
3473         mode = it.d.lustre.it_lock_mode;
3474         it.d.lustre.it_lock_mode = 0;
3475         ll_intent_drop_lock(&it);
3476
3477         if (rc == 0) {
3478                 /* set lock data in case this is a new lock */
3479                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3480                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3481                 if (rc == -EAGAIN)
3482                         goto again;
3483         }
3484         mutex_unlock(&lli->lli_layout_mutex);
3485
3486         RETURN(rc);
3487 }
3488
3489 /**
3490  *  This function send a restore request to the MDT
3491  */
3492 int ll_layout_restore(struct inode *inode)
3493 {
3494         struct hsm_user_request *hur;
3495         int                      len, rc;
3496         ENTRY;
3497
3498         len = sizeof(struct hsm_user_request) +
3499               sizeof(struct hsm_user_item);
3500         OBD_ALLOC(hur, len);
3501         if (hur == NULL)
3502                 RETURN(-ENOMEM);
3503
3504         hur->hur_request.hr_action = HUA_RESTORE;
3505         hur->hur_request.hr_archive_id = 0;
3506         hur->hur_request.hr_flags = 0;
3507         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
3508                sizeof(hur->hur_user_item[0].hui_fid));
3509         hur->hur_user_item[0].hui_extent.length = -1;
3510         hur->hur_request.hr_itemcount = 1;
3511         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, cl_i2sbi(inode)->ll_md_exp,
3512                            len, hur, NULL);
3513         OBD_FREE(hur, len);
3514         RETURN(rc);
3515 }
3516