Whamcloud - gitweb
LU-2723 clio: a client hangs on osc_extent_wait in dio
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84
85         if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
86                 op_data->op_bias |= MDS_DATA_MODIFIED;
87 }
88
89 /**
90  * Closes the IO epoch and packs all the attributes into @op_data for
91  * the CLOSE rpc.
92  */
93 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
94                              struct obd_client_handle *och)
95 {
96         ENTRY;
97
98         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
99                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
100
101         if (!(och->och_flags & FMODE_WRITE))
102                 goto out;
103
104         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
105                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
106         else
107                 ll_ioepoch_close(inode, op_data, &och, 0);
108
109 out:
110         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
111         ll_prep_md_op_data(op_data, inode, NULL, NULL,
112                            0, 0, LUSTRE_OPC_ANY, NULL);
113         EXIT;
114 }
115
116 static int ll_close_inode_openhandle(struct obd_export *md_exp,
117                                      struct inode *inode,
118                                      struct obd_client_handle *och)
119 {
120         struct obd_export *exp = ll_i2mdexp(inode);
121         struct md_op_data *op_data;
122         struct ptlrpc_request *req = NULL;
123         struct obd_device *obd = class_exp2obd(exp);
124         int epoch_close = 1;
125         int rc;
126         ENTRY;
127
128         if (obd == NULL) {
129                 /*
130                  * XXX: in case of LMV, is this correct to access
131                  * ->exp_handle?
132                  */
133                 CERROR("Invalid MDC connection handle "LPX64"\n",
134                        ll_i2mdexp(inode)->exp_handle.h_cookie);
135                 GOTO(out, rc = 0);
136         }
137
138         OBD_ALLOC_PTR(op_data);
139         if (op_data == NULL)
140                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
141
142         ll_prepare_close(inode, op_data, och);
143         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
144         rc = md_close(md_exp, op_data, och->och_mod, &req);
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(epoch_close);
148                 /* MDS has instructed us to obtain Size-on-MDS attribute from
149                  * OSTs and send setattr to back to MDS. */
150                 rc = ll_som_update(inode, op_data);
151                 if (rc) {
152                         CERROR("inode %lu mdc Size-on-MDS update failed: "
153                                "rc = %d\n", inode->i_ino, rc);
154                         rc = 0;
155                 }
156         } else if (rc) {
157                 CERROR("inode %lu mdc close failed: rc = %d\n",
158                        inode->i_ino, rc);
159         }
160
161         /* DATA_MODIFIED flag was successfully sent on close, cancel data
162          * modification flag. */
163         if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
164                 struct ll_inode_info *lli = ll_i2info(inode);
165
166                 spin_lock(&lli->lli_lock);
167                 lli->lli_flags &= ~LLIF_DATA_MODIFIED;
168                 spin_unlock(&lli->lli_lock);
169         }
170
171         ll_finish_md_op_data(op_data);
172
173         if (rc == 0) {
174                 rc = ll_objects_destroy(req, inode);
175                 if (rc)
176                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
177                                inode->i_ino, rc);
178         }
179
180         EXIT;
181 out:
182
183         if (exp_connect_som(exp) && !epoch_close &&
184             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
185                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
186         } else {
187                 md_clear_open_replay_data(md_exp, och);
188                 /* Free @och if it is not waiting for DONE_WRITING. */
189                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
190                 OBD_FREE_PTR(och);
191         }
192         if (req) /* This is close request */
193                 ptlrpc_req_finished(req);
194         return rc;
195 }
196
197 int ll_md_real_close(struct inode *inode, int flags)
198 {
199         struct ll_inode_info *lli = ll_i2info(inode);
200         struct obd_client_handle **och_p;
201         struct obd_client_handle *och;
202         __u64 *och_usecount;
203         int rc = 0;
204         ENTRY;
205
206         if (flags & FMODE_WRITE) {
207                 och_p = &lli->lli_mds_write_och;
208                 och_usecount = &lli->lli_open_fd_write_count;
209         } else if (flags & FMODE_EXEC) {
210                 och_p = &lli->lli_mds_exec_och;
211                 och_usecount = &lli->lli_open_fd_exec_count;
212         } else {
213                 LASSERT(flags & FMODE_READ);
214                 och_p = &lli->lli_mds_read_och;
215                 och_usecount = &lli->lli_open_fd_read_count;
216         }
217
218         mutex_lock(&lli->lli_och_mutex);
219         if (*och_usecount) { /* There are still users of this handle, so
220                                 skip freeing it. */
221                 mutex_unlock(&lli->lli_och_mutex);
222                 RETURN(0);
223         }
224         och=*och_p;
225         *och_p = NULL;
226         mutex_unlock(&lli->lli_och_mutex);
227
228         if (och) { /* There might be a race and somebody have freed this och
229                       already */
230                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
231                                                inode, och);
232         }
233
234         RETURN(rc);
235 }
236
237 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
238                 struct file *file)
239 {
240         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
241         struct ll_inode_info *lli = ll_i2info(inode);
242         int rc = 0;
243         ENTRY;
244
245         /* clear group lock, if present */
246         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
247                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
248
249         /* Let's see if we have good enough OPEN lock on the file and if
250            we can skip talking to MDS */
251         if (file->f_dentry->d_inode) { /* Can this ever be false? */
252                 int lockmode;
253                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
254                 struct lustre_handle lockh;
255                 struct inode *inode = file->f_dentry->d_inode;
256                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
257
258                 mutex_lock(&lli->lli_och_mutex);
259                 if (fd->fd_omode & FMODE_WRITE) {
260                         lockmode = LCK_CW;
261                         LASSERT(lli->lli_open_fd_write_count);
262                         lli->lli_open_fd_write_count--;
263                 } else if (fd->fd_omode & FMODE_EXEC) {
264                         lockmode = LCK_PR;
265                         LASSERT(lli->lli_open_fd_exec_count);
266                         lli->lli_open_fd_exec_count--;
267                 } else {
268                         lockmode = LCK_CR;
269                         LASSERT(lli->lli_open_fd_read_count);
270                         lli->lli_open_fd_read_count--;
271                 }
272                 mutex_unlock(&lli->lli_och_mutex);
273
274                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
275                                    LDLM_IBITS, &policy, lockmode,
276                                    &lockh)) {
277                         rc = ll_md_real_close(file->f_dentry->d_inode,
278                                               fd->fd_omode);
279                 }
280         } else {
281                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
282                        file, file->f_dentry, file->f_dentry->d_name.name);
283         }
284
285         LUSTRE_FPRIVATE(file) = NULL;
286         ll_file_data_put(fd);
287         ll_capa_close(inode);
288
289         RETURN(rc);
290 }
291
292 /* While this returns an error code, fput() the caller does not, so we need
293  * to make every effort to clean up all of our state here.  Also, applications
294  * rarely check close errors and even if an error is returned they will not
295  * re-try the close call.
296  */
297 int ll_file_release(struct inode *inode, struct file *file)
298 {
299         struct ll_file_data *fd;
300         struct ll_sb_info *sbi = ll_i2sbi(inode);
301         struct ll_inode_info *lli = ll_i2info(inode);
302         int rc;
303         ENTRY;
304
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
327         /* The last ref on @file, maybe not the the owner pid of statahead.
328          * Different processes can open the same dir, "ll_opendir_key" means:
329          * it is me that should stop the statahead thread. */
330         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
331             lli->lli_opendir_pid != 0)
332                 ll_stop_statahead(inode, lli->lli_opendir_key);
333
334         if (inode->i_sb->s_root == file->f_dentry) {
335                 LUSTRE_FPRIVATE(file) = NULL;
336                 ll_file_data_put(fd);
337                 RETURN(0);
338         }
339
340         if (!S_ISDIR(inode->i_mode)) {
341                 lov_read_and_clear_async_rc(lli->lli_clob);
342                 lli->lli_async_rc = 0;
343         }
344
345         rc = ll_md_close(sbi->ll_md_exp, inode, file);
346
347         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
348                 libcfs_debug_dumplog();
349
350         RETURN(rc);
351 }
352
353 static int ll_intent_file_open(struct file *file, void *lmm,
354                                int lmmsize, struct lookup_intent *itp)
355 {
356         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
357         struct dentry *parent = file->f_dentry->d_parent;
358         const char *name = file->f_dentry->d_name.name;
359         const int len = file->f_dentry->d_name.len;
360         struct md_op_data *op_data;
361         struct ptlrpc_request *req;
362         __u32 opc = LUSTRE_OPC_ANY;
363         int rc;
364         ENTRY;
365
366         if (!parent)
367                 RETURN(-ENOENT);
368
369         /* Usually we come here only for NFSD, and we want open lock.
370            But we can also get here with pre 2.6.15 patchless kernels, and in
371            that case that lock is also ok */
372         /* We can also get here if there was cached open handle in revalidate_it
373          * but it disappeared while we were getting from there to ll_file_open.
374          * But this means this file was closed and immediatelly opened which
375          * makes a good candidate for using OPEN lock */
376         /* If lmmsize & lmm are not 0, we are just setting stripe info
377          * parameters. No need for the open lock */
378         if (lmm == NULL && lmmsize == 0) {
379                 itp->it_flags |= MDS_OPEN_LOCK;
380                 if (itp->it_flags & FMODE_WRITE)
381                         opc = LUSTRE_OPC_CREATE;
382         }
383
384         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
385                                       file->f_dentry->d_inode, name, len,
386                                       O_RDWR, opc, NULL);
387         if (IS_ERR(op_data))
388                 RETURN(PTR_ERR(op_data));
389
390         itp->it_flags |= MDS_OPEN_BY_FID;
391         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
392                             0 /*unused */, &req, ll_md_blocking_ast, 0);
393         ll_finish_md_op_data(op_data);
394         if (rc == -ESTALE) {
395                 /* reason for keep own exit path - don`t flood log
396                 * with messages with -ESTALE errors.
397                 */
398                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
399                      it_open_error(DISP_OPEN_OPEN, itp))
400                         GOTO(out, rc);
401                 ll_release_openhandle(file->f_dentry, itp);
402                 GOTO(out, rc);
403         }
404
405         if (it_disposition(itp, DISP_LOOKUP_NEG))
406                 GOTO(out, rc = -ENOENT);
407
408         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
409                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
410                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
411                 GOTO(out, rc);
412         }
413
414         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
415         if (!rc && itp->d.lustre.it_lock_mode)
416                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
417                                  itp, NULL);
418
419 out:
420         ptlrpc_req_finished(itp->d.lustre.it_data);
421         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
422         ll_intent_drop_lock(itp);
423
424         RETURN(rc);
425 }
426
427 /**
428  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
429  * not believe attributes if a few ioepoch holders exist. Attributes for
430  * previous ioepoch if new one is opened are also skipped by MDS.
431  */
432 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
433 {
434         if (ioepoch && lli->lli_ioepoch != ioepoch) {
435                 lli->lli_ioepoch = ioepoch;
436                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
437                        ioepoch, PFID(&lli->lli_fid));
438         }
439 }
440
441 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
442                        struct lookup_intent *it, struct obd_client_handle *och)
443 {
444         struct ptlrpc_request *req = it->d.lustre.it_data;
445         struct mdt_body *body;
446
447         LASSERT(och);
448
449         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
450         LASSERT(body != NULL);                      /* reply already checked out */
451
452         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
453         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
454         och->och_fid = lli->lli_fid;
455         och->och_flags = it->it_flags;
456         ll_ioepoch_open(lli, body->ioepoch);
457
458         return md_set_open_replay_data(md_exp, och, req);
459 }
460
461 int ll_local_open(struct file *file, struct lookup_intent *it,
462                   struct ll_file_data *fd, struct obd_client_handle *och)
463 {
464         struct inode *inode = file->f_dentry->d_inode;
465         struct ll_inode_info *lli = ll_i2info(inode);
466         ENTRY;
467
468         LASSERT(!LUSTRE_FPRIVATE(file));
469
470         LASSERT(fd != NULL);
471
472         if (och) {
473                 struct ptlrpc_request *req = it->d.lustre.it_data;
474                 struct mdt_body *body;
475                 int rc;
476
477                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
478                 if (rc)
479                         RETURN(rc);
480
481                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
482                 if ((it->it_flags & FMODE_WRITE) &&
483                     (body->valid & OBD_MD_FLSIZE))
484                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
485                                lli->lli_ioepoch, PFID(&lli->lli_fid));
486         }
487
488         LUSTRE_FPRIVATE(file) = fd;
489         ll_readahead_init(inode, &fd->fd_ras);
490         fd->fd_omode = it->it_flags;
491         RETURN(0);
492 }
493
494 /* Open a file, and (for the very first open) create objects on the OSTs at
495  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
496  * creation or open until ll_lov_setstripe() ioctl is called.
497  *
498  * If we already have the stripe MD locally then we don't request it in
499  * md_open(), by passing a lmm_size = 0.
500  *
501  * It is up to the application to ensure no other processes open this file
502  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
503  * used.  We might be able to avoid races of that sort by getting lli_open_sem
504  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
505  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
506  */
507 int ll_file_open(struct inode *inode, struct file *file)
508 {
509         struct ll_inode_info *lli = ll_i2info(inode);
510         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
511                                           .it_flags = file->f_flags };
512         struct obd_client_handle **och_p = NULL;
513         __u64 *och_usecount = NULL;
514         struct ll_file_data *fd;
515         int rc = 0, opendir_set = 0;
516         ENTRY;
517
518         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
519                inode->i_generation, inode, file->f_flags);
520
521         it = file->private_data; /* XXX: compat macro */
522         file->private_data = NULL; /* prevent ll_local_open assertion */
523
524         fd = ll_file_data_get();
525         if (fd == NULL)
526                 GOTO(out_och_free, rc = -ENOMEM);
527
528         fd->fd_file = file;
529         if (S_ISDIR(inode->i_mode)) {
530                 spin_lock(&lli->lli_sa_lock);
531                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
532                     lli->lli_opendir_pid == 0) {
533                         lli->lli_opendir_key = fd;
534                         lli->lli_opendir_pid = cfs_curproc_pid();
535                         opendir_set = 1;
536                 }
537                 spin_unlock(&lli->lli_sa_lock);
538         }
539
540         if (inode->i_sb->s_root == file->f_dentry) {
541                 LUSTRE_FPRIVATE(file) = fd;
542                 RETURN(0);
543         }
544
545         if (!it || !it->d.lustre.it_disposition) {
546                 /* Convert f_flags into access mode. We cannot use file->f_mode,
547                  * because everything but O_ACCMODE mask was stripped from
548                  * there */
549                 if ((oit.it_flags + 1) & O_ACCMODE)
550                         oit.it_flags++;
551                 if (file->f_flags & O_TRUNC)
552                         oit.it_flags |= FMODE_WRITE;
553
554                 /* kernel only call f_op->open in dentry_open.  filp_open calls
555                  * dentry_open after call to open_namei that checks permissions.
556                  * Only nfsd_open call dentry_open directly without checking
557                  * permissions and because of that this code below is safe. */
558                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
559                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
560
561                 /* We do not want O_EXCL here, presumably we opened the file
562                  * already? XXX - NFS implications? */
563                 oit.it_flags &= ~O_EXCL;
564
565                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
566                  * created if necessary, then "IT_CREAT" should be set to keep
567                  * consistent with it */
568                 if (oit.it_flags & O_CREAT)
569                         oit.it_op |= IT_CREAT;
570
571                 it = &oit;
572         }
573
574 restart:
575         /* Let's see if we have file open on MDS already. */
576         if (it->it_flags & FMODE_WRITE) {
577                 och_p = &lli->lli_mds_write_och;
578                 och_usecount = &lli->lli_open_fd_write_count;
579         } else if (it->it_flags & FMODE_EXEC) {
580                 och_p = &lli->lli_mds_exec_och;
581                 och_usecount = &lli->lli_open_fd_exec_count;
582          } else {
583                 och_p = &lli->lli_mds_read_och;
584                 och_usecount = &lli->lli_open_fd_read_count;
585         }
586
587         mutex_lock(&lli->lli_och_mutex);
588         if (*och_p) { /* Open handle is present */
589                 if (it_disposition(it, DISP_OPEN_OPEN)) {
590                         /* Well, there's extra open request that we do not need,
591                            let's close it somehow. This will decref request. */
592                         rc = it_open_error(DISP_OPEN_OPEN, it);
593                         if (rc) {
594                                 mutex_unlock(&lli->lli_och_mutex);
595                                 GOTO(out_openerr, rc);
596                         }
597
598                         ll_release_openhandle(file->f_dentry, it);
599                 }
600                 (*och_usecount)++;
601
602                 rc = ll_local_open(file, it, fd, NULL);
603                 if (rc) {
604                         (*och_usecount)--;
605                         mutex_unlock(&lli->lli_och_mutex);
606                         GOTO(out_openerr, rc);
607                 }
608         } else {
609                 LASSERT(*och_usecount == 0);
610                 if (!it->d.lustre.it_disposition) {
611                         /* We cannot just request lock handle now, new ELC code
612                            means that one of other OPEN locks for this file
613                            could be cancelled, and since blocking ast handler
614                            would attempt to grab och_mutex as well, that would
615                            result in a deadlock */
616                         mutex_unlock(&lli->lli_och_mutex);
617                         it->it_create_mode |= M_CHECK_STALE;
618                         rc = ll_intent_file_open(file, NULL, 0, it);
619                         it->it_create_mode &= ~M_CHECK_STALE;
620                         if (rc)
621                                 GOTO(out_openerr, rc);
622
623                         goto restart;
624                 }
625                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
626                 if (!*och_p)
627                         GOTO(out_och_free, rc = -ENOMEM);
628
629                 (*och_usecount)++;
630
631                 /* md_intent_lock() didn't get a request ref if there was an
632                  * open error, so don't do cleanup on the request here
633                  * (bug 3430) */
634                 /* XXX (green): Should not we bail out on any error here, not
635                  * just open error? */
636                 rc = it_open_error(DISP_OPEN_OPEN, it);
637                 if (rc)
638                         GOTO(out_och_free, rc);
639
640                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
641
642                 rc = ll_local_open(file, it, fd, *och_p);
643                 if (rc)
644                         GOTO(out_och_free, rc);
645         }
646         mutex_unlock(&lli->lli_och_mutex);
647         fd = NULL;
648
649         /* Must do this outside lli_och_mutex lock to prevent deadlock where
650            different kind of OPEN lock for this same inode gets cancelled
651            by ldlm_cancel_lru */
652         if (!S_ISREG(inode->i_mode))
653                 GOTO(out_och_free, rc);
654
655         ll_capa_open(inode);
656
657         if (!lli->lli_has_smd) {
658                 if (file->f_flags & O_LOV_DELAY_CREATE ||
659                     !(file->f_mode & FMODE_WRITE)) {
660                         CDEBUG(D_INODE, "object creation was delayed\n");
661                         GOTO(out_och_free, rc);
662                 }
663         }
664         file->f_flags &= ~O_LOV_DELAY_CREATE;
665         GOTO(out_och_free, rc);
666
667 out_och_free:
668         if (rc) {
669                 if (och_p && *och_p) {
670                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
671                         *och_p = NULL; /* OBD_FREE writes some magic there */
672                         (*och_usecount)--;
673                 }
674                 mutex_unlock(&lli->lli_och_mutex);
675
676 out_openerr:
677                 if (opendir_set != 0)
678                         ll_stop_statahead(inode, lli->lli_opendir_key);
679                 if (fd != NULL)
680                         ll_file_data_put(fd);
681         } else {
682                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
683         }
684
685         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
686                 ptlrpc_req_finished(it->d.lustre.it_data);
687                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
688         }
689
690         return rc;
691 }
692
693 /* Fills the obdo with the attributes for the lsm */
694 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
695                           struct obd_capa *capa, struct obdo *obdo,
696                           __u64 ioepoch, int sync)
697 {
698         struct ptlrpc_request_set *set;
699         struct obd_info            oinfo = { { { 0 } } };
700         int                        rc;
701
702         ENTRY;
703
704         LASSERT(lsm != NULL);
705
706         oinfo.oi_md = lsm;
707         oinfo.oi_oa = obdo;
708         oinfo.oi_oa->o_id = lsm->lsm_object_id;
709         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
710         oinfo.oi_oa->o_mode = S_IFREG;
711         oinfo.oi_oa->o_ioepoch = ioepoch;
712         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
713                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
714                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
715                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
716                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
717                                OBD_MD_FLDATAVERSION;
718         oinfo.oi_capa = capa;
719         if (sync) {
720                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
721                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
722         }
723
724         set = ptlrpc_prep_set();
725         if (set == NULL) {
726                 CERROR("can't allocate ptlrpc set\n");
727                 rc = -ENOMEM;
728         } else {
729                 rc = obd_getattr_async(exp, &oinfo, set);
730                 if (rc == 0)
731                         rc = ptlrpc_set_wait(set);
732                 ptlrpc_set_destroy(set);
733         }
734         if (rc == 0)
735                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
736                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
737                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
738                                          OBD_MD_FLDATAVERSION);
739         RETURN(rc);
740 }
741
742 /**
743   * Performs the getattr on the inode and updates its fields.
744   * If @sync != 0, perform the getattr under the server-side lock.
745   */
746 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
747                      __u64 ioepoch, int sync)
748 {
749         struct obd_capa      *capa = ll_mdscapa_get(inode);
750         struct lov_stripe_md *lsm;
751         int rc;
752         ENTRY;
753
754         lsm = ccc_inode_lsm_get(inode);
755         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
756                             capa, obdo, ioepoch, sync);
757         capa_put(capa);
758         if (rc == 0) {
759                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
760                 CDEBUG(D_INODE,
761                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
762                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
763                        (unsigned long long)inode->i_blocks,
764                        (unsigned long)ll_inode_blksize(inode));
765         }
766         ccc_inode_lsm_put(inode, lsm);
767         RETURN(rc);
768 }
769
770 int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
771 {
772         struct ll_inode_info *lli = ll_i2info(inode);
773         struct cl_object *obj = lli->lli_clob;
774         struct cl_attr *attr = ccc_env_thread_attr(env);
775         struct ost_lvb lvb;
776         int rc = 0;
777
778         ENTRY;
779
780         ll_inode_size_lock(inode);
781         /* merge timestamps the most recently obtained from mds with
782            timestamps obtained from osts */
783         LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
784         LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
785         LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
786         inode_init_lvb(inode, &lvb);
787
788         cl_object_attr_lock(obj);
789         rc = cl_object_attr_get(env, obj, attr);
790         cl_object_attr_unlock(obj);
791
792         if (rc == 0) {
793                 if (lvb.lvb_atime < attr->cat_atime)
794                         lvb.lvb_atime = attr->cat_atime;
795                 if (lvb.lvb_ctime < attr->cat_ctime)
796                         lvb.lvb_ctime = attr->cat_ctime;
797                 if (lvb.lvb_mtime < attr->cat_mtime)
798                         lvb.lvb_mtime = attr->cat_mtime;
799
800                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
801                                 PFID(&lli->lli_fid), attr->cat_size);
802                 cl_isize_write_nolock(inode, attr->cat_size);
803
804                 inode->i_blocks = attr->cat_blocks;
805
806                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
807                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
808                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
809         }
810         ll_inode_size_unlock(inode);
811
812         RETURN(rc);
813 }
814
815 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
816                      lstat_t *st)
817 {
818         struct obdo obdo = { 0 };
819         int rc;
820
821         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
822         if (rc == 0) {
823                 st->st_size   = obdo.o_size;
824                 st->st_blocks = obdo.o_blocks;
825                 st->st_mtime  = obdo.o_mtime;
826                 st->st_atime  = obdo.o_atime;
827                 st->st_ctime  = obdo.o_ctime;
828         }
829         return rc;
830 }
831
832 void ll_io_init(struct cl_io *io, const struct file *file, int write)
833 {
834         struct inode *inode = file->f_dentry->d_inode;
835
836         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
837         if (write) {
838                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
839                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
840                                       file->f_flags & O_DIRECT ||
841                                       IS_SYNC(inode);
842         }
843         io->ci_obj     = ll_i2info(inode)->lli_clob;
844         io->ci_lockreq = CILR_MAYBE;
845         if (ll_file_nolock(file)) {
846                 io->ci_lockreq = CILR_NEVER;
847                 io->ci_no_srvlock = 1;
848         } else if (file->f_flags & O_APPEND) {
849                 io->ci_lockreq = CILR_MANDATORY;
850         }
851 }
852
853 static ssize_t
854 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
855                    struct file *file, enum cl_io_type iot,
856                    loff_t *ppos, size_t count)
857 {
858         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
859         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
860         struct cl_io         *io;
861         ssize_t               result;
862         ENTRY;
863
864         io = ccc_env_thread_io(env);
865         ll_io_init(io, file, iot == CIT_WRITE);
866
867         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
868                 struct vvp_io *vio = vvp_env_io(env);
869                 struct ccc_io *cio = ccc_env_io(env);
870                 int write_mutex_locked = 0;
871
872                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
873                 vio->cui_io_subtype = args->via_io_subtype;
874
875                 switch (vio->cui_io_subtype) {
876                 case IO_NORMAL:
877                         cio->cui_iov = args->u.normal.via_iov;
878                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
879                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
880 #ifndef HAVE_FILE_WRITEV
881                         cio->cui_iocb = args->u.normal.via_iocb;
882 #endif
883                         if ((iot == CIT_WRITE) &&
884                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
885                                 if (mutex_lock_interruptible(&lli->
886                                                                lli_write_mutex))
887                                         GOTO(out, result = -ERESTARTSYS);
888                                 write_mutex_locked = 1;
889                         } else if (iot == CIT_READ) {
890                                 down_read(&lli->lli_trunc_sem);
891                         }
892                         break;
893                 case IO_SENDFILE:
894                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
895                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
896                         break;
897                 case IO_SPLICE:
898                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
899                         vio->u.splice.cui_flags = args->u.splice.via_flags;
900                         break;
901                 default:
902                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
903                         LBUG();
904                 }
905                 result = cl_io_loop(env, io);
906                 if (write_mutex_locked)
907                         mutex_unlock(&lli->lli_write_mutex);
908                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
909                         up_read(&lli->lli_trunc_sem);
910         } else {
911                 /* cl_io_rw_init() handled IO */
912                 result = io->ci_result;
913         }
914
915         if (io->ci_nob > 0) {
916                 result = io->ci_nob;
917                 *ppos = io->u.ci_wr.wr.crw_pos;
918         }
919         GOTO(out, result);
920 out:
921         cl_io_fini(env, io);
922
923         if (iot == CIT_READ) {
924                 if (result >= 0)
925                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
926                                            LPROC_LL_READ_BYTES, result);
927         } else if (iot == CIT_WRITE) {
928                 if (result >= 0) {
929                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
930                                            LPROC_LL_WRITE_BYTES, result);
931                         fd->fd_write_failed = false;
932                 } else {
933                         fd->fd_write_failed = true;
934                 }
935         }
936
937         return result;
938 }
939
940
941 /*
942  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
943  */
944 static int ll_file_get_iov_count(const struct iovec *iov,
945                                  unsigned long *nr_segs, size_t *count)
946 {
947         size_t cnt = 0;
948         unsigned long seg;
949
950         for (seg = 0; seg < *nr_segs; seg++) {
951                 const struct iovec *iv = &iov[seg];
952
953                 /*
954                  * If any segment has a negative length, or the cumulative
955                  * length ever wraps negative then return -EINVAL.
956                  */
957                 cnt += iv->iov_len;
958                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
959                         return -EINVAL;
960                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
961                         continue;
962                 if (seg == 0)
963                         return -EFAULT;
964                 *nr_segs = seg;
965                 cnt -= iv->iov_len;   /* This segment is no good */
966                 break;
967         }
968         *count = cnt;
969         return 0;
970 }
971
972 #ifdef HAVE_FILE_READV
973 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
974                               unsigned long nr_segs, loff_t *ppos)
975 {
976         struct lu_env      *env;
977         struct vvp_io_args *args;
978         size_t              count;
979         ssize_t             result;
980         int                 refcheck;
981         ENTRY;
982
983         result = ll_file_get_iov_count(iov, &nr_segs, &count);
984         if (result)
985                 RETURN(result);
986
987         env = cl_env_get(&refcheck);
988         if (IS_ERR(env))
989                 RETURN(PTR_ERR(env));
990
991         args = vvp_env_args(env, IO_NORMAL);
992         args->u.normal.via_iov = (struct iovec *)iov;
993         args->u.normal.via_nrsegs = nr_segs;
994
995         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
996         cl_env_put(env, &refcheck);
997         RETURN(result);
998 }
999
1000 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1001                             loff_t *ppos)
1002 {
1003         struct lu_env *env;
1004         struct iovec  *local_iov;
1005         ssize_t        result;
1006         int            refcheck;
1007         ENTRY;
1008
1009         env = cl_env_get(&refcheck);
1010         if (IS_ERR(env))
1011                 RETURN(PTR_ERR(env));
1012
1013         local_iov = &vvp_env_info(env)->vti_local_iov;
1014         local_iov->iov_base = (void __user *)buf;
1015         local_iov->iov_len = count;
1016         result = ll_file_readv(file, local_iov, 1, ppos);
1017         cl_env_put(env, &refcheck);
1018         RETURN(result);
1019 }
1020
1021 #else
1022 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1023                                 unsigned long nr_segs, loff_t pos)
1024 {
1025         struct lu_env      *env;
1026         struct vvp_io_args *args;
1027         size_t              count;
1028         ssize_t             result;
1029         int                 refcheck;
1030         ENTRY;
1031
1032         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1033         if (result)
1034                 RETURN(result);
1035
1036         env = cl_env_get(&refcheck);
1037         if (IS_ERR(env))
1038                 RETURN(PTR_ERR(env));
1039
1040         args = vvp_env_args(env, IO_NORMAL);
1041         args->u.normal.via_iov = (struct iovec *)iov;
1042         args->u.normal.via_nrsegs = nr_segs;
1043         args->u.normal.via_iocb = iocb;
1044
1045         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1046                                     &iocb->ki_pos, count);
1047         cl_env_put(env, &refcheck);
1048         RETURN(result);
1049 }
1050
1051 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1052                             loff_t *ppos)
1053 {
1054         struct lu_env *env;
1055         struct iovec  *local_iov;
1056         struct kiocb  *kiocb;
1057         ssize_t        result;
1058         int            refcheck;
1059         ENTRY;
1060
1061         env = cl_env_get(&refcheck);
1062         if (IS_ERR(env))
1063                 RETURN(PTR_ERR(env));
1064
1065         local_iov = &vvp_env_info(env)->vti_local_iov;
1066         kiocb = &vvp_env_info(env)->vti_kiocb;
1067         local_iov->iov_base = (void __user *)buf;
1068         local_iov->iov_len = count;
1069         init_sync_kiocb(kiocb, file);
1070         kiocb->ki_pos = *ppos;
1071         kiocb->ki_left = count;
1072
1073         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1074         *ppos = kiocb->ki_pos;
1075
1076         cl_env_put(env, &refcheck);
1077         RETURN(result);
1078 }
1079 #endif
1080
1081 /*
1082  * Write to a file (through the page cache).
1083  */
1084 #ifdef HAVE_FILE_WRITEV
1085 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1086                               unsigned long nr_segs, loff_t *ppos)
1087 {
1088         struct lu_env      *env;
1089         struct vvp_io_args *args;
1090         size_t              count;
1091         ssize_t             result;
1092         int                 refcheck;
1093         ENTRY;
1094
1095         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1096         if (result)
1097                 RETURN(result);
1098
1099         env = cl_env_get(&refcheck);
1100         if (IS_ERR(env))
1101                 RETURN(PTR_ERR(env));
1102
1103         args = vvp_env_args(env, IO_NORMAL);
1104         args->u.normal.via_iov = (struct iovec *)iov;
1105         args->u.normal.via_nrsegs = nr_segs;
1106
1107         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1108         cl_env_put(env, &refcheck);
1109         RETURN(result);
1110 }
1111
1112 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1113                              loff_t *ppos)
1114 {
1115         struct lu_env    *env;
1116         struct iovec     *local_iov;
1117         ssize_t           result;
1118         int               refcheck;
1119         ENTRY;
1120
1121         env = cl_env_get(&refcheck);
1122         if (IS_ERR(env))
1123                 RETURN(PTR_ERR(env));
1124
1125         local_iov = &vvp_env_info(env)->vti_local_iov;
1126         local_iov->iov_base = (void __user *)buf;
1127         local_iov->iov_len = count;
1128
1129         result = ll_file_writev(file, local_iov, 1, ppos);
1130         cl_env_put(env, &refcheck);
1131         RETURN(result);
1132 }
1133
1134 #else /* AIO stuff */
1135 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1136                                  unsigned long nr_segs, loff_t pos)
1137 {
1138         struct lu_env      *env;
1139         struct vvp_io_args *args;
1140         size_t              count;
1141         ssize_t             result;
1142         int                 refcheck;
1143         ENTRY;
1144
1145         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1146         if (result)
1147                 RETURN(result);
1148
1149         env = cl_env_get(&refcheck);
1150         if (IS_ERR(env))
1151                 RETURN(PTR_ERR(env));
1152
1153         args = vvp_env_args(env, IO_NORMAL);
1154         args->u.normal.via_iov = (struct iovec *)iov;
1155         args->u.normal.via_nrsegs = nr_segs;
1156         args->u.normal.via_iocb = iocb;
1157
1158         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1159                                   &iocb->ki_pos, count);
1160         cl_env_put(env, &refcheck);
1161         RETURN(result);
1162 }
1163
1164 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1165                              loff_t *ppos)
1166 {
1167         struct lu_env *env;
1168         struct iovec  *local_iov;
1169         struct kiocb  *kiocb;
1170         ssize_t        result;
1171         int            refcheck;
1172         ENTRY;
1173
1174         env = cl_env_get(&refcheck);
1175         if (IS_ERR(env))
1176                 RETURN(PTR_ERR(env));
1177
1178         local_iov = &vvp_env_info(env)->vti_local_iov;
1179         kiocb = &vvp_env_info(env)->vti_kiocb;
1180         local_iov->iov_base = (void __user *)buf;
1181         local_iov->iov_len = count;
1182         init_sync_kiocb(kiocb, file);
1183         kiocb->ki_pos = *ppos;
1184         kiocb->ki_left = count;
1185
1186         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1187         *ppos = kiocb->ki_pos;
1188
1189         cl_env_put(env, &refcheck);
1190         RETURN(result);
1191 }
1192 #endif
1193
1194
1195 #ifdef HAVE_KERNEL_SENDFILE
1196 /*
1197  * Send file content (through pagecache) somewhere with helper
1198  */
1199 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1200                                 read_actor_t actor, void *target)
1201 {
1202         struct lu_env      *env;
1203         struct vvp_io_args *args;
1204         ssize_t             result;
1205         int                 refcheck;
1206         ENTRY;
1207
1208         env = cl_env_get(&refcheck);
1209         if (IS_ERR(env))
1210                 RETURN(PTR_ERR(env));
1211
1212         args = vvp_env_args(env, IO_SENDFILE);
1213         args->u.sendfile.via_target = target;
1214         args->u.sendfile.via_actor = actor;
1215
1216         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1217         cl_env_put(env, &refcheck);
1218         RETURN(result);
1219 }
1220 #endif
1221
1222 #ifdef HAVE_KERNEL_SPLICE_READ
1223 /*
1224  * Send file content (through pagecache) somewhere with helper
1225  */
1226 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1227                                    struct pipe_inode_info *pipe, size_t count,
1228                                    unsigned int flags)
1229 {
1230         struct lu_env      *env;
1231         struct vvp_io_args *args;
1232         ssize_t             result;
1233         int                 refcheck;
1234         ENTRY;
1235
1236         env = cl_env_get(&refcheck);
1237         if (IS_ERR(env))
1238                 RETURN(PTR_ERR(env));
1239
1240         args = vvp_env_args(env, IO_SPLICE);
1241         args->u.splice.via_pipe = pipe;
1242         args->u.splice.via_flags = flags;
1243
1244         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1245         cl_env_put(env, &refcheck);
1246         RETURN(result);
1247 }
1248 #endif
1249
1250 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1251                            obd_count ost_idx)
1252 {
1253         struct obd_export *exp = ll_i2dtexp(inode);
1254         struct obd_trans_info oti = { 0 };
1255         struct obdo *oa = NULL;
1256         int lsm_size;
1257         int rc = 0;
1258         struct lov_stripe_md *lsm = NULL, *lsm2;
1259         ENTRY;
1260
1261         OBDO_ALLOC(oa);
1262         if (oa == NULL)
1263                 RETURN(-ENOMEM);
1264
1265         lsm = ccc_inode_lsm_get(inode);
1266         if (lsm == NULL)
1267                 GOTO(out, rc = -ENOENT);
1268
1269         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1270                    (lsm->lsm_stripe_count));
1271
1272         OBD_ALLOC_LARGE(lsm2, lsm_size);
1273         if (lsm2 == NULL)
1274                 GOTO(out, rc = -ENOMEM);
1275
1276         oa->o_id = id;
1277         oa->o_seq = seq;
1278         oa->o_nlink = ost_idx;
1279         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1280         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1281         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1282                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1283         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1284         memcpy(lsm2, lsm, lsm_size);
1285         ll_inode_size_lock(inode);
1286         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1287         ll_inode_size_unlock(inode);
1288
1289         OBD_FREE_LARGE(lsm2, lsm_size);
1290         GOTO(out, rc);
1291 out:
1292         ccc_inode_lsm_put(inode, lsm);
1293         OBDO_FREE(oa);
1294         return rc;
1295 }
1296
1297 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1298 {
1299         struct ll_recreate_obj ucreat;
1300         ENTRY;
1301
1302         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1303                 RETURN(-EPERM);
1304
1305         if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1306                            sizeof(ucreat)))
1307                 RETURN(-EFAULT);
1308
1309         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1310                                ucreat.lrc_ost_idx));
1311 }
1312
1313 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1314 {
1315         struct lu_fid   fid;
1316         obd_id          id;
1317         obd_count       ost_idx;
1318         ENTRY;
1319
1320         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1321                 RETURN(-EPERM);
1322
1323         if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1324                 RETURN(-EFAULT);
1325
1326         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1327         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1328         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1329 }
1330
1331 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1332                              int flags, struct lov_user_md *lum, int lum_size)
1333 {
1334         struct lov_stripe_md *lsm = NULL;
1335         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1336         int rc = 0;
1337         ENTRY;
1338
1339         lsm = ccc_inode_lsm_get(inode);
1340         if (lsm != NULL) {
1341                 ccc_inode_lsm_put(inode, lsm);
1342                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1343                        inode->i_ino);
1344                 RETURN(-EEXIST);
1345         }
1346
1347         ll_inode_size_lock(inode);
1348         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1349         if (rc)
1350                 GOTO(out, rc);
1351         rc = oit.d.lustre.it_status;
1352         if (rc < 0)
1353                 GOTO(out_req_free, rc);
1354
1355         ll_release_openhandle(file->f_dentry, &oit);
1356
1357  out:
1358         ll_inode_size_unlock(inode);
1359         ll_intent_release(&oit);
1360         ccc_inode_lsm_put(inode, lsm);
1361         RETURN(rc);
1362 out_req_free:
1363         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1364         goto out;
1365 }
1366
1367 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1368                              struct lov_mds_md **lmmp, int *lmm_size,
1369                              struct ptlrpc_request **request)
1370 {
1371         struct ll_sb_info *sbi = ll_i2sbi(inode);
1372         struct mdt_body  *body;
1373         struct lov_mds_md *lmm = NULL;
1374         struct ptlrpc_request *req = NULL;
1375         struct md_op_data *op_data;
1376         int rc, lmmsize;
1377
1378         rc = ll_get_max_mdsize(sbi, &lmmsize);
1379         if (rc)
1380                 RETURN(rc);
1381
1382         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1383                                      strlen(filename), lmmsize,
1384                                      LUSTRE_OPC_ANY, NULL);
1385         if (IS_ERR(op_data))
1386                 RETURN(PTR_ERR(op_data));
1387
1388         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1389         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1390         ll_finish_md_op_data(op_data);
1391         if (rc < 0) {
1392                 CDEBUG(D_INFO, "md_getattr_name failed "
1393                        "on %s: rc %d\n", filename, rc);
1394                 GOTO(out, rc);
1395         }
1396
1397         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1398         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1399
1400         lmmsize = body->eadatasize;
1401
1402         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1403                         lmmsize == 0) {
1404                 GOTO(out, rc = -ENODATA);
1405         }
1406
1407         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1408         LASSERT(lmm != NULL);
1409
1410         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1411             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1412                 GOTO(out, rc = -EPROTO);
1413         }
1414
1415         /*
1416          * This is coming from the MDS, so is probably in
1417          * little endian.  We convert it to host endian before
1418          * passing it to userspace.
1419          */
1420         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1421                 /* if function called for directory - we should
1422                  * avoid swab not existent lsm objects */
1423                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1424                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1425                         if (S_ISREG(body->mode))
1426                                 lustre_swab_lov_user_md_objects(
1427                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1428                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1429                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1430                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1431                         if (S_ISREG(body->mode))
1432                                 lustre_swab_lov_user_md_objects(
1433                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1434                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1435                 }
1436         }
1437
1438 out:
1439         *lmmp = lmm;
1440         *lmm_size = lmmsize;
1441         *request = req;
1442         return rc;
1443 }
1444
1445 static int ll_lov_setea(struct inode *inode, struct file *file,
1446                             unsigned long arg)
1447 {
1448         int                      flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1449         struct lov_user_md      *lump;
1450         int                      lum_size = sizeof(struct lov_user_md) +
1451                                             sizeof(struct lov_user_ost_data);
1452         int                      rc;
1453         ENTRY;
1454
1455         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1456                 RETURN(-EPERM);
1457
1458         OBD_ALLOC_LARGE(lump, lum_size);
1459         if (lump == NULL)
1460                 RETURN(-ENOMEM);
1461
1462         if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1463                 OBD_FREE_LARGE(lump, lum_size);
1464                 RETURN(-EFAULT);
1465         }
1466
1467         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1468
1469         OBD_FREE_LARGE(lump, lum_size);
1470         RETURN(rc);
1471 }
1472
1473 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1474                             unsigned long arg)
1475 {
1476         struct lov_user_md_v3    lumv3;
1477         struct lov_user_md_v1   *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1478         struct lov_user_md_v1   *lumv1p = (struct lov_user_md_v1 *)arg;
1479         struct lov_user_md_v3   *lumv3p = (struct lov_user_md_v3 *)arg;
1480         int                      lum_size, rc;
1481         int                      flags = FMODE_WRITE;
1482         ENTRY;
1483
1484         /* first try with v1 which is smaller than v3 */
1485         lum_size = sizeof(struct lov_user_md_v1);
1486         if (copy_from_user(lumv1, lumv1p, lum_size))
1487                 RETURN(-EFAULT);
1488
1489         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1490                 lum_size = sizeof(struct lov_user_md_v3);
1491                 if (copy_from_user(&lumv3, lumv3p, lum_size))
1492                         RETURN(-EFAULT);
1493         }
1494
1495         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1496         if (rc == 0) {
1497                 struct lov_stripe_md *lsm;
1498                 __u32 gen;
1499
1500                 put_user(0, &lumv1p->lmm_stripe_count);
1501
1502                 ll_layout_refresh(inode, &gen);
1503                 lsm = ccc_inode_lsm_get(inode);
1504                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1505                                    0, lsm, (void *)arg);
1506                 ccc_inode_lsm_put(inode, lsm);
1507         }
1508         RETURN(rc);
1509 }
1510
1511 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1512 {
1513         struct lov_stripe_md *lsm;
1514         int rc = -ENODATA;
1515         ENTRY;
1516
1517         lsm = ccc_inode_lsm_get(inode);
1518         if (lsm != NULL)
1519                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1520                                    lsm, (void *)arg);
1521         ccc_inode_lsm_put(inode, lsm);
1522         RETURN(rc);
1523 }
1524
1525 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1526 {
1527         struct ll_inode_info   *lli = ll_i2info(inode);
1528         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1529         struct ccc_grouplock    grouplock;
1530         int                     rc;
1531         ENTRY;
1532
1533         if (ll_file_nolock(file))
1534                 RETURN(-EOPNOTSUPP);
1535
1536         spin_lock(&lli->lli_lock);
1537         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1538                 CWARN("group lock already existed with gid %lu\n",
1539                       fd->fd_grouplock.cg_gid);
1540                 spin_unlock(&lli->lli_lock);
1541                 RETURN(-EINVAL);
1542         }
1543         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1544         spin_unlock(&lli->lli_lock);
1545
1546         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1547                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1548         if (rc)
1549                 RETURN(rc);
1550
1551         spin_lock(&lli->lli_lock);
1552         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1553                 spin_unlock(&lli->lli_lock);
1554                 CERROR("another thread just won the race\n");
1555                 cl_put_grouplock(&grouplock);
1556                 RETURN(-EINVAL);
1557         }
1558
1559         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1560         fd->fd_grouplock = grouplock;
1561         spin_unlock(&lli->lli_lock);
1562
1563         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1564         RETURN(0);
1565 }
1566
1567 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1568 {
1569         struct ll_inode_info   *lli = ll_i2info(inode);
1570         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1571         struct ccc_grouplock    grouplock;
1572         ENTRY;
1573
1574         spin_lock(&lli->lli_lock);
1575         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1576                 spin_unlock(&lli->lli_lock);
1577                 CWARN("no group lock held\n");
1578                 RETURN(-EINVAL);
1579         }
1580         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1581
1582         if (fd->fd_grouplock.cg_gid != arg) {
1583                 CWARN("group lock %lu doesn't match current id %lu\n",
1584                        arg, fd->fd_grouplock.cg_gid);
1585                 spin_unlock(&lli->lli_lock);
1586                 RETURN(-EINVAL);
1587         }
1588
1589         grouplock = fd->fd_grouplock;
1590         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1591         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1592         spin_unlock(&lli->lli_lock);
1593
1594         cl_put_grouplock(&grouplock);
1595         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1596         RETURN(0);
1597 }
1598
1599 /**
1600  * Close inode open handle
1601  *
1602  * \param dentry [in]     dentry which contains the inode
1603  * \param it     [in,out] intent which contains open info and result
1604  *
1605  * \retval 0     success
1606  * \retval <0    failure
1607  */
1608 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1609 {
1610         struct inode *inode = dentry->d_inode;
1611         struct obd_client_handle *och;
1612         int rc;
1613         ENTRY;
1614
1615         LASSERT(inode);
1616
1617         /* Root ? Do nothing. */
1618         if (dentry->d_inode->i_sb->s_root == dentry)
1619                 RETURN(0);
1620
1621         /* No open handle to close? Move away */
1622         if (!it_disposition(it, DISP_OPEN_OPEN))
1623                 RETURN(0);
1624
1625         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1626
1627         OBD_ALLOC(och, sizeof(*och));
1628         if (!och)
1629                 GOTO(out, rc = -ENOMEM);
1630
1631         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1632                     ll_i2info(inode), it, och);
1633
1634         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1635                                        inode, och);
1636  out:
1637         /* this one is in place of ll_file_open */
1638         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1639                 ptlrpc_req_finished(it->d.lustre.it_data);
1640                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1641         }
1642         RETURN(rc);
1643 }
1644
1645 /**
1646  * Get size for inode for which FIEMAP mapping is requested.
1647  * Make the FIEMAP get_info call and returns the result.
1648  */
1649 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1650               int num_bytes)
1651 {
1652         struct obd_export *exp = ll_i2dtexp(inode);
1653         struct lov_stripe_md *lsm = NULL;
1654         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1655         int vallen = num_bytes;
1656         int rc;
1657         ENTRY;
1658
1659         /* Checks for fiemap flags */
1660         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1661                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1662                 return -EBADR;
1663         }
1664
1665         /* Check for FIEMAP_FLAG_SYNC */
1666         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1667                 rc = filemap_fdatawrite(inode->i_mapping);
1668                 if (rc)
1669                         return rc;
1670         }
1671
1672         lsm = ccc_inode_lsm_get(inode);
1673         if (lsm == NULL)
1674                 return -ENOENT;
1675
1676         /* If the stripe_count > 1 and the application does not understand
1677          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1678          */
1679         if (lsm->lsm_stripe_count > 1 &&
1680             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1681                 GOTO(out, rc = -EOPNOTSUPP);
1682
1683         fm_key.oa.o_id = lsm->lsm_object_id;
1684         fm_key.oa.o_seq = lsm->lsm_object_seq;
1685         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1686
1687         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1688         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1689         /* If filesize is 0, then there would be no objects for mapping */
1690         if (fm_key.oa.o_size == 0) {
1691                 fiemap->fm_mapped_extents = 0;
1692                 GOTO(out, rc = 0);
1693         }
1694
1695         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1696
1697         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1698                           fiemap, lsm);
1699         if (rc)
1700                 CERROR("obd_get_info failed: rc = %d\n", rc);
1701
1702 out:
1703         ccc_inode_lsm_put(inode, lsm);
1704         RETURN(rc);
1705 }
1706
1707 int ll_fid2path(struct inode *inode, void *arg)
1708 {
1709         struct obd_export       *exp = ll_i2mdexp(inode);
1710         struct getinfo_fid2path *gfout, *gfin;
1711         int                      outsize, rc;
1712         ENTRY;
1713
1714         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1715             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1716                 RETURN(-EPERM);
1717
1718         /* Need to get the buflen */
1719         OBD_ALLOC_PTR(gfin);
1720         if (gfin == NULL)
1721                 RETURN(-ENOMEM);
1722         if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1723                 OBD_FREE_PTR(gfin);
1724                 RETURN(-EFAULT);
1725         }
1726
1727         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1728         OBD_ALLOC(gfout, outsize);
1729         if (gfout == NULL) {
1730                 OBD_FREE_PTR(gfin);
1731                 RETURN(-ENOMEM);
1732         }
1733         memcpy(gfout, gfin, sizeof(*gfout));
1734         OBD_FREE_PTR(gfin);
1735
1736         /* Call mdc_iocontrol */
1737         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1738         if (rc)
1739                 GOTO(gf_free, rc);
1740         if (copy_to_user(arg, gfout, outsize))
1741                 rc = -EFAULT;
1742
1743 gf_free:
1744         OBD_FREE(gfout, outsize);
1745         RETURN(rc);
1746 }
1747
1748 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1749 {
1750         struct ll_user_fiemap *fiemap_s;
1751         size_t num_bytes, ret_bytes;
1752         unsigned int extent_count;
1753         int rc = 0;
1754
1755         /* Get the extent count so we can calculate the size of
1756          * required fiemap buffer */
1757         if (get_user(extent_count,
1758             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1759                 RETURN(-EFAULT);
1760         num_bytes = sizeof(*fiemap_s) + (extent_count *
1761                                          sizeof(struct ll_fiemap_extent));
1762
1763         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1764         if (fiemap_s == NULL)
1765                 RETURN(-ENOMEM);
1766
1767         /* get the fiemap value */
1768         if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1769                            sizeof(*fiemap_s)))
1770                 GOTO(error, rc = -EFAULT);
1771
1772         /* If fm_extent_count is non-zero, read the first extent since
1773          * it is used to calculate end_offset and device from previous
1774          * fiemap call. */
1775         if (extent_count) {
1776                 if (copy_from_user(&fiemap_s->fm_extents[0],
1777                     (char __user *)arg + sizeof(*fiemap_s),
1778                     sizeof(struct ll_fiemap_extent)))
1779                         GOTO(error, rc = -EFAULT);
1780         }
1781
1782         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1783         if (rc)
1784                 GOTO(error, rc);
1785
1786         ret_bytes = sizeof(struct ll_user_fiemap);
1787
1788         if (extent_count != 0)
1789                 ret_bytes += (fiemap_s->fm_mapped_extents *
1790                                  sizeof(struct ll_fiemap_extent));
1791
1792         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1793                 rc = -EFAULT;
1794
1795 error:
1796         OBD_FREE_LARGE(fiemap_s, num_bytes);
1797         RETURN(rc);
1798 }
1799
1800 /*
1801  * Read the data_version for inode.
1802  *
1803  * This value is computed using stripe object version on OST.
1804  * Version is computed using server side locking.
1805  *
1806  * @param extent_lock  Take extent lock. Not needed if a process is already
1807  *                     holding the OST object group locks.
1808  */
1809 int ll_data_version(struct inode *inode, __u64 *data_version,
1810                     int extent_lock)
1811 {
1812         struct lov_stripe_md    *lsm = NULL;
1813         struct ll_sb_info       *sbi = ll_i2sbi(inode);
1814         struct obdo             *obdo = NULL;
1815         int                      rc;
1816         ENTRY;
1817
1818         /* If no stripe, we consider version is 0. */
1819         lsm = ccc_inode_lsm_get(inode);
1820         if (lsm == NULL) {
1821                 *data_version = 0;
1822                 CDEBUG(D_INODE, "No object for inode\n");
1823                 RETURN(0);
1824         }
1825
1826         OBD_ALLOC_PTR(obdo);
1827         if (obdo == NULL) {
1828                 ccc_inode_lsm_put(inode, lsm);
1829                 RETURN(-ENOMEM);
1830         }
1831
1832         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1833         if (!rc) {
1834                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1835                         rc = -EOPNOTSUPP;
1836                 else
1837                         *data_version = obdo->o_data_version;
1838         }
1839
1840         OBD_FREE_PTR(obdo);
1841         ccc_inode_lsm_put(inode, lsm);
1842
1843         RETURN(rc);
1844 }
1845
1846 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1847 {
1848         struct inode *inode = file->f_dentry->d_inode;
1849         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1850         int flags;
1851
1852         ENTRY;
1853
1854         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1855                inode->i_generation, inode, cmd);
1856         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1857
1858         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1859         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1860                 RETURN(-ENOTTY);
1861
1862         switch(cmd) {
1863         case LL_IOC_GETFLAGS:
1864                 /* Get the current value of the file flags */
1865                 return put_user(fd->fd_flags, (int *)arg);
1866         case LL_IOC_SETFLAGS:
1867         case LL_IOC_CLRFLAGS:
1868                 /* Set or clear specific file flags */
1869                 /* XXX This probably needs checks to ensure the flags are
1870                  *     not abused, and to handle any flag side effects.
1871                  */
1872                 if (get_user(flags, (int *) arg))
1873                         RETURN(-EFAULT);
1874
1875                 if (cmd == LL_IOC_SETFLAGS) {
1876                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1877                             !(file->f_flags & O_DIRECT)) {
1878                                 CERROR("%s: unable to disable locking on "
1879                                        "non-O_DIRECT file\n", current->comm);
1880                                 RETURN(-EINVAL);
1881                         }
1882
1883                         fd->fd_flags |= flags;
1884                 } else {
1885                         fd->fd_flags &= ~flags;
1886                 }
1887                 RETURN(0);
1888         case LL_IOC_LOV_SETSTRIPE:
1889                 RETURN(ll_lov_setstripe(inode, file, arg));
1890         case LL_IOC_LOV_SETEA:
1891                 RETURN(ll_lov_setea(inode, file, arg));
1892         case LL_IOC_LOV_GETSTRIPE:
1893                 RETURN(ll_lov_getstripe(inode, arg));
1894         case LL_IOC_RECREATE_OBJ:
1895                 RETURN(ll_lov_recreate_obj(inode, arg));
1896         case LL_IOC_RECREATE_FID:
1897                 RETURN(ll_lov_recreate_fid(inode, arg));
1898         case FSFILT_IOC_FIEMAP:
1899                 RETURN(ll_ioctl_fiemap(inode, arg));
1900         case FSFILT_IOC_GETFLAGS:
1901         case FSFILT_IOC_SETFLAGS:
1902                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1903         case FSFILT_IOC_GETVERSION_OLD:
1904         case FSFILT_IOC_GETVERSION:
1905                 RETURN(put_user(inode->i_generation, (int *)arg));
1906         case LL_IOC_GROUP_LOCK:
1907                 RETURN(ll_get_grouplock(inode, file, arg));
1908         case LL_IOC_GROUP_UNLOCK:
1909                 RETURN(ll_put_grouplock(inode, file, arg));
1910         case IOC_OBD_STATFS:
1911                 RETURN(ll_obd_statfs(inode, (void *)arg));
1912
1913         /* We need to special case any other ioctls we want to handle,
1914          * to send them to the MDS/OST as appropriate and to properly
1915          * network encode the arg field.
1916         case FSFILT_IOC_SETVERSION_OLD:
1917         case FSFILT_IOC_SETVERSION:
1918         */
1919         case LL_IOC_FLUSHCTX:
1920                 RETURN(ll_flush_ctx(inode));
1921         case LL_IOC_PATH2FID: {
1922                 if (copy_to_user((void *)arg, ll_inode2fid(inode),
1923                                  sizeof(struct lu_fid)))
1924                         RETURN(-EFAULT);
1925
1926                 RETURN(0);
1927         }
1928         case OBD_IOC_FID2PATH:
1929                 RETURN(ll_fid2path(inode, (void *)arg));
1930         case LL_IOC_DATA_VERSION: {
1931                 struct ioc_data_version idv;
1932                 int                     rc;
1933
1934                 if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
1935                         RETURN(-EFAULT);
1936
1937                 rc = ll_data_version(inode, &idv.idv_version,
1938                                 !(idv.idv_flags & LL_DV_NOFLUSH));
1939
1940                 if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
1941                         RETURN(-EFAULT);
1942
1943                 RETURN(rc);
1944         }
1945
1946         case LL_IOC_GET_MDTIDX: {
1947                 int mdtidx;
1948
1949                 mdtidx = ll_get_mdt_idx(inode);
1950                 if (mdtidx < 0)
1951                         RETURN(mdtidx);
1952
1953                 if (put_user((int)mdtidx, (int*)arg))
1954                         RETURN(-EFAULT);
1955
1956                 RETURN(0);
1957         }
1958         case OBD_IOC_GETDTNAME:
1959         case OBD_IOC_GETMDNAME:
1960                 RETURN(ll_get_obd_name(inode, cmd, arg));
1961         case LL_IOC_HSM_STATE_GET: {
1962                 struct md_op_data       *op_data;
1963                 struct hsm_user_state   *hus;
1964                 int                      rc;
1965
1966                 OBD_ALLOC_PTR(hus);
1967                 if (hus == NULL)
1968                         RETURN(-ENOMEM);
1969
1970                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
1971                                              LUSTRE_OPC_ANY, hus);
1972                 if (op_data == NULL) {
1973                         OBD_FREE_PTR(hus);
1974                         RETURN(-ENOMEM);
1975                 }
1976
1977                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
1978                                    op_data, NULL);
1979
1980                 if (copy_to_user((void *)arg, hus, sizeof(*hus)))
1981                         rc = -EFAULT;
1982
1983                 ll_finish_md_op_data(op_data);
1984                 OBD_FREE_PTR(hus);
1985                 RETURN(rc);
1986         }
1987         case LL_IOC_HSM_STATE_SET: {
1988                 struct md_op_data       *op_data;
1989                 struct hsm_state_set    *hss;
1990                 int                      rc;
1991
1992                 OBD_ALLOC_PTR(hss);
1993                 if (hss == NULL)
1994                         RETURN(-ENOMEM);
1995                 if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
1996                         OBD_FREE_PTR(hss);
1997                         RETURN(-EFAULT);
1998                 }
1999
2000                 /* Non-root users are forbidden to set or clear flags which are
2001                  * NOT defined in HSM_USER_MASK. */
2002                 if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
2003                     && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
2004                         OBD_FREE_PTR(hss);
2005                         RETURN(-EPERM);
2006                 }
2007
2008                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2009                                              LUSTRE_OPC_ANY, hss);
2010                 if (op_data == NULL) {
2011                         OBD_FREE_PTR(hss);
2012                         RETURN(-ENOMEM);
2013                 }
2014
2015                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2016                                    op_data, NULL);
2017
2018                 ll_finish_md_op_data(op_data);
2019
2020                 OBD_FREE_PTR(hss);
2021                 RETURN(rc);
2022         }
2023         case LL_IOC_HSM_ACTION: {
2024                 struct md_op_data               *op_data;
2025                 struct hsm_current_action       *hca;
2026                 int                              rc;
2027
2028                 OBD_ALLOC_PTR(hca);
2029                 if (hca == NULL)
2030                         RETURN(-ENOMEM);
2031
2032                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2033                                              LUSTRE_OPC_ANY, hca);
2034                 if (op_data == NULL) {
2035                         OBD_FREE_PTR(hca);
2036                         RETURN(-ENOMEM);
2037                 }
2038
2039                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2040                                    op_data, NULL);
2041
2042                 if (cfs_copy_to_user((char *)arg, hca, sizeof(*hca)))
2043                         rc = -EFAULT;
2044
2045                 ll_finish_md_op_data(op_data);
2046                 OBD_FREE_PTR(hca);
2047                 RETURN(rc);
2048         }
2049         default: {
2050                 int err;
2051
2052                 if (LLIOC_STOP ==
2053                      ll_iocontrol_call(inode, file, cmd, arg, &err))
2054                         RETURN(err);
2055
2056                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2057                                      (void *)arg));
2058         }
2059         }
2060 }
2061
2062 #ifndef HAVE_FILE_LLSEEK_SIZE
2063 static inline loff_t
2064 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
2065 {
2066         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
2067                 return -EINVAL;
2068         if (offset > maxsize)
2069                 return -EINVAL;
2070
2071         if (offset != file->f_pos) {
2072                 file->f_pos = offset;
2073                 file->f_version = 0;
2074         }
2075         return offset;
2076 }
2077
2078 static loff_t
2079 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
2080                 loff_t maxsize, loff_t eof)
2081 {
2082         struct inode *inode = file->f_dentry->d_inode;
2083
2084         switch (origin) {
2085         case SEEK_END:
2086                 offset += eof;
2087                 break;
2088         case SEEK_CUR:
2089                 /*
2090                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
2091                  * position-querying operation.  Avoid rewriting the "same"
2092                  * f_pos value back to the file because a concurrent read(),
2093                  * write() or lseek() might have altered it
2094                  */
2095                 if (offset == 0)
2096                         return file->f_pos;
2097                 /*
2098                  * f_lock protects against read/modify/write race with other
2099                  * SEEK_CURs. Note that parallel writes and reads behave
2100                  * like SEEK_SET.
2101                  */
2102                 mutex_lock(&inode->i_mutex);
2103                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
2104                 mutex_unlock(&inode->i_mutex);
2105                 return offset;
2106         case SEEK_DATA:
2107                 /*
2108                  * In the generic case the entire file is data, so as long as
2109                  * offset isn't at the end of the file then the offset is data.
2110                  */
2111                 if (offset >= eof)
2112                         return -ENXIO;
2113                 break;
2114         case SEEK_HOLE:
2115                 /*
2116                  * There is a virtual hole at the end of the file, so as long as
2117                  * offset isn't i_size or larger, return i_size.
2118                  */
2119                 if (offset >= eof)
2120                         return -ENXIO;
2121                 offset = eof;
2122                 break;
2123         }
2124
2125         return llseek_execute(file, offset, maxsize);
2126 }
2127 #endif
2128
2129 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2130 {
2131         struct inode *inode = file->f_dentry->d_inode;
2132         loff_t retval, eof = 0;
2133
2134         ENTRY;
2135         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2136                            (origin == SEEK_CUR) ? file->f_pos : 0);
2137         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2138                inode->i_ino, inode->i_generation, inode, retval, retval,
2139                origin);
2140         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2141
2142         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2143                 retval = ll_glimpse_size(inode);
2144                 if (retval != 0)
2145                         RETURN(retval);
2146                 eof = i_size_read(inode);
2147         }
2148
2149         retval = generic_file_llseek_size(file, offset, origin,
2150                                           ll_file_maxbytes(inode), eof);
2151         RETURN(retval);
2152 }
2153
2154 int ll_flush(struct file *file, fl_owner_t id)
2155 {
2156         struct inode *inode = file->f_dentry->d_inode;
2157         struct ll_inode_info *lli = ll_i2info(inode);
2158         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2159         int rc, err;
2160
2161         LASSERT(!S_ISDIR(inode->i_mode));
2162
2163         /* catch async errors that were recorded back when async writeback
2164          * failed for pages in this mapping. */
2165         rc = lli->lli_async_rc;
2166         lli->lli_async_rc = 0;
2167         err = lov_read_and_clear_async_rc(lli->lli_clob);
2168         if (rc == 0)
2169                 rc = err;
2170
2171         /* The application has been told write failure already.
2172          * Do not report failure again. */
2173         if (fd->fd_write_failed)
2174                 return 0;
2175         return rc ? -EIO : 0;
2176 }
2177
2178 /**
2179  * Called to make sure a portion of file has been written out.
2180  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2181  *
2182  * Return how many pages have been written.
2183  */
2184 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2185                        enum cl_fsync_mode mode)
2186 {
2187         struct cl_env_nest nest;
2188         struct lu_env *env;
2189         struct cl_io *io;
2190         struct obd_capa *capa = NULL;
2191         struct cl_fsync_io *fio;
2192         int result;
2193         ENTRY;
2194
2195         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2196             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2197                 RETURN(-EINVAL);
2198
2199         env = cl_env_nested_get(&nest);
2200         if (IS_ERR(env))
2201                 RETURN(PTR_ERR(env));
2202
2203         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2204
2205         io = ccc_env_thread_io(env);
2206         io->ci_obj = cl_i2info(inode)->lli_clob;
2207         io->ci_ignore_layout = 1;
2208
2209         /* initialize parameters for sync */
2210         fio = &io->u.ci_fsync;
2211         fio->fi_capa = capa;
2212         fio->fi_start = start;
2213         fio->fi_end = end;
2214         fio->fi_fid = ll_inode2fid(inode);
2215         fio->fi_mode = mode;
2216         fio->fi_nr_written = 0;
2217
2218         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2219                 result = cl_io_loop(env, io);
2220         else
2221                 result = io->ci_result;
2222         if (result == 0)
2223                 result = fio->fi_nr_written;
2224         cl_io_fini(env, io);
2225         cl_env_nested_put(&nest, env);
2226
2227         capa_put(capa);
2228
2229         RETURN(result);
2230 }
2231
2232 #ifdef HAVE_FILE_FSYNC_4ARGS
2233 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2234 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2235 int ll_fsync(struct file *file, int data)
2236 #else
2237 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2238 #endif
2239 {
2240         struct inode *inode = file->f_dentry->d_inode;
2241         struct ll_inode_info *lli = ll_i2info(inode);
2242         struct ptlrpc_request *req;
2243         struct obd_capa *oc;
2244         int rc, err;
2245         ENTRY;
2246
2247         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2248                inode->i_generation, inode);
2249         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2250
2251 #ifdef HAVE_FILE_FSYNC_4ARGS
2252         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2253         mutex_lock(&inode->i_mutex);
2254 #else
2255         /* fsync's caller has already called _fdata{sync,write}, we want
2256          * that IO to finish before calling the osc and mdc sync methods */
2257         rc = filemap_fdatawait(inode->i_mapping);
2258 #endif
2259
2260         /* catch async errors that were recorded back when async writeback
2261          * failed for pages in this mapping. */
2262         if (!S_ISDIR(inode->i_mode)) {
2263                 err = lli->lli_async_rc;
2264                 lli->lli_async_rc = 0;
2265                 if (rc == 0)
2266                         rc = err;
2267                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2268                 if (rc == 0)
2269                         rc = err;
2270         }
2271
2272         oc = ll_mdscapa_get(inode);
2273         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2274                       &req);
2275         capa_put(oc);
2276         if (!rc)
2277                 rc = err;
2278         if (!err)
2279                 ptlrpc_req_finished(req);
2280
2281         if (data) {
2282                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2283
2284                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2285                                 CL_FSYNC_ALL);
2286                 if (rc == 0 && err < 0)
2287                         rc = err;
2288                 if (rc < 0)
2289                         fd->fd_write_failed = true;
2290                 else
2291                         fd->fd_write_failed = false;
2292         }
2293
2294 #ifdef HAVE_FILE_FSYNC_4ARGS
2295         mutex_unlock(&inode->i_mutex);
2296 #endif
2297         RETURN(rc);
2298 }
2299
2300 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2301 {
2302         struct inode *inode = file->f_dentry->d_inode;
2303         struct ll_sb_info *sbi = ll_i2sbi(inode);
2304         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2305                                            .ei_cb_cp =ldlm_flock_completion_ast,
2306                                            .ei_cbdata = file_lock };
2307         struct md_op_data *op_data;
2308         struct lustre_handle lockh = {0};
2309         ldlm_policy_data_t flock = {{0}};
2310         int flags = 0;
2311         int rc;
2312         int rc2 = 0;
2313         ENTRY;
2314
2315         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2316                inode->i_ino, file_lock);
2317
2318         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2319
2320         if (file_lock->fl_flags & FL_FLOCK) {
2321                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2322                 /* flocks are whole-file locks */
2323                 flock.l_flock.end = OFFSET_MAX;
2324                 /* For flocks owner is determined by the local file desctiptor*/
2325                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2326         } else if (file_lock->fl_flags & FL_POSIX) {
2327                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2328                 flock.l_flock.start = file_lock->fl_start;
2329                 flock.l_flock.end = file_lock->fl_end;
2330         } else {
2331                 RETURN(-EINVAL);
2332         }
2333         flock.l_flock.pid = file_lock->fl_pid;
2334
2335         /* Somewhat ugly workaround for svc lockd.
2336          * lockd installs custom fl_lmops->lm_compare_owner that checks
2337          * for the fl_owner to be the same (which it always is on local node
2338          * I guess between lockd processes) and then compares pid.
2339          * As such we assign pid to the owner field to make it all work,
2340          * conflict with normal locks is unlikely since pid space and
2341          * pointer space for current->files are not intersecting */
2342         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2343                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2344
2345         switch (file_lock->fl_type) {
2346         case F_RDLCK:
2347                 einfo.ei_mode = LCK_PR;
2348                 break;
2349         case F_UNLCK:
2350                 /* An unlock request may or may not have any relation to
2351                  * existing locks so we may not be able to pass a lock handle
2352                  * via a normal ldlm_lock_cancel() request. The request may even
2353                  * unlock a byte range in the middle of an existing lock. In
2354                  * order to process an unlock request we need all of the same
2355                  * information that is given with a normal read or write record
2356                  * lock request. To avoid creating another ldlm unlock (cancel)
2357                  * message we'll treat a LCK_NL flock request as an unlock. */
2358                 einfo.ei_mode = LCK_NL;
2359                 break;
2360         case F_WRLCK:
2361                 einfo.ei_mode = LCK_PW;
2362                 break;
2363         default:
2364                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2365                         file_lock->fl_type);
2366                 RETURN (-ENOTSUPP);
2367         }
2368
2369         switch (cmd) {
2370         case F_SETLKW:
2371 #ifdef F_SETLKW64
2372         case F_SETLKW64:
2373 #endif
2374                 flags = 0;
2375                 break;
2376         case F_SETLK:
2377 #ifdef F_SETLK64
2378         case F_SETLK64:
2379 #endif
2380                 flags = LDLM_FL_BLOCK_NOWAIT;
2381                 break;
2382         case F_GETLK:
2383 #ifdef F_GETLK64
2384         case F_GETLK64:
2385 #endif
2386                 flags = LDLM_FL_TEST_LOCK;
2387                 /* Save the old mode so that if the mode in the lock changes we
2388                  * can decrement the appropriate reader or writer refcount. */
2389                 file_lock->fl_type = einfo.ei_mode;
2390                 break;
2391         default:
2392                 CERROR("unknown fcntl lock command: %d\n", cmd);
2393                 RETURN (-EINVAL);
2394         }
2395
2396         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2397                                      LUSTRE_OPC_ANY, NULL);
2398         if (IS_ERR(op_data))
2399                 RETURN(PTR_ERR(op_data));
2400
2401         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2402                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2403                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2404
2405         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2406                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2407
2408         if ((file_lock->fl_flags & FL_FLOCK) &&
2409             (rc == 0 || file_lock->fl_type == F_UNLCK))
2410                 rc2  = flock_lock_file_wait(file, file_lock);
2411         if ((file_lock->fl_flags & FL_POSIX) &&
2412             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2413             !(flags & LDLM_FL_TEST_LOCK))
2414                 rc2  = posix_lock_file_wait(file, file_lock);
2415
2416         if (rc2 && file_lock->fl_type != F_UNLCK) {
2417                 einfo.ei_mode = LCK_NL;
2418                 md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2419                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2420                 rc = rc2;
2421         }
2422
2423         ll_finish_md_op_data(op_data);
2424
2425         RETURN(rc);
2426 }
2427
2428 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2429 {
2430         ENTRY;
2431
2432         RETURN(-ENOSYS);
2433 }
2434
2435 /**
2436  * test if some locks matching bits and l_req_mode are acquired
2437  * - bits can be in different locks
2438  * - if found clear the common lock bits in *bits
2439  * - the bits not found, are kept in *bits
2440  * \param inode [IN]
2441  * \param bits [IN] searched lock bits [IN]
2442  * \param l_req_mode [IN] searched lock mode
2443  * \retval boolean, true iff all bits are found
2444  */
2445 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2446 {
2447         struct lustre_handle lockh;
2448         ldlm_policy_data_t policy;
2449         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2450                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2451         struct lu_fid *fid;
2452         __u64 flags;
2453         int i;
2454         ENTRY;
2455
2456         if (!inode)
2457                RETURN(0);
2458
2459         fid = &ll_i2info(inode)->lli_fid;
2460         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2461                ldlm_lockname[mode]);
2462
2463         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2464         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2465                 policy.l_inodebits.bits = *bits & (1 << i);
2466                 if (policy.l_inodebits.bits == 0)
2467                         continue;
2468
2469                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2470                                   &policy, mode, &lockh)) {
2471                         struct ldlm_lock *lock;
2472
2473                         lock = ldlm_handle2lock(&lockh);
2474                         if (lock) {
2475                                 *bits &=
2476                                       ~(lock->l_policy_data.l_inodebits.bits);
2477                                 LDLM_LOCK_PUT(lock);
2478                         } else {
2479                                 *bits &= ~policy.l_inodebits.bits;
2480                         }
2481                 }
2482         }
2483         RETURN(*bits == 0);
2484 }
2485
2486 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2487                             struct lustre_handle *lockh, __u64 flags)
2488 {
2489         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2490         struct lu_fid *fid;
2491         ldlm_mode_t rc;
2492         ENTRY;
2493
2494         fid = &ll_i2info(inode)->lli_fid;
2495         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2496
2497         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2498                            fid, LDLM_IBITS, &policy,
2499                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2500         RETURN(rc);
2501 }
2502
2503 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2504 {
2505         /* Already unlinked. Just update nlink and return success */
2506         if (rc == -ENOENT) {
2507                 clear_nlink(inode);
2508                 /* This path cannot be hit for regular files unless in
2509                  * case of obscure races, so no need to to validate
2510                  * size. */
2511                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2512                         return 0;
2513         } else if (rc != 0) {
2514                 CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2515                        ll_get_fsname(inode->i_sb, NULL, 0),
2516                        PFID(ll_inode2fid(inode)), rc);
2517         }
2518
2519         return rc;
2520 }
2521
2522 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2523                              __u64 ibits)
2524 {
2525         struct inode *inode = dentry->d_inode;
2526         struct ptlrpc_request *req = NULL;
2527         struct obd_export *exp;
2528         int rc = 0;
2529         ENTRY;
2530
2531         LASSERT(inode != NULL);
2532
2533         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2534                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2535
2536         exp = ll_i2mdexp(inode);
2537
2538         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2539          *      But under CMD case, it caused some lock issues, should be fixed
2540          *      with new CMD ibits lock. See bug 12718 */
2541         if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
2542                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2543                 struct md_op_data *op_data;
2544
2545                 if (ibits == MDS_INODELOCK_LOOKUP)
2546                         oit.it_op = IT_LOOKUP;
2547
2548                 /* Call getattr by fid, so do not provide name at all. */
2549                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2550                                              dentry->d_inode, NULL, 0, 0,
2551                                              LUSTRE_OPC_ANY, NULL);
2552                 if (IS_ERR(op_data))
2553                         RETURN(PTR_ERR(op_data));
2554
2555                 oit.it_create_mode |= M_CHECK_STALE;
2556                 rc = md_intent_lock(exp, op_data, NULL, 0,
2557                                     /* we are not interested in name
2558                                        based lookup */
2559                                     &oit, 0, &req,
2560                                     ll_md_blocking_ast, 0);
2561                 ll_finish_md_op_data(op_data);
2562                 oit.it_create_mode &= ~M_CHECK_STALE;
2563                 if (rc < 0) {
2564                         rc = ll_inode_revalidate_fini(inode, rc);
2565                         GOTO (out, rc);
2566                 }
2567
2568                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2569                 if (rc != 0) {
2570                         ll_intent_release(&oit);
2571                         GOTO(out, rc);
2572                 }
2573
2574                 /* Unlinked? Unhash dentry, so it is not picked up later by
2575                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2576                    here to preserve get_cwd functionality on 2.6.
2577                    Bug 10503 */
2578                 if (!dentry->d_inode->i_nlink)
2579                         d_lustre_invalidate(dentry);
2580
2581                 ll_lookup_finish_locks(&oit, dentry);
2582         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2583                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2584                 obd_valid valid = OBD_MD_FLGETATTR;
2585                 struct md_op_data *op_data;
2586                 int ealen = 0;
2587
2588                 if (S_ISREG(inode->i_mode)) {
2589                         rc = ll_get_max_mdsize(sbi, &ealen);
2590                         if (rc)
2591                                 RETURN(rc);
2592                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2593                 }
2594
2595                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2596                                              0, ealen, LUSTRE_OPC_ANY,
2597                                              NULL);
2598                 if (IS_ERR(op_data))
2599                         RETURN(PTR_ERR(op_data));
2600
2601                 op_data->op_valid = valid;
2602                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2603                  * capa for this inode. Because we only keep capas of dirs
2604                  * fresh. */
2605                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2606                 ll_finish_md_op_data(op_data);
2607                 if (rc) {
2608                         rc = ll_inode_revalidate_fini(inode, rc);
2609                         RETURN(rc);
2610                 }
2611
2612                 rc = ll_prep_inode(&inode, req, NULL, NULL);
2613         }
2614 out:
2615         ptlrpc_req_finished(req);
2616         return rc;
2617 }
2618
2619 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2620                            __u64 ibits)
2621 {
2622         struct inode *inode = dentry->d_inode;
2623         int rc;
2624         ENTRY;
2625
2626         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2627         if (rc != 0)
2628                 RETURN(rc);
2629
2630         /* if object isn't regular file, don't validate size */
2631         if (!S_ISREG(inode->i_mode)) {
2632                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2633                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2634                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2635         } else {
2636                 rc = ll_glimpse_size(inode);
2637         }
2638         RETURN(rc);
2639 }
2640
2641 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2642                   struct lookup_intent *it, struct kstat *stat)
2643 {
2644         struct inode *inode = de->d_inode;
2645         struct ll_sb_info *sbi = ll_i2sbi(inode);
2646         struct ll_inode_info *lli = ll_i2info(inode);
2647         int res = 0;
2648
2649         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2650                                              MDS_INODELOCK_LOOKUP);
2651         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2652
2653         if (res)
2654                 return res;
2655
2656         stat->dev = inode->i_sb->s_dev;
2657         if (ll_need_32bit_api(sbi))
2658                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2659         else
2660                 stat->ino = inode->i_ino;
2661         stat->mode = inode->i_mode;
2662         stat->nlink = inode->i_nlink;
2663         stat->uid = inode->i_uid;
2664         stat->gid = inode->i_gid;
2665         stat->rdev = inode->i_rdev;
2666         stat->atime = inode->i_atime;
2667         stat->mtime = inode->i_mtime;
2668         stat->ctime = inode->i_ctime;
2669         stat->blksize = 1 << inode->i_blkbits;
2670
2671         stat->size = i_size_read(inode);
2672         stat->blocks = inode->i_blocks;
2673
2674         return 0;
2675 }
2676 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2677 {
2678         struct lookup_intent it = { .it_op = IT_GETATTR };
2679
2680         return ll_getattr_it(mnt, de, &it, stat);
2681 }
2682
2683 #ifdef HAVE_LINUX_FIEMAP_H
2684 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2685                 __u64 start, __u64 len)
2686 {
2687         int rc;
2688         size_t num_bytes;
2689         struct ll_user_fiemap *fiemap;
2690         unsigned int extent_count = fieinfo->fi_extents_max;
2691
2692         num_bytes = sizeof(*fiemap) + (extent_count *
2693                                        sizeof(struct ll_fiemap_extent));
2694         OBD_ALLOC_LARGE(fiemap, num_bytes);
2695
2696         if (fiemap == NULL)
2697                 RETURN(-ENOMEM);
2698
2699         fiemap->fm_flags = fieinfo->fi_flags;
2700         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2701         fiemap->fm_start = start;
2702         fiemap->fm_length = len;
2703         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2704                sizeof(struct ll_fiemap_extent));
2705
2706         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2707
2708         fieinfo->fi_flags = fiemap->fm_flags;
2709         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2710         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2711                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2712
2713         OBD_FREE_LARGE(fiemap, num_bytes);
2714         return rc;
2715 }
2716 #endif
2717
2718 struct posix_acl * ll_get_acl(struct inode *inode, int type)
2719 {
2720         struct ll_inode_info *lli = ll_i2info(inode);
2721         struct posix_acl *acl = NULL;
2722         ENTRY;
2723
2724         spin_lock(&lli->lli_lock);
2725         /* VFS' acl_permission_check->check_acl will release the refcount */
2726         acl = posix_acl_dup(lli->lli_posix_acl);
2727         spin_unlock(&lli->lli_lock);
2728
2729         RETURN(acl);
2730 }
2731
2732 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2733 static int
2734 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2735 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
2736 # else
2737 ll_check_acl(struct inode *inode, int mask)
2738 # endif
2739 {
2740 # ifdef CONFIG_FS_POSIX_ACL
2741         struct posix_acl *acl;
2742         int rc;
2743         ENTRY;
2744
2745 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2746         if (flags & IPERM_FLAG_RCU)
2747                 return -ECHILD;
2748 #  endif
2749         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
2750
2751         if (!acl)
2752                 RETURN(-EAGAIN);
2753
2754         rc = posix_acl_permission(inode, acl, mask);
2755         posix_acl_release(acl);
2756
2757         RETURN(rc);
2758 # else /* !CONFIG_FS_POSIX_ACL */
2759         return -EAGAIN;
2760 # endif /* CONFIG_FS_POSIX_ACL */
2761 }
2762 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2763
2764 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2765 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2766 #else
2767 # ifdef HAVE_INODE_PERMISION_2ARGS
2768 int ll_inode_permission(struct inode *inode, int mask)
2769 # else
2770 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2771 # endif
2772 #endif
2773 {
2774         int rc = 0;
2775         ENTRY;
2776
2777 #ifdef MAY_NOT_BLOCK
2778         if (mask & MAY_NOT_BLOCK)
2779                 return -ECHILD;
2780 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
2781         if (flags & IPERM_FLAG_RCU)
2782                 return -ECHILD;
2783 #endif
2784
2785        /* as root inode are NOT getting validated in lookup operation,
2786         * need to do it before permission check. */
2787
2788         if (inode == inode->i_sb->s_root->d_inode) {
2789                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2790
2791                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2792                                               MDS_INODELOCK_LOOKUP);
2793                 if (rc)
2794                         RETURN(rc);
2795         }
2796
2797         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2798                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2799
2800         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2801                 return lustre_check_remote_perm(inode, mask);
2802
2803         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2804         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
2805
2806         RETURN(rc);
2807 }
2808
2809 #ifdef HAVE_FILE_READV
2810 #define READ_METHOD readv
2811 #define READ_FUNCTION ll_file_readv
2812 #define WRITE_METHOD writev
2813 #define WRITE_FUNCTION ll_file_writev
2814 #else
2815 #define READ_METHOD aio_read
2816 #define READ_FUNCTION ll_file_aio_read
2817 #define WRITE_METHOD aio_write
2818 #define WRITE_FUNCTION ll_file_aio_write
2819 #endif
2820
2821 /* -o localflock - only provides locally consistent flock locks */
2822 struct file_operations ll_file_operations = {
2823         .read           = ll_file_read,
2824         .READ_METHOD    = READ_FUNCTION,
2825         .write          = ll_file_write,
2826         .WRITE_METHOD   = WRITE_FUNCTION,
2827         .unlocked_ioctl = ll_file_ioctl,
2828         .open           = ll_file_open,
2829         .release        = ll_file_release,
2830         .mmap           = ll_file_mmap,
2831         .llseek         = ll_file_seek,
2832 #ifdef HAVE_KERNEL_SENDFILE
2833         .sendfile       = ll_file_sendfile,
2834 #endif
2835 #ifdef HAVE_KERNEL_SPLICE_READ
2836         .splice_read    = ll_file_splice_read,
2837 #endif
2838         .fsync          = ll_fsync,
2839         .flush          = ll_flush
2840 };
2841
2842 struct file_operations ll_file_operations_flock = {
2843         .read           = ll_file_read,
2844         .READ_METHOD    = READ_FUNCTION,
2845         .write          = ll_file_write,
2846         .WRITE_METHOD   = WRITE_FUNCTION,
2847         .unlocked_ioctl = ll_file_ioctl,
2848         .open           = ll_file_open,
2849         .release        = ll_file_release,
2850         .mmap           = ll_file_mmap,
2851         .llseek         = ll_file_seek,
2852 #ifdef HAVE_KERNEL_SENDFILE
2853         .sendfile       = ll_file_sendfile,
2854 #endif
2855 #ifdef HAVE_KERNEL_SPLICE_READ
2856         .splice_read    = ll_file_splice_read,
2857 #endif
2858         .fsync          = ll_fsync,
2859         .flush          = ll_flush,
2860         .flock          = ll_file_flock,
2861         .lock           = ll_file_flock
2862 };
2863
2864 /* These are for -o noflock - to return ENOSYS on flock calls */
2865 struct file_operations ll_file_operations_noflock = {
2866         .read           = ll_file_read,
2867         .READ_METHOD    = READ_FUNCTION,
2868         .write          = ll_file_write,
2869         .WRITE_METHOD   = WRITE_FUNCTION,
2870         .unlocked_ioctl = ll_file_ioctl,
2871         .open           = ll_file_open,
2872         .release        = ll_file_release,
2873         .mmap           = ll_file_mmap,
2874         .llseek         = ll_file_seek,
2875 #ifdef HAVE_KERNEL_SENDFILE
2876         .sendfile       = ll_file_sendfile,
2877 #endif
2878 #ifdef HAVE_KERNEL_SPLICE_READ
2879         .splice_read    = ll_file_splice_read,
2880 #endif
2881         .fsync          = ll_fsync,
2882         .flush          = ll_flush,
2883         .flock          = ll_file_noflock,
2884         .lock           = ll_file_noflock
2885 };
2886
2887 struct inode_operations ll_file_inode_operations = {
2888         .setattr        = ll_setattr,
2889         .getattr        = ll_getattr,
2890         .permission     = ll_inode_permission,
2891         .setxattr       = ll_setxattr,
2892         .getxattr       = ll_getxattr,
2893         .listxattr      = ll_listxattr,
2894         .removexattr    = ll_removexattr,
2895 #ifdef  HAVE_LINUX_FIEMAP_H
2896         .fiemap         = ll_fiemap,
2897 #endif
2898 #ifdef HAVE_IOP_GET_ACL
2899         .get_acl        = ll_get_acl,
2900 #endif
2901 };
2902
2903 /* dynamic ioctl number support routins */
2904 static struct llioc_ctl_data {
2905         struct rw_semaphore     ioc_sem;
2906         cfs_list_t              ioc_head;
2907 } llioc = {
2908         __RWSEM_INITIALIZER(llioc.ioc_sem),
2909         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2910 };
2911
2912
2913 struct llioc_data {
2914         cfs_list_t              iocd_list;
2915         unsigned int            iocd_size;
2916         llioc_callback_t        iocd_cb;
2917         unsigned int            iocd_count;
2918         unsigned int            iocd_cmd[0];
2919 };
2920
2921 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2922 {
2923         unsigned int size;
2924         struct llioc_data *in_data = NULL;
2925         ENTRY;
2926
2927         if (cb == NULL || cmd == NULL ||
2928             count > LLIOC_MAX_CMD || count < 0)
2929                 RETURN(NULL);
2930
2931         size = sizeof(*in_data) + count * sizeof(unsigned int);
2932         OBD_ALLOC(in_data, size);
2933         if (in_data == NULL)
2934                 RETURN(NULL);
2935
2936         memset(in_data, 0, sizeof(*in_data));
2937         in_data->iocd_size = size;
2938         in_data->iocd_cb = cb;
2939         in_data->iocd_count = count;
2940         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2941
2942         down_write(&llioc.ioc_sem);
2943         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2944         up_write(&llioc.ioc_sem);
2945
2946         RETURN(in_data);
2947 }
2948
2949 void ll_iocontrol_unregister(void *magic)
2950 {
2951         struct llioc_data *tmp;
2952
2953         if (magic == NULL)
2954                 return;
2955
2956         down_write(&llioc.ioc_sem);
2957         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2958                 if (tmp == magic) {
2959                         unsigned int size = tmp->iocd_size;
2960
2961                         cfs_list_del(&tmp->iocd_list);
2962                         up_write(&llioc.ioc_sem);
2963
2964                         OBD_FREE(tmp, size);
2965                         return;
2966                 }
2967         }
2968         up_write(&llioc.ioc_sem);
2969
2970         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2971 }
2972
2973 EXPORT_SYMBOL(ll_iocontrol_register);
2974 EXPORT_SYMBOL(ll_iocontrol_unregister);
2975
2976 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2977                         unsigned int cmd, unsigned long arg, int *rcp)
2978 {
2979         enum llioc_iter ret = LLIOC_CONT;
2980         struct llioc_data *data;
2981         int rc = -EINVAL, i;
2982
2983         down_read(&llioc.ioc_sem);
2984         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2985                 for (i = 0; i < data->iocd_count; i++) {
2986                         if (cmd != data->iocd_cmd[i])
2987                                 continue;
2988
2989                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2990                         break;
2991                 }
2992
2993                 if (ret == LLIOC_STOP)
2994                         break;
2995         }
2996         up_read(&llioc.ioc_sem);
2997
2998         if (rcp)
2999                 *rcp = rc;
3000         return ret;
3001 }
3002
3003 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
3004 {
3005         struct ll_inode_info *lli = ll_i2info(inode);
3006         struct cl_env_nest nest;
3007         struct lu_env *env;
3008         int result;
3009         ENTRY;
3010
3011         if (lli->lli_clob == NULL)
3012                 RETURN(0);
3013
3014         env = cl_env_nested_get(&nest);
3015         if (IS_ERR(env))
3016                 RETURN(PTR_ERR(env));
3017
3018         result = cl_conf_set(env, lli->lli_clob, conf);
3019         cl_env_nested_put(&nest, env);
3020
3021         if (conf->coc_opc == OBJECT_CONF_SET) {
3022                 struct ldlm_lock *lock = conf->coc_lock;
3023
3024                 LASSERT(lock != NULL);
3025                 LASSERT(ldlm_has_layout(lock));
3026                 if (result == 0) {
3027                         /* it can only be allowed to match after layout is
3028                          * applied to inode otherwise false layout would be
3029                          * seen. Applying layout shoud happen before dropping
3030                          * the intent lock. */
3031                         ldlm_lock_allow_match(lock);
3032                 }
3033         }
3034         RETURN(result);
3035 }
3036
3037 /**
3038  * Apply the layout to the inode. Layout lock is held and will be released
3039  * in this function.
3040  */
3041 static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
3042                                 struct inode *inode, __u32 *gen, bool reconf)
3043 {
3044         struct ll_inode_info *lli = ll_i2info(inode);
3045         struct ll_sb_info    *sbi = ll_i2sbi(inode);
3046         struct ldlm_lock *lock;
3047         struct lustre_md md = { NULL };
3048         struct cl_object_conf conf;
3049         int rc = 0;
3050         bool lvb_ready;
3051         ENTRY;
3052
3053         LASSERT(lustre_handle_is_used(lockh));
3054
3055         lock = ldlm_handle2lock(lockh);
3056         LASSERT(lock != NULL);
3057         LASSERT(ldlm_has_layout(lock));
3058
3059         LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
3060                 inode, PFID(&lli->lli_fid), reconf);
3061
3062         lock_res_and_lock(lock);
3063         lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
3064         unlock_res_and_lock(lock);
3065         /* checking lvb_ready is racy but this is okay. The worst case is
3066          * that multi processes may configure the file on the same time. */
3067         if (lvb_ready || !reconf) {
3068                 LDLM_LOCK_PUT(lock);
3069
3070                 rc = -ENODATA;
3071                 if (lvb_ready) {
3072                         /* layout_gen must be valid if layout lock is not
3073                          * cancelled and stripe has already set */
3074                         *gen = lli->lli_layout_gen;
3075                         rc = 0;
3076                 }
3077                 ldlm_lock_decref(lockh, mode);
3078                 RETURN(rc);
3079         }
3080
3081         /* for layout lock, lmm is returned in lock's lvb.
3082          * lvb_data is immutable if the lock is held so it's safe to access it
3083          * without res lock. See the description in ldlm_lock_decref_internal()
3084          * for the condition to free lvb_data of layout lock */
3085         if (lock->l_lvb_data != NULL) {
3086                 rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3087                                   lock->l_lvb_data, lock->l_lvb_len);
3088                 if (rc >= 0) {
3089                         if (md.lsm != NULL)
3090                                 *gen = md.lsm->lsm_layout_gen;
3091                         rc = 0;
3092                 } else {
3093                         CERROR("%s: file "DFID" unpackmd error: %d\n",
3094                                 ll_get_fsname(inode->i_sb, NULL, 0),
3095                                 PFID(&lli->lli_fid), rc);
3096                 }
3097         }
3098         if (rc < 0) {
3099                 LDLM_LOCK_PUT(lock);
3100                 ldlm_lock_decref(lockh, mode);
3101                 RETURN(rc);
3102         }
3103
3104         /* set layout to file. Unlikely this will fail as old layout was
3105          * surely eliminated */
3106         memset(&conf, 0, sizeof conf);
3107         conf.coc_opc = OBJECT_CONF_SET;
3108         conf.coc_inode = inode;
3109         conf.coc_lock = lock;
3110         conf.u.coc_md = &md;
3111         rc = ll_layout_conf(inode, &conf);
3112         LDLM_LOCK_PUT(lock);
3113
3114         ldlm_lock_decref(lockh, mode);
3115
3116         if (md.lsm != NULL)
3117                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3118
3119         /* wait for IO to complete if it's still being used. */
3120         if (rc == -EBUSY) {
3121                 CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3122                         ll_get_fsname(inode->i_sb, NULL, 0),
3123                         inode, PFID(&lli->lli_fid));
3124
3125                 memset(&conf, 0, sizeof conf);
3126                 conf.coc_opc = OBJECT_CONF_WAIT;
3127                 conf.coc_inode = inode;
3128                 rc = ll_layout_conf(inode, &conf);
3129                 if (rc == 0)
3130                         rc = -EAGAIN;
3131
3132                 CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3133                         PFID(&lli->lli_fid), rc);
3134         }
3135
3136         RETURN(rc);
3137 }
3138
3139 /**
3140  * This function checks if there exists a LAYOUT lock on the client side,
3141  * or enqueues it if it doesn't have one in cache.
3142  *
3143  * This function will not hold layout lock so it may be revoked any time after
3144  * this function returns. Any operations depend on layout should be redone
3145  * in that case.
3146  *
3147  * This function should be called before lov_io_init() to get an uptodate
3148  * layout version, the caller should save the version number and after IO
3149  * is finished, this function should be called again to verify that layout
3150  * is not changed during IO time.
3151  */
3152 int ll_layout_refresh(struct inode *inode, __u32 *gen)
3153 {
3154         struct ll_inode_info  *lli = ll_i2info(inode);
3155         struct ll_sb_info     *sbi = ll_i2sbi(inode);
3156         struct md_op_data     *op_data;
3157         struct lookup_intent   it;
3158         struct lustre_handle   lockh;
3159         ldlm_mode_t            mode;
3160         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
3161                                            .ei_mode = LCK_CR,
3162                                            .ei_cb_bl = ll_md_blocking_ast,
3163                                            .ei_cb_cp = ldlm_completion_ast,
3164                                            .ei_cbdata = inode };
3165         int rc;
3166         ENTRY;
3167
3168         *gen = LL_LAYOUT_GEN_ZERO;
3169         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
3170                 RETURN(0);
3171
3172         /* sanity checks */
3173         LASSERT(fid_is_sane(ll_inode2fid(inode)));
3174         LASSERT(S_ISREG(inode->i_mode));
3175
3176         /* mostly layout lock is caching on the local side, so try to match
3177          * it before grabbing layout lock mutex. */
3178         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3179         if (mode != 0) { /* hit cached lock */
3180                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
3181                 if (rc == 0)
3182                         RETURN(0);
3183
3184                 /* better hold lli_layout_mutex to try again otherwise
3185                  * it will have starvation problem. */
3186         }
3187
3188         /* take layout lock mutex to enqueue layout lock exclusively. */
3189         mutex_lock(&lli->lli_layout_mutex);
3190
3191 again:
3192         /* try again. Maybe somebody else has done this. */
3193         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3194         if (mode != 0) { /* hit cached lock */
3195                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3196                 if (rc == -EAGAIN)
3197                         goto again;
3198
3199                 mutex_unlock(&lli->lli_layout_mutex);
3200                 RETURN(rc);
3201         }
3202
3203         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3204                         0, 0, LUSTRE_OPC_ANY, NULL);
3205         if (IS_ERR(op_data)) {
3206                 mutex_unlock(&lli->lli_layout_mutex);
3207                 RETURN(PTR_ERR(op_data));
3208         }
3209
3210         /* have to enqueue one */
3211         memset(&it, 0, sizeof(it));
3212         it.it_op = IT_LAYOUT;
3213         lockh.cookie = 0ULL;
3214
3215         LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3216                         ll_get_fsname(inode->i_sb, NULL, 0), inode,
3217                         PFID(&lli->lli_fid));
3218
3219         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3220                         NULL, 0, NULL, 0);
3221         if (it.d.lustre.it_data != NULL)
3222                 ptlrpc_req_finished(it.d.lustre.it_data);
3223         it.d.lustre.it_data = NULL;
3224
3225         ll_finish_md_op_data(op_data);
3226
3227         mode = it.d.lustre.it_lock_mode;
3228         it.d.lustre.it_lock_mode = 0;
3229         ll_intent_drop_lock(&it);
3230
3231         if (rc == 0) {
3232                 /* set lock data in case this is a new lock */
3233                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3234                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3235                 if (rc == -EAGAIN)
3236                         goto again;
3237         }
3238         mutex_unlock(&lli->lli_layout_mutex);
3239
3240         RETURN(rc);
3241 }