Whamcloud - gitweb
LU-1431 ptlrpc: PTLRPC_BRW_MAX_SIZE usage cleanup.
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84
85         if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
86                 op_data->op_bias |= MDS_DATA_MODIFIED;
87 }
88
89 /**
90  * Closes the IO epoch and packs all the attributes into @op_data for
91  * the CLOSE rpc.
92  */
93 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
94                              struct obd_client_handle *och)
95 {
96         ENTRY;
97
98         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
99                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
100
101         if (!(och->och_flags & FMODE_WRITE))
102                 goto out;
103
104         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
105                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
106         else
107                 ll_ioepoch_close(inode, op_data, &och, 0);
108
109 out:
110         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
111         ll_prep_md_op_data(op_data, inode, NULL, NULL,
112                            0, 0, LUSTRE_OPC_ANY, NULL);
113         EXIT;
114 }
115
116 static int ll_close_inode_openhandle(struct obd_export *md_exp,
117                                      struct inode *inode,
118                                      struct obd_client_handle *och)
119 {
120         struct obd_export *exp = ll_i2mdexp(inode);
121         struct md_op_data *op_data;
122         struct ptlrpc_request *req = NULL;
123         struct obd_device *obd = class_exp2obd(exp);
124         int epoch_close = 1;
125         int rc;
126         ENTRY;
127
128         if (obd == NULL) {
129                 /*
130                  * XXX: in case of LMV, is this correct to access
131                  * ->exp_handle?
132                  */
133                 CERROR("Invalid MDC connection handle "LPX64"\n",
134                        ll_i2mdexp(inode)->exp_handle.h_cookie);
135                 GOTO(out, rc = 0);
136         }
137
138         OBD_ALLOC_PTR(op_data);
139         if (op_data == NULL)
140                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
141
142         ll_prepare_close(inode, op_data, och);
143         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
144         rc = md_close(md_exp, op_data, och->och_mod, &req);
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(epoch_close);
148                 /* MDS has instructed us to obtain Size-on-MDS attribute from
149                  * OSTs and send setattr to back to MDS. */
150                 rc = ll_som_update(inode, op_data);
151                 if (rc) {
152                         CERROR("inode %lu mdc Size-on-MDS update failed: "
153                                "rc = %d\n", inode->i_ino, rc);
154                         rc = 0;
155                 }
156         } else if (rc) {
157                 CERROR("inode %lu mdc close failed: rc = %d\n",
158                        inode->i_ino, rc);
159         }
160
161         /* DATA_MODIFIED flag was successfully sent on close, cancel data
162          * modification flag. */
163         if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
164                 struct ll_inode_info *lli = ll_i2info(inode);
165
166                 spin_lock(&lli->lli_lock);
167                 lli->lli_flags &= ~LLIF_DATA_MODIFIED;
168                 spin_unlock(&lli->lli_lock);
169         }
170
171         ll_finish_md_op_data(op_data);
172
173         if (rc == 0) {
174                 rc = ll_objects_destroy(req, inode);
175                 if (rc)
176                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
177                                inode->i_ino, rc);
178         }
179
180         EXIT;
181 out:
182
183         if (exp_connect_som(exp) && !epoch_close &&
184             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
185                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
186         } else {
187                 md_clear_open_replay_data(md_exp, och);
188                 /* Free @och if it is not waiting for DONE_WRITING. */
189                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
190                 OBD_FREE_PTR(och);
191         }
192         if (req) /* This is close request */
193                 ptlrpc_req_finished(req);
194         return rc;
195 }
196
197 int ll_md_real_close(struct inode *inode, int flags)
198 {
199         struct ll_inode_info *lli = ll_i2info(inode);
200         struct obd_client_handle **och_p;
201         struct obd_client_handle *och;
202         __u64 *och_usecount;
203         int rc = 0;
204         ENTRY;
205
206         if (flags & FMODE_WRITE) {
207                 och_p = &lli->lli_mds_write_och;
208                 och_usecount = &lli->lli_open_fd_write_count;
209         } else if (flags & FMODE_EXEC) {
210                 och_p = &lli->lli_mds_exec_och;
211                 och_usecount = &lli->lli_open_fd_exec_count;
212         } else {
213                 LASSERT(flags & FMODE_READ);
214                 och_p = &lli->lli_mds_read_och;
215                 och_usecount = &lli->lli_open_fd_read_count;
216         }
217
218         mutex_lock(&lli->lli_och_mutex);
219         if (*och_usecount) { /* There are still users of this handle, so
220                                 skip freeing it. */
221                 mutex_unlock(&lli->lli_och_mutex);
222                 RETURN(0);
223         }
224         och=*och_p;
225         *och_p = NULL;
226         mutex_unlock(&lli->lli_och_mutex);
227
228         if (och) { /* There might be a race and somebody have freed this och
229                       already */
230                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
231                                                inode, och);
232         }
233
234         RETURN(rc);
235 }
236
237 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
238                 struct file *file)
239 {
240         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
241         struct ll_inode_info *lli = ll_i2info(inode);
242         int rc = 0;
243         ENTRY;
244
245         /* clear group lock, if present */
246         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
247                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
248
249         /* Let's see if we have good enough OPEN lock on the file and if
250            we can skip talking to MDS */
251         if (file->f_dentry->d_inode) { /* Can this ever be false? */
252                 int lockmode;
253                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
254                 struct lustre_handle lockh;
255                 struct inode *inode = file->f_dentry->d_inode;
256                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
257
258                 mutex_lock(&lli->lli_och_mutex);
259                 if (fd->fd_omode & FMODE_WRITE) {
260                         lockmode = LCK_CW;
261                         LASSERT(lli->lli_open_fd_write_count);
262                         lli->lli_open_fd_write_count--;
263                 } else if (fd->fd_omode & FMODE_EXEC) {
264                         lockmode = LCK_PR;
265                         LASSERT(lli->lli_open_fd_exec_count);
266                         lli->lli_open_fd_exec_count--;
267                 } else {
268                         lockmode = LCK_CR;
269                         LASSERT(lli->lli_open_fd_read_count);
270                         lli->lli_open_fd_read_count--;
271                 }
272                 mutex_unlock(&lli->lli_och_mutex);
273
274                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
275                                    LDLM_IBITS, &policy, lockmode,
276                                    &lockh)) {
277                         rc = ll_md_real_close(file->f_dentry->d_inode,
278                                               fd->fd_omode);
279                 }
280         } else {
281                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
282                        file, file->f_dentry, file->f_dentry->d_name.name);
283         }
284
285         LUSTRE_FPRIVATE(file) = NULL;
286         ll_file_data_put(fd);
287         ll_capa_close(inode);
288
289         RETURN(rc);
290 }
291
292 /* While this returns an error code, fput() the caller does not, so we need
293  * to make every effort to clean up all of our state here.  Also, applications
294  * rarely check close errors and even if an error is returned they will not
295  * re-try the close call.
296  */
297 int ll_file_release(struct inode *inode, struct file *file)
298 {
299         struct ll_file_data *fd;
300         struct ll_sb_info *sbi = ll_i2sbi(inode);
301         struct ll_inode_info *lli = ll_i2info(inode);
302         int rc;
303         ENTRY;
304
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
327         /* The last ref on @file, maybe not the the owner pid of statahead.
328          * Different processes can open the same dir, "ll_opendir_key" means:
329          * it is me that should stop the statahead thread. */
330         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
331             lli->lli_opendir_pid != 0)
332                 ll_stop_statahead(inode, lli->lli_opendir_key);
333
334         if (inode->i_sb->s_root == file->f_dentry) {
335                 LUSTRE_FPRIVATE(file) = NULL;
336                 ll_file_data_put(fd);
337                 RETURN(0);
338         }
339
340         if (!S_ISDIR(inode->i_mode)) {
341                 lov_read_and_clear_async_rc(lli->lli_clob);
342                 lli->lli_async_rc = 0;
343         }
344
345         rc = ll_md_close(sbi->ll_md_exp, inode, file);
346
347         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
348                 libcfs_debug_dumplog();
349
350         RETURN(rc);
351 }
352
353 static int ll_intent_file_open(struct file *file, void *lmm,
354                                int lmmsize, struct lookup_intent *itp)
355 {
356         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
357         struct dentry *parent = file->f_dentry->d_parent;
358         const char *name = file->f_dentry->d_name.name;
359         const int len = file->f_dentry->d_name.len;
360         struct md_op_data *op_data;
361         struct ptlrpc_request *req;
362         __u32 opc = LUSTRE_OPC_ANY;
363         int rc;
364         ENTRY;
365
366         if (!parent)
367                 RETURN(-ENOENT);
368
369         /* Usually we come here only for NFSD, and we want open lock.
370            But we can also get here with pre 2.6.15 patchless kernels, and in
371            that case that lock is also ok */
372         /* We can also get here if there was cached open handle in revalidate_it
373          * but it disappeared while we were getting from there to ll_file_open.
374          * But this means this file was closed and immediatelly opened which
375          * makes a good candidate for using OPEN lock */
376         /* If lmmsize & lmm are not 0, we are just setting stripe info
377          * parameters. No need for the open lock */
378         if (lmm == NULL && lmmsize == 0) {
379                 itp->it_flags |= MDS_OPEN_LOCK;
380                 if (itp->it_flags & FMODE_WRITE)
381                         opc = LUSTRE_OPC_CREATE;
382         }
383
384         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
385                                       file->f_dentry->d_inode, name, len,
386                                       O_RDWR, opc, NULL);
387         if (IS_ERR(op_data))
388                 RETURN(PTR_ERR(op_data));
389
390         itp->it_flags |= MDS_OPEN_BY_FID;
391         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
392                             0 /*unused */, &req, ll_md_blocking_ast, 0);
393         ll_finish_md_op_data(op_data);
394         if (rc == -ESTALE) {
395                 /* reason for keep own exit path - don`t flood log
396                 * with messages with -ESTALE errors.
397                 */
398                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
399                      it_open_error(DISP_OPEN_OPEN, itp))
400                         GOTO(out, rc);
401                 ll_release_openhandle(file->f_dentry, itp);
402                 GOTO(out, rc);
403         }
404
405         if (it_disposition(itp, DISP_LOOKUP_NEG))
406                 GOTO(out, rc = -ENOENT);
407
408         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
409                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
410                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
411                 GOTO(out, rc);
412         }
413
414         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
415         if (!rc && itp->d.lustre.it_lock_mode)
416                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
417                                  itp, NULL);
418
419 out:
420         ptlrpc_req_finished(itp->d.lustre.it_data);
421         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
422         ll_intent_drop_lock(itp);
423
424         RETURN(rc);
425 }
426
427 /**
428  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
429  * not believe attributes if a few ioepoch holders exist. Attributes for
430  * previous ioepoch if new one is opened are also skipped by MDS.
431  */
432 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
433 {
434         if (ioepoch && lli->lli_ioepoch != ioepoch) {
435                 lli->lli_ioepoch = ioepoch;
436                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
437                        ioepoch, PFID(&lli->lli_fid));
438         }
439 }
440
441 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
442                        struct lookup_intent *it, struct obd_client_handle *och)
443 {
444         struct ptlrpc_request *req = it->d.lustre.it_data;
445         struct mdt_body *body;
446
447         LASSERT(och);
448
449         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
450         LASSERT(body != NULL);                      /* reply already checked out */
451
452         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
453         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
454         och->och_fid = lli->lli_fid;
455         och->och_flags = it->it_flags;
456         ll_ioepoch_open(lli, body->ioepoch);
457
458         return md_set_open_replay_data(md_exp, och, req);
459 }
460
461 int ll_local_open(struct file *file, struct lookup_intent *it,
462                   struct ll_file_data *fd, struct obd_client_handle *och)
463 {
464         struct inode *inode = file->f_dentry->d_inode;
465         struct ll_inode_info *lli = ll_i2info(inode);
466         ENTRY;
467
468         LASSERT(!LUSTRE_FPRIVATE(file));
469
470         LASSERT(fd != NULL);
471
472         if (och) {
473                 struct ptlrpc_request *req = it->d.lustre.it_data;
474                 struct mdt_body *body;
475                 int rc;
476
477                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
478                 if (rc)
479                         RETURN(rc);
480
481                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
482                 if ((it->it_flags & FMODE_WRITE) &&
483                     (body->valid & OBD_MD_FLSIZE))
484                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
485                                lli->lli_ioepoch, PFID(&lli->lli_fid));
486         }
487
488         LUSTRE_FPRIVATE(file) = fd;
489         ll_readahead_init(inode, &fd->fd_ras);
490         fd->fd_omode = it->it_flags;
491         RETURN(0);
492 }
493
494 /* Open a file, and (for the very first open) create objects on the OSTs at
495  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
496  * creation or open until ll_lov_setstripe() ioctl is called.
497  *
498  * If we already have the stripe MD locally then we don't request it in
499  * md_open(), by passing a lmm_size = 0.
500  *
501  * It is up to the application to ensure no other processes open this file
502  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
503  * used.  We might be able to avoid races of that sort by getting lli_open_sem
504  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
505  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
506  */
507 int ll_file_open(struct inode *inode, struct file *file)
508 {
509         struct ll_inode_info *lli = ll_i2info(inode);
510         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
511                                           .it_flags = file->f_flags };
512         struct obd_client_handle **och_p = NULL;
513         __u64 *och_usecount = NULL;
514         struct ll_file_data *fd;
515         int rc = 0, opendir_set = 0;
516         ENTRY;
517
518         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
519                inode->i_generation, inode, file->f_flags);
520
521         it = file->private_data; /* XXX: compat macro */
522         file->private_data = NULL; /* prevent ll_local_open assertion */
523
524         fd = ll_file_data_get();
525         if (fd == NULL)
526                 GOTO(out_och_free, rc = -ENOMEM);
527
528         fd->fd_file = file;
529         if (S_ISDIR(inode->i_mode)) {
530                 spin_lock(&lli->lli_sa_lock);
531                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
532                     lli->lli_opendir_pid == 0) {
533                         lli->lli_opendir_key = fd;
534                         lli->lli_opendir_pid = cfs_curproc_pid();
535                         opendir_set = 1;
536                 }
537                 spin_unlock(&lli->lli_sa_lock);
538         }
539
540         if (inode->i_sb->s_root == file->f_dentry) {
541                 LUSTRE_FPRIVATE(file) = fd;
542                 RETURN(0);
543         }
544
545         if (!it || !it->d.lustre.it_disposition) {
546                 /* Convert f_flags into access mode. We cannot use file->f_mode,
547                  * because everything but O_ACCMODE mask was stripped from
548                  * there */
549                 if ((oit.it_flags + 1) & O_ACCMODE)
550                         oit.it_flags++;
551                 if (file->f_flags & O_TRUNC)
552                         oit.it_flags |= FMODE_WRITE;
553
554                 /* kernel only call f_op->open in dentry_open.  filp_open calls
555                  * dentry_open after call to open_namei that checks permissions.
556                  * Only nfsd_open call dentry_open directly without checking
557                  * permissions and because of that this code below is safe. */
558                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
559                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
560
561                 /* We do not want O_EXCL here, presumably we opened the file
562                  * already? XXX - NFS implications? */
563                 oit.it_flags &= ~O_EXCL;
564
565                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
566                  * created if necessary, then "IT_CREAT" should be set to keep
567                  * consistent with it */
568                 if (oit.it_flags & O_CREAT)
569                         oit.it_op |= IT_CREAT;
570
571                 it = &oit;
572         }
573
574 restart:
575         /* Let's see if we have file open on MDS already. */
576         if (it->it_flags & FMODE_WRITE) {
577                 och_p = &lli->lli_mds_write_och;
578                 och_usecount = &lli->lli_open_fd_write_count;
579         } else if (it->it_flags & FMODE_EXEC) {
580                 och_p = &lli->lli_mds_exec_och;
581                 och_usecount = &lli->lli_open_fd_exec_count;
582          } else {
583                 och_p = &lli->lli_mds_read_och;
584                 och_usecount = &lli->lli_open_fd_read_count;
585         }
586
587         mutex_lock(&lli->lli_och_mutex);
588         if (*och_p) { /* Open handle is present */
589                 if (it_disposition(it, DISP_OPEN_OPEN)) {
590                         /* Well, there's extra open request that we do not need,
591                            let's close it somehow. This will decref request. */
592                         rc = it_open_error(DISP_OPEN_OPEN, it);
593                         if (rc) {
594                                 mutex_unlock(&lli->lli_och_mutex);
595                                 GOTO(out_openerr, rc);
596                         }
597
598                         ll_release_openhandle(file->f_dentry, it);
599                 }
600                 (*och_usecount)++;
601
602                 rc = ll_local_open(file, it, fd, NULL);
603                 if (rc) {
604                         (*och_usecount)--;
605                         mutex_unlock(&lli->lli_och_mutex);
606                         GOTO(out_openerr, rc);
607                 }
608         } else {
609                 LASSERT(*och_usecount == 0);
610                 if (!it->d.lustre.it_disposition) {
611                         /* We cannot just request lock handle now, new ELC code
612                            means that one of other OPEN locks for this file
613                            could be cancelled, and since blocking ast handler
614                            would attempt to grab och_mutex as well, that would
615                            result in a deadlock */
616                         mutex_unlock(&lli->lli_och_mutex);
617                         it->it_create_mode |= M_CHECK_STALE;
618                         rc = ll_intent_file_open(file, NULL, 0, it);
619                         it->it_create_mode &= ~M_CHECK_STALE;
620                         if (rc)
621                                 GOTO(out_openerr, rc);
622
623                         goto restart;
624                 }
625                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
626                 if (!*och_p)
627                         GOTO(out_och_free, rc = -ENOMEM);
628
629                 (*och_usecount)++;
630
631                 /* md_intent_lock() didn't get a request ref if there was an
632                  * open error, so don't do cleanup on the request here
633                  * (bug 3430) */
634                 /* XXX (green): Should not we bail out on any error here, not
635                  * just open error? */
636                 rc = it_open_error(DISP_OPEN_OPEN, it);
637                 if (rc)
638                         GOTO(out_och_free, rc);
639
640                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
641
642                 rc = ll_local_open(file, it, fd, *och_p);
643                 if (rc)
644                         GOTO(out_och_free, rc);
645         }
646         mutex_unlock(&lli->lli_och_mutex);
647         fd = NULL;
648
649         /* Must do this outside lli_och_mutex lock to prevent deadlock where
650            different kind of OPEN lock for this same inode gets cancelled
651            by ldlm_cancel_lru */
652         if (!S_ISREG(inode->i_mode))
653                 GOTO(out_och_free, rc);
654
655         ll_capa_open(inode);
656
657         if (!lli->lli_has_smd) {
658                 if (file->f_flags & O_LOV_DELAY_CREATE ||
659                     !(file->f_mode & FMODE_WRITE)) {
660                         CDEBUG(D_INODE, "object creation was delayed\n");
661                         GOTO(out_och_free, rc);
662                 }
663         }
664         file->f_flags &= ~O_LOV_DELAY_CREATE;
665         GOTO(out_och_free, rc);
666
667 out_och_free:
668         if (rc) {
669                 if (och_p && *och_p) {
670                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
671                         *och_p = NULL; /* OBD_FREE writes some magic there */
672                         (*och_usecount)--;
673                 }
674                 mutex_unlock(&lli->lli_och_mutex);
675
676 out_openerr:
677                 if (opendir_set != 0)
678                         ll_stop_statahead(inode, lli->lli_opendir_key);
679                 if (fd != NULL)
680                         ll_file_data_put(fd);
681         } else {
682                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
683         }
684
685         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
686                 ptlrpc_req_finished(it->d.lustre.it_data);
687                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
688         }
689
690         return rc;
691 }
692
693 /* Fills the obdo with the attributes for the lsm */
694 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
695                           struct obd_capa *capa, struct obdo *obdo,
696                           __u64 ioepoch, int sync)
697 {
698         struct ptlrpc_request_set *set;
699         struct obd_info            oinfo = { { { 0 } } };
700         int                        rc;
701
702         ENTRY;
703
704         LASSERT(lsm != NULL);
705
706         oinfo.oi_md = lsm;
707         oinfo.oi_oa = obdo;
708         oinfo.oi_oa->o_id = lsm->lsm_object_id;
709         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
710         oinfo.oi_oa->o_mode = S_IFREG;
711         oinfo.oi_oa->o_ioepoch = ioepoch;
712         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
713                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
714                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
715                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
716                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
717                                OBD_MD_FLDATAVERSION;
718         oinfo.oi_capa = capa;
719         if (sync) {
720                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
721                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
722         }
723
724         set = ptlrpc_prep_set();
725         if (set == NULL) {
726                 CERROR("can't allocate ptlrpc set\n");
727                 rc = -ENOMEM;
728         } else {
729                 rc = obd_getattr_async(exp, &oinfo, set);
730                 if (rc == 0)
731                         rc = ptlrpc_set_wait(set);
732                 ptlrpc_set_destroy(set);
733         }
734         if (rc == 0)
735                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
736                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
737                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
738                                          OBD_MD_FLDATAVERSION);
739         RETURN(rc);
740 }
741
742 /**
743   * Performs the getattr on the inode and updates its fields.
744   * If @sync != 0, perform the getattr under the server-side lock.
745   */
746 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
747                      __u64 ioepoch, int sync)
748 {
749         struct obd_capa      *capa = ll_mdscapa_get(inode);
750         struct lov_stripe_md *lsm;
751         int rc;
752         ENTRY;
753
754         lsm = ccc_inode_lsm_get(inode);
755         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
756                             capa, obdo, ioepoch, sync);
757         capa_put(capa);
758         if (rc == 0) {
759                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
760                 CDEBUG(D_INODE,
761                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
762                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
763                        (unsigned long long)inode->i_blocks,
764                        (unsigned long)ll_inode_blksize(inode));
765         }
766         ccc_inode_lsm_put(inode, lsm);
767         RETURN(rc);
768 }
769
770 int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
771 {
772         struct ll_inode_info *lli = ll_i2info(inode);
773         struct cl_object *obj = lli->lli_clob;
774         struct cl_attr *attr = ccc_env_thread_attr(env);
775         struct ost_lvb lvb;
776         int rc = 0;
777
778         ENTRY;
779
780         ll_inode_size_lock(inode);
781         /* merge timestamps the most recently obtained from mds with
782            timestamps obtained from osts */
783         LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
784         LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
785         LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
786         inode_init_lvb(inode, &lvb);
787
788         cl_object_attr_lock(obj);
789         rc = cl_object_attr_get(env, obj, attr);
790         cl_object_attr_unlock(obj);
791
792         if (rc == 0) {
793                 if (lvb.lvb_atime < attr->cat_atime)
794                         lvb.lvb_atime = attr->cat_atime;
795                 if (lvb.lvb_ctime < attr->cat_ctime)
796                         lvb.lvb_ctime = attr->cat_ctime;
797                 if (lvb.lvb_mtime < attr->cat_mtime)
798                         lvb.lvb_mtime = attr->cat_mtime;
799
800                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
801                                 PFID(&lli->lli_fid), attr->cat_size);
802                 cl_isize_write_nolock(inode, attr->cat_size);
803
804                 inode->i_blocks = attr->cat_blocks;
805
806                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
807                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
808                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
809         }
810         ll_inode_size_unlock(inode);
811
812         RETURN(rc);
813 }
814
815 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
816                      lstat_t *st)
817 {
818         struct obdo obdo = { 0 };
819         int rc;
820
821         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
822         if (rc == 0) {
823                 st->st_size   = obdo.o_size;
824                 st->st_blocks = obdo.o_blocks;
825                 st->st_mtime  = obdo.o_mtime;
826                 st->st_atime  = obdo.o_atime;
827                 st->st_ctime  = obdo.o_ctime;
828         }
829         return rc;
830 }
831
832 void ll_io_init(struct cl_io *io, const struct file *file, int write)
833 {
834         struct inode *inode = file->f_dentry->d_inode;
835
836         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
837         if (write) {
838                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
839                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
840         }
841         io->ci_obj     = ll_i2info(inode)->lli_clob;
842         io->ci_lockreq = CILR_MAYBE;
843         if (ll_file_nolock(file)) {
844                 io->ci_lockreq = CILR_NEVER;
845                 io->ci_no_srvlock = 1;
846         } else if (file->f_flags & O_APPEND) {
847                 io->ci_lockreq = CILR_MANDATORY;
848         }
849 }
850
851 static ssize_t
852 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
853                    struct file *file, enum cl_io_type iot,
854                    loff_t *ppos, size_t count)
855 {
856         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
857         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
858         struct cl_io         *io;
859         ssize_t               result;
860         ENTRY;
861
862         io = ccc_env_thread_io(env);
863         ll_io_init(io, file, iot == CIT_WRITE);
864
865         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
866                 struct vvp_io *vio = vvp_env_io(env);
867                 struct ccc_io *cio = ccc_env_io(env);
868                 int write_mutex_locked = 0;
869
870                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
871                 vio->cui_io_subtype = args->via_io_subtype;
872
873                 switch (vio->cui_io_subtype) {
874                 case IO_NORMAL:
875                         cio->cui_iov = args->u.normal.via_iov;
876                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
877                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
878 #ifndef HAVE_FILE_WRITEV
879                         cio->cui_iocb = args->u.normal.via_iocb;
880 #endif
881                         if ((iot == CIT_WRITE) &&
882                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
883                                 if (mutex_lock_interruptible(&lli->
884                                                                lli_write_mutex))
885                                         GOTO(out, result = -ERESTARTSYS);
886                                 write_mutex_locked = 1;
887                         } else if (iot == CIT_READ) {
888                                 down_read(&lli->lli_trunc_sem);
889                         }
890                         break;
891                 case IO_SENDFILE:
892                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
893                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
894                         break;
895                 case IO_SPLICE:
896                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
897                         vio->u.splice.cui_flags = args->u.splice.via_flags;
898                         break;
899                 default:
900                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
901                         LBUG();
902                 }
903                 result = cl_io_loop(env, io);
904                 if (write_mutex_locked)
905                         mutex_unlock(&lli->lli_write_mutex);
906                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
907                         up_read(&lli->lli_trunc_sem);
908         } else {
909                 /* cl_io_rw_init() handled IO */
910                 result = io->ci_result;
911         }
912
913         if (io->ci_nob > 0) {
914                 result = io->ci_nob;
915                 *ppos = io->u.ci_wr.wr.crw_pos;
916         }
917         GOTO(out, result);
918 out:
919         cl_io_fini(env, io);
920
921         if (iot == CIT_READ) {
922                 if (result >= 0)
923                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
924                                            LPROC_LL_READ_BYTES, result);
925         } else if (iot == CIT_WRITE) {
926                 if (result >= 0) {
927                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
928                                            LPROC_LL_WRITE_BYTES, result);
929                         fd->fd_write_failed = false;
930                 } else {
931                         fd->fd_write_failed = true;
932                 }
933         }
934
935         return result;
936 }
937
938
939 /*
940  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
941  */
942 static int ll_file_get_iov_count(const struct iovec *iov,
943                                  unsigned long *nr_segs, size_t *count)
944 {
945         size_t cnt = 0;
946         unsigned long seg;
947
948         for (seg = 0; seg < *nr_segs; seg++) {
949                 const struct iovec *iv = &iov[seg];
950
951                 /*
952                  * If any segment has a negative length, or the cumulative
953                  * length ever wraps negative then return -EINVAL.
954                  */
955                 cnt += iv->iov_len;
956                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
957                         return -EINVAL;
958                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
959                         continue;
960                 if (seg == 0)
961                         return -EFAULT;
962                 *nr_segs = seg;
963                 cnt -= iv->iov_len;   /* This segment is no good */
964                 break;
965         }
966         *count = cnt;
967         return 0;
968 }
969
970 #ifdef HAVE_FILE_READV
971 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
972                               unsigned long nr_segs, loff_t *ppos)
973 {
974         struct lu_env      *env;
975         struct vvp_io_args *args;
976         size_t              count;
977         ssize_t             result;
978         int                 refcheck;
979         ENTRY;
980
981         result = ll_file_get_iov_count(iov, &nr_segs, &count);
982         if (result)
983                 RETURN(result);
984
985         env = cl_env_get(&refcheck);
986         if (IS_ERR(env))
987                 RETURN(PTR_ERR(env));
988
989         args = vvp_env_args(env, IO_NORMAL);
990         args->u.normal.via_iov = (struct iovec *)iov;
991         args->u.normal.via_nrsegs = nr_segs;
992
993         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
994         cl_env_put(env, &refcheck);
995         RETURN(result);
996 }
997
998 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
999                             loff_t *ppos)
1000 {
1001         struct lu_env *env;
1002         struct iovec  *local_iov;
1003         ssize_t        result;
1004         int            refcheck;
1005         ENTRY;
1006
1007         env = cl_env_get(&refcheck);
1008         if (IS_ERR(env))
1009                 RETURN(PTR_ERR(env));
1010
1011         local_iov = &vvp_env_info(env)->vti_local_iov;
1012         local_iov->iov_base = (void __user *)buf;
1013         local_iov->iov_len = count;
1014         result = ll_file_readv(file, local_iov, 1, ppos);
1015         cl_env_put(env, &refcheck);
1016         RETURN(result);
1017 }
1018
1019 #else
1020 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1021                                 unsigned long nr_segs, loff_t pos)
1022 {
1023         struct lu_env      *env;
1024         struct vvp_io_args *args;
1025         size_t              count;
1026         ssize_t             result;
1027         int                 refcheck;
1028         ENTRY;
1029
1030         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1031         if (result)
1032                 RETURN(result);
1033
1034         env = cl_env_get(&refcheck);
1035         if (IS_ERR(env))
1036                 RETURN(PTR_ERR(env));
1037
1038         args = vvp_env_args(env, IO_NORMAL);
1039         args->u.normal.via_iov = (struct iovec *)iov;
1040         args->u.normal.via_nrsegs = nr_segs;
1041         args->u.normal.via_iocb = iocb;
1042
1043         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1044                                     &iocb->ki_pos, count);
1045         cl_env_put(env, &refcheck);
1046         RETURN(result);
1047 }
1048
1049 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1050                             loff_t *ppos)
1051 {
1052         struct lu_env *env;
1053         struct iovec  *local_iov;
1054         struct kiocb  *kiocb;
1055         ssize_t        result;
1056         int            refcheck;
1057         ENTRY;
1058
1059         env = cl_env_get(&refcheck);
1060         if (IS_ERR(env))
1061                 RETURN(PTR_ERR(env));
1062
1063         local_iov = &vvp_env_info(env)->vti_local_iov;
1064         kiocb = &vvp_env_info(env)->vti_kiocb;
1065         local_iov->iov_base = (void __user *)buf;
1066         local_iov->iov_len = count;
1067         init_sync_kiocb(kiocb, file);
1068         kiocb->ki_pos = *ppos;
1069         kiocb->ki_left = count;
1070
1071         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1072         *ppos = kiocb->ki_pos;
1073
1074         cl_env_put(env, &refcheck);
1075         RETURN(result);
1076 }
1077 #endif
1078
1079 /*
1080  * Write to a file (through the page cache).
1081  */
1082 #ifdef HAVE_FILE_WRITEV
1083 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1084                               unsigned long nr_segs, loff_t *ppos)
1085 {
1086         struct lu_env      *env;
1087         struct vvp_io_args *args;
1088         size_t              count;
1089         ssize_t             result;
1090         int                 refcheck;
1091         ENTRY;
1092
1093         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1094         if (result)
1095                 RETURN(result);
1096
1097         env = cl_env_get(&refcheck);
1098         if (IS_ERR(env))
1099                 RETURN(PTR_ERR(env));
1100
1101         args = vvp_env_args(env, IO_NORMAL);
1102         args->u.normal.via_iov = (struct iovec *)iov;
1103         args->u.normal.via_nrsegs = nr_segs;
1104
1105         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1106         cl_env_put(env, &refcheck);
1107         RETURN(result);
1108 }
1109
1110 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1111                              loff_t *ppos)
1112 {
1113         struct lu_env    *env;
1114         struct iovec     *local_iov;
1115         ssize_t           result;
1116         int               refcheck;
1117         ENTRY;
1118
1119         env = cl_env_get(&refcheck);
1120         if (IS_ERR(env))
1121                 RETURN(PTR_ERR(env));
1122
1123         local_iov = &vvp_env_info(env)->vti_local_iov;
1124         local_iov->iov_base = (void __user *)buf;
1125         local_iov->iov_len = count;
1126
1127         result = ll_file_writev(file, local_iov, 1, ppos);
1128         cl_env_put(env, &refcheck);
1129         RETURN(result);
1130 }
1131
1132 #else /* AIO stuff */
1133 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1134                                  unsigned long nr_segs, loff_t pos)
1135 {
1136         struct lu_env      *env;
1137         struct vvp_io_args *args;
1138         size_t              count;
1139         ssize_t             result;
1140         int                 refcheck;
1141         ENTRY;
1142
1143         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1144         if (result)
1145                 RETURN(result);
1146
1147         env = cl_env_get(&refcheck);
1148         if (IS_ERR(env))
1149                 RETURN(PTR_ERR(env));
1150
1151         args = vvp_env_args(env, IO_NORMAL);
1152         args->u.normal.via_iov = (struct iovec *)iov;
1153         args->u.normal.via_nrsegs = nr_segs;
1154         args->u.normal.via_iocb = iocb;
1155
1156         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1157                                   &iocb->ki_pos, count);
1158         cl_env_put(env, &refcheck);
1159         RETURN(result);
1160 }
1161
1162 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1163                              loff_t *ppos)
1164 {
1165         struct lu_env *env;
1166         struct iovec  *local_iov;
1167         struct kiocb  *kiocb;
1168         ssize_t        result;
1169         int            refcheck;
1170         ENTRY;
1171
1172         env = cl_env_get(&refcheck);
1173         if (IS_ERR(env))
1174                 RETURN(PTR_ERR(env));
1175
1176         local_iov = &vvp_env_info(env)->vti_local_iov;
1177         kiocb = &vvp_env_info(env)->vti_kiocb;
1178         local_iov->iov_base = (void __user *)buf;
1179         local_iov->iov_len = count;
1180         init_sync_kiocb(kiocb, file);
1181         kiocb->ki_pos = *ppos;
1182         kiocb->ki_left = count;
1183
1184         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1185         *ppos = kiocb->ki_pos;
1186
1187         cl_env_put(env, &refcheck);
1188         RETURN(result);
1189 }
1190 #endif
1191
1192
1193 #ifdef HAVE_KERNEL_SENDFILE
1194 /*
1195  * Send file content (through pagecache) somewhere with helper
1196  */
1197 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1198                                 read_actor_t actor, void *target)
1199 {
1200         struct lu_env      *env;
1201         struct vvp_io_args *args;
1202         ssize_t             result;
1203         int                 refcheck;
1204         ENTRY;
1205
1206         env = cl_env_get(&refcheck);
1207         if (IS_ERR(env))
1208                 RETURN(PTR_ERR(env));
1209
1210         args = vvp_env_args(env, IO_SENDFILE);
1211         args->u.sendfile.via_target = target;
1212         args->u.sendfile.via_actor = actor;
1213
1214         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1215         cl_env_put(env, &refcheck);
1216         RETURN(result);
1217 }
1218 #endif
1219
1220 #ifdef HAVE_KERNEL_SPLICE_READ
1221 /*
1222  * Send file content (through pagecache) somewhere with helper
1223  */
1224 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1225                                    struct pipe_inode_info *pipe, size_t count,
1226                                    unsigned int flags)
1227 {
1228         struct lu_env      *env;
1229         struct vvp_io_args *args;
1230         ssize_t             result;
1231         int                 refcheck;
1232         ENTRY;
1233
1234         env = cl_env_get(&refcheck);
1235         if (IS_ERR(env))
1236                 RETURN(PTR_ERR(env));
1237
1238         args = vvp_env_args(env, IO_SPLICE);
1239         args->u.splice.via_pipe = pipe;
1240         args->u.splice.via_flags = flags;
1241
1242         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1243         cl_env_put(env, &refcheck);
1244         RETURN(result);
1245 }
1246 #endif
1247
1248 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1249                            obd_count ost_idx)
1250 {
1251         struct obd_export *exp = ll_i2dtexp(inode);
1252         struct obd_trans_info oti = { 0 };
1253         struct obdo *oa = NULL;
1254         int lsm_size;
1255         int rc = 0;
1256         struct lov_stripe_md *lsm = NULL, *lsm2;
1257         ENTRY;
1258
1259         OBDO_ALLOC(oa);
1260         if (oa == NULL)
1261                 RETURN(-ENOMEM);
1262
1263         lsm = ccc_inode_lsm_get(inode);
1264         if (lsm == NULL)
1265                 GOTO(out, rc = -ENOENT);
1266
1267         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1268                    (lsm->lsm_stripe_count));
1269
1270         OBD_ALLOC_LARGE(lsm2, lsm_size);
1271         if (lsm2 == NULL)
1272                 GOTO(out, rc = -ENOMEM);
1273
1274         oa->o_id = id;
1275         oa->o_seq = seq;
1276         oa->o_nlink = ost_idx;
1277         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1278         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1279         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1280                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1281         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1282         memcpy(lsm2, lsm, lsm_size);
1283         ll_inode_size_lock(inode);
1284         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1285         ll_inode_size_unlock(inode);
1286
1287         OBD_FREE_LARGE(lsm2, lsm_size);
1288         GOTO(out, rc);
1289 out:
1290         ccc_inode_lsm_put(inode, lsm);
1291         OBDO_FREE(oa);
1292         return rc;
1293 }
1294
1295 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1296 {
1297         struct ll_recreate_obj ucreat;
1298         ENTRY;
1299
1300         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1301                 RETURN(-EPERM);
1302
1303         if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1304                            sizeof(ucreat)))
1305                 RETURN(-EFAULT);
1306
1307         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1308                                ucreat.lrc_ost_idx));
1309 }
1310
1311 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1312 {
1313         struct lu_fid   fid;
1314         obd_id          id;
1315         obd_count       ost_idx;
1316         ENTRY;
1317
1318         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1319                 RETURN(-EPERM);
1320
1321         if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1322                 RETURN(-EFAULT);
1323
1324         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1325         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1326         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1327 }
1328
1329 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1330                              int flags, struct lov_user_md *lum, int lum_size)
1331 {
1332         struct lov_stripe_md *lsm = NULL;
1333         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1334         int rc = 0;
1335         ENTRY;
1336
1337         lsm = ccc_inode_lsm_get(inode);
1338         if (lsm != NULL) {
1339                 ccc_inode_lsm_put(inode, lsm);
1340                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1341                        inode->i_ino);
1342                 RETURN(-EEXIST);
1343         }
1344
1345         ll_inode_size_lock(inode);
1346         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1347         if (rc)
1348                 GOTO(out, rc);
1349         rc = oit.d.lustre.it_status;
1350         if (rc < 0)
1351                 GOTO(out_req_free, rc);
1352
1353         ll_release_openhandle(file->f_dentry, &oit);
1354
1355  out:
1356         ll_inode_size_unlock(inode);
1357         ll_intent_release(&oit);
1358         ccc_inode_lsm_put(inode, lsm);
1359         RETURN(rc);
1360 out_req_free:
1361         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1362         goto out;
1363 }
1364
1365 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1366                              struct lov_mds_md **lmmp, int *lmm_size,
1367                              struct ptlrpc_request **request)
1368 {
1369         struct ll_sb_info *sbi = ll_i2sbi(inode);
1370         struct mdt_body  *body;
1371         struct lov_mds_md *lmm = NULL;
1372         struct ptlrpc_request *req = NULL;
1373         struct md_op_data *op_data;
1374         int rc, lmmsize;
1375
1376         rc = ll_get_max_mdsize(sbi, &lmmsize);
1377         if (rc)
1378                 RETURN(rc);
1379
1380         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1381                                      strlen(filename), lmmsize,
1382                                      LUSTRE_OPC_ANY, NULL);
1383         if (IS_ERR(op_data))
1384                 RETURN(PTR_ERR(op_data));
1385
1386         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1387         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1388         ll_finish_md_op_data(op_data);
1389         if (rc < 0) {
1390                 CDEBUG(D_INFO, "md_getattr_name failed "
1391                        "on %s: rc %d\n", filename, rc);
1392                 GOTO(out, rc);
1393         }
1394
1395         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1396         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1397
1398         lmmsize = body->eadatasize;
1399
1400         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1401                         lmmsize == 0) {
1402                 GOTO(out, rc = -ENODATA);
1403         }
1404
1405         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1406         LASSERT(lmm != NULL);
1407
1408         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1409             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1410                 GOTO(out, rc = -EPROTO);
1411         }
1412
1413         /*
1414          * This is coming from the MDS, so is probably in
1415          * little endian.  We convert it to host endian before
1416          * passing it to userspace.
1417          */
1418         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1419                 /* if function called for directory - we should
1420                  * avoid swab not existent lsm objects */
1421                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1422                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1423                         if (S_ISREG(body->mode))
1424                                 lustre_swab_lov_user_md_objects(
1425                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1426                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1427                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1428                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1429                         if (S_ISREG(body->mode))
1430                                 lustre_swab_lov_user_md_objects(
1431                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1432                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1433                 }
1434         }
1435
1436 out:
1437         *lmmp = lmm;
1438         *lmm_size = lmmsize;
1439         *request = req;
1440         return rc;
1441 }
1442
1443 static int ll_lov_setea(struct inode *inode, struct file *file,
1444                             unsigned long arg)
1445 {
1446         int                      flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1447         struct lov_user_md      *lump;
1448         int                      lum_size = sizeof(struct lov_user_md) +
1449                                             sizeof(struct lov_user_ost_data);
1450         int                      rc;
1451         ENTRY;
1452
1453         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1454                 RETURN(-EPERM);
1455
1456         OBD_ALLOC_LARGE(lump, lum_size);
1457         if (lump == NULL)
1458                 RETURN(-ENOMEM);
1459
1460         if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1461                 OBD_FREE_LARGE(lump, lum_size);
1462                 RETURN(-EFAULT);
1463         }
1464
1465         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1466
1467         OBD_FREE_LARGE(lump, lum_size);
1468         RETURN(rc);
1469 }
1470
1471 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1472                             unsigned long arg)
1473 {
1474         struct lov_user_md_v3    lumv3;
1475         struct lov_user_md_v1   *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1476         struct lov_user_md_v1   *lumv1p = (struct lov_user_md_v1 *)arg;
1477         struct lov_user_md_v3   *lumv3p = (struct lov_user_md_v3 *)arg;
1478         int                      lum_size, rc;
1479         int                      flags = FMODE_WRITE;
1480         ENTRY;
1481
1482         /* first try with v1 which is smaller than v3 */
1483         lum_size = sizeof(struct lov_user_md_v1);
1484         if (copy_from_user(lumv1, lumv1p, lum_size))
1485                 RETURN(-EFAULT);
1486
1487         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1488                 lum_size = sizeof(struct lov_user_md_v3);
1489                 if (copy_from_user(&lumv3, lumv3p, lum_size))
1490                         RETURN(-EFAULT);
1491         }
1492
1493         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1494         if (rc == 0) {
1495                 struct lov_stripe_md *lsm;
1496                 __u32 gen;
1497
1498                 put_user(0, &lumv1p->lmm_stripe_count);
1499
1500                 ll_layout_refresh(inode, &gen);
1501                 lsm = ccc_inode_lsm_get(inode);
1502                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1503                                    0, lsm, (void *)arg);
1504                 ccc_inode_lsm_put(inode, lsm);
1505         }
1506         RETURN(rc);
1507 }
1508
1509 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1510 {
1511         struct lov_stripe_md *lsm;
1512         int rc = -ENODATA;
1513         ENTRY;
1514
1515         lsm = ccc_inode_lsm_get(inode);
1516         if (lsm != NULL)
1517                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1518                                    lsm, (void *)arg);
1519         ccc_inode_lsm_put(inode, lsm);
1520         RETURN(rc);
1521 }
1522
1523 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1524 {
1525         struct ll_inode_info   *lli = ll_i2info(inode);
1526         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1527         struct ccc_grouplock    grouplock;
1528         int                     rc;
1529         ENTRY;
1530
1531         if (ll_file_nolock(file))
1532                 RETURN(-EOPNOTSUPP);
1533
1534         spin_lock(&lli->lli_lock);
1535         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1536                 CWARN("group lock already existed with gid %lu\n",
1537                       fd->fd_grouplock.cg_gid);
1538                 spin_unlock(&lli->lli_lock);
1539                 RETURN(-EINVAL);
1540         }
1541         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1542         spin_unlock(&lli->lli_lock);
1543
1544         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1545                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1546         if (rc)
1547                 RETURN(rc);
1548
1549         spin_lock(&lli->lli_lock);
1550         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1551                 spin_unlock(&lli->lli_lock);
1552                 CERROR("another thread just won the race\n");
1553                 cl_put_grouplock(&grouplock);
1554                 RETURN(-EINVAL);
1555         }
1556
1557         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1558         fd->fd_grouplock = grouplock;
1559         spin_unlock(&lli->lli_lock);
1560
1561         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1562         RETURN(0);
1563 }
1564
1565 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1566 {
1567         struct ll_inode_info   *lli = ll_i2info(inode);
1568         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1569         struct ccc_grouplock    grouplock;
1570         ENTRY;
1571
1572         spin_lock(&lli->lli_lock);
1573         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1574                 spin_unlock(&lli->lli_lock);
1575                 CWARN("no group lock held\n");
1576                 RETURN(-EINVAL);
1577         }
1578         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1579
1580         if (fd->fd_grouplock.cg_gid != arg) {
1581                 CWARN("group lock %lu doesn't match current id %lu\n",
1582                        arg, fd->fd_grouplock.cg_gid);
1583                 spin_unlock(&lli->lli_lock);
1584                 RETURN(-EINVAL);
1585         }
1586
1587         grouplock = fd->fd_grouplock;
1588         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1589         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1590         spin_unlock(&lli->lli_lock);
1591
1592         cl_put_grouplock(&grouplock);
1593         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1594         RETURN(0);
1595 }
1596
1597 /**
1598  * Close inode open handle
1599  *
1600  * \param dentry [in]     dentry which contains the inode
1601  * \param it     [in,out] intent which contains open info and result
1602  *
1603  * \retval 0     success
1604  * \retval <0    failure
1605  */
1606 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1607 {
1608         struct inode *inode = dentry->d_inode;
1609         struct obd_client_handle *och;
1610         int rc;
1611         ENTRY;
1612
1613         LASSERT(inode);
1614
1615         /* Root ? Do nothing. */
1616         if (dentry->d_inode->i_sb->s_root == dentry)
1617                 RETURN(0);
1618
1619         /* No open handle to close? Move away */
1620         if (!it_disposition(it, DISP_OPEN_OPEN))
1621                 RETURN(0);
1622
1623         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1624
1625         OBD_ALLOC(och, sizeof(*och));
1626         if (!och)
1627                 GOTO(out, rc = -ENOMEM);
1628
1629         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1630                     ll_i2info(inode), it, och);
1631
1632         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1633                                        inode, och);
1634  out:
1635         /* this one is in place of ll_file_open */
1636         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1637                 ptlrpc_req_finished(it->d.lustre.it_data);
1638                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1639         }
1640         RETURN(rc);
1641 }
1642
1643 /**
1644  * Get size for inode for which FIEMAP mapping is requested.
1645  * Make the FIEMAP get_info call and returns the result.
1646  */
1647 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1648               int num_bytes)
1649 {
1650         struct obd_export *exp = ll_i2dtexp(inode);
1651         struct lov_stripe_md *lsm = NULL;
1652         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1653         int vallen = num_bytes;
1654         int rc;
1655         ENTRY;
1656
1657         /* Checks for fiemap flags */
1658         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1659                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1660                 return -EBADR;
1661         }
1662
1663         /* Check for FIEMAP_FLAG_SYNC */
1664         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1665                 rc = filemap_fdatawrite(inode->i_mapping);
1666                 if (rc)
1667                         return rc;
1668         }
1669
1670         lsm = ccc_inode_lsm_get(inode);
1671         if (lsm == NULL)
1672                 return -ENOENT;
1673
1674         /* If the stripe_count > 1 and the application does not understand
1675          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1676          */
1677         if (lsm->lsm_stripe_count > 1 &&
1678             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1679                 GOTO(out, rc = -EOPNOTSUPP);
1680
1681         fm_key.oa.o_id = lsm->lsm_object_id;
1682         fm_key.oa.o_seq = lsm->lsm_object_seq;
1683         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1684
1685         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1686         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1687         /* If filesize is 0, then there would be no objects for mapping */
1688         if (fm_key.oa.o_size == 0) {
1689                 fiemap->fm_mapped_extents = 0;
1690                 GOTO(out, rc = 0);
1691         }
1692
1693         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1694
1695         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1696                           fiemap, lsm);
1697         if (rc)
1698                 CERROR("obd_get_info failed: rc = %d\n", rc);
1699
1700 out:
1701         ccc_inode_lsm_put(inode, lsm);
1702         RETURN(rc);
1703 }
1704
1705 int ll_fid2path(struct inode *inode, void *arg)
1706 {
1707         struct obd_export       *exp = ll_i2mdexp(inode);
1708         struct getinfo_fid2path *gfout, *gfin;
1709         int                      outsize, rc;
1710         ENTRY;
1711
1712         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1713             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1714                 RETURN(-EPERM);
1715
1716         /* Need to get the buflen */
1717         OBD_ALLOC_PTR(gfin);
1718         if (gfin == NULL)
1719                 RETURN(-ENOMEM);
1720         if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1721                 OBD_FREE_PTR(gfin);
1722                 RETURN(-EFAULT);
1723         }
1724
1725         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1726         OBD_ALLOC(gfout, outsize);
1727         if (gfout == NULL) {
1728                 OBD_FREE_PTR(gfin);
1729                 RETURN(-ENOMEM);
1730         }
1731         memcpy(gfout, gfin, sizeof(*gfout));
1732         OBD_FREE_PTR(gfin);
1733
1734         /* Call mdc_iocontrol */
1735         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1736         if (rc)
1737                 GOTO(gf_free, rc);
1738         if (copy_to_user(arg, gfout, outsize))
1739                 rc = -EFAULT;
1740
1741 gf_free:
1742         OBD_FREE(gfout, outsize);
1743         RETURN(rc);
1744 }
1745
1746 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1747 {
1748         struct ll_user_fiemap *fiemap_s;
1749         size_t num_bytes, ret_bytes;
1750         unsigned int extent_count;
1751         int rc = 0;
1752
1753         /* Get the extent count so we can calculate the size of
1754          * required fiemap buffer */
1755         if (get_user(extent_count,
1756             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1757                 RETURN(-EFAULT);
1758         num_bytes = sizeof(*fiemap_s) + (extent_count *
1759                                          sizeof(struct ll_fiemap_extent));
1760
1761         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1762         if (fiemap_s == NULL)
1763                 RETURN(-ENOMEM);
1764
1765         /* get the fiemap value */
1766         if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1767                            sizeof(*fiemap_s)))
1768                 GOTO(error, rc = -EFAULT);
1769
1770         /* If fm_extent_count is non-zero, read the first extent since
1771          * it is used to calculate end_offset and device from previous
1772          * fiemap call. */
1773         if (extent_count) {
1774                 if (copy_from_user(&fiemap_s->fm_extents[0],
1775                     (char __user *)arg + sizeof(*fiemap_s),
1776                     sizeof(struct ll_fiemap_extent)))
1777                         GOTO(error, rc = -EFAULT);
1778         }
1779
1780         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1781         if (rc)
1782                 GOTO(error, rc);
1783
1784         ret_bytes = sizeof(struct ll_user_fiemap);
1785
1786         if (extent_count != 0)
1787                 ret_bytes += (fiemap_s->fm_mapped_extents *
1788                                  sizeof(struct ll_fiemap_extent));
1789
1790         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1791                 rc = -EFAULT;
1792
1793 error:
1794         OBD_FREE_LARGE(fiemap_s, num_bytes);
1795         RETURN(rc);
1796 }
1797
1798 /*
1799  * Read the data_version for inode.
1800  *
1801  * This value is computed using stripe object version on OST.
1802  * Version is computed using server side locking.
1803  *
1804  * @param extent_lock  Take extent lock. Not needed if a process is already
1805  *                     holding the OST object group locks.
1806  */
1807 int ll_data_version(struct inode *inode, __u64 *data_version,
1808                     int extent_lock)
1809 {
1810         struct lov_stripe_md    *lsm = NULL;
1811         struct ll_sb_info       *sbi = ll_i2sbi(inode);
1812         struct obdo             *obdo = NULL;
1813         int                      rc;
1814         ENTRY;
1815
1816         /* If no stripe, we consider version is 0. */
1817         lsm = ccc_inode_lsm_get(inode);
1818         if (lsm == NULL) {
1819                 *data_version = 0;
1820                 CDEBUG(D_INODE, "No object for inode\n");
1821                 RETURN(0);
1822         }
1823
1824         OBD_ALLOC_PTR(obdo);
1825         if (obdo == NULL) {
1826                 ccc_inode_lsm_put(inode, lsm);
1827                 RETURN(-ENOMEM);
1828         }
1829
1830         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1831         if (!rc) {
1832                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1833                         rc = -EOPNOTSUPP;
1834                 else
1835                         *data_version = obdo->o_data_version;
1836         }
1837
1838         OBD_FREE_PTR(obdo);
1839         ccc_inode_lsm_put(inode, lsm);
1840
1841         RETURN(rc);
1842 }
1843
1844 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1845 {
1846         struct inode *inode = file->f_dentry->d_inode;
1847         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1848         int flags;
1849
1850         ENTRY;
1851
1852         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1853                inode->i_generation, inode, cmd);
1854         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1855
1856         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1857         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1858                 RETURN(-ENOTTY);
1859
1860         switch(cmd) {
1861         case LL_IOC_GETFLAGS:
1862                 /* Get the current value of the file flags */
1863                 return put_user(fd->fd_flags, (int *)arg);
1864         case LL_IOC_SETFLAGS:
1865         case LL_IOC_CLRFLAGS:
1866                 /* Set or clear specific file flags */
1867                 /* XXX This probably needs checks to ensure the flags are
1868                  *     not abused, and to handle any flag side effects.
1869                  */
1870                 if (get_user(flags, (int *) arg))
1871                         RETURN(-EFAULT);
1872
1873                 if (cmd == LL_IOC_SETFLAGS) {
1874                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1875                             !(file->f_flags & O_DIRECT)) {
1876                                 CERROR("%s: unable to disable locking on "
1877                                        "non-O_DIRECT file\n", current->comm);
1878                                 RETURN(-EINVAL);
1879                         }
1880
1881                         fd->fd_flags |= flags;
1882                 } else {
1883                         fd->fd_flags &= ~flags;
1884                 }
1885                 RETURN(0);
1886         case LL_IOC_LOV_SETSTRIPE:
1887                 RETURN(ll_lov_setstripe(inode, file, arg));
1888         case LL_IOC_LOV_SETEA:
1889                 RETURN(ll_lov_setea(inode, file, arg));
1890         case LL_IOC_LOV_GETSTRIPE:
1891                 RETURN(ll_lov_getstripe(inode, arg));
1892         case LL_IOC_RECREATE_OBJ:
1893                 RETURN(ll_lov_recreate_obj(inode, arg));
1894         case LL_IOC_RECREATE_FID:
1895                 RETURN(ll_lov_recreate_fid(inode, arg));
1896         case FSFILT_IOC_FIEMAP:
1897                 RETURN(ll_ioctl_fiemap(inode, arg));
1898         case FSFILT_IOC_GETFLAGS:
1899         case FSFILT_IOC_SETFLAGS:
1900                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1901         case FSFILT_IOC_GETVERSION_OLD:
1902         case FSFILT_IOC_GETVERSION:
1903                 RETURN(put_user(inode->i_generation, (int *)arg));
1904         case LL_IOC_GROUP_LOCK:
1905                 RETURN(ll_get_grouplock(inode, file, arg));
1906         case LL_IOC_GROUP_UNLOCK:
1907                 RETURN(ll_put_grouplock(inode, file, arg));
1908         case IOC_OBD_STATFS:
1909                 RETURN(ll_obd_statfs(inode, (void *)arg));
1910
1911         /* We need to special case any other ioctls we want to handle,
1912          * to send them to the MDS/OST as appropriate and to properly
1913          * network encode the arg field.
1914         case FSFILT_IOC_SETVERSION_OLD:
1915         case FSFILT_IOC_SETVERSION:
1916         */
1917         case LL_IOC_FLUSHCTX:
1918                 RETURN(ll_flush_ctx(inode));
1919         case LL_IOC_PATH2FID: {
1920                 if (copy_to_user((void *)arg, ll_inode2fid(inode),
1921                                  sizeof(struct lu_fid)))
1922                         RETURN(-EFAULT);
1923
1924                 RETURN(0);
1925         }
1926         case OBD_IOC_FID2PATH:
1927                 RETURN(ll_fid2path(inode, (void *)arg));
1928         case LL_IOC_DATA_VERSION: {
1929                 struct ioc_data_version idv;
1930                 int                     rc;
1931
1932                 if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
1933                         RETURN(-EFAULT);
1934
1935                 rc = ll_data_version(inode, &idv.idv_version,
1936                                 !(idv.idv_flags & LL_DV_NOFLUSH));
1937
1938                 if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
1939                         RETURN(-EFAULT);
1940
1941                 RETURN(rc);
1942         }
1943
1944         case LL_IOC_GET_MDTIDX: {
1945                 int mdtidx;
1946
1947                 mdtidx = ll_get_mdt_idx(inode);
1948                 if (mdtidx < 0)
1949                         RETURN(mdtidx);
1950
1951                 if (put_user((int)mdtidx, (int*)arg))
1952                         RETURN(-EFAULT);
1953
1954                 RETURN(0);
1955         }
1956         case OBD_IOC_GETDTNAME:
1957         case OBD_IOC_GETMDNAME:
1958                 RETURN(ll_get_obd_name(inode, cmd, arg));
1959         case LL_IOC_HSM_STATE_GET: {
1960                 struct md_op_data       *op_data;
1961                 struct hsm_user_state   *hus;
1962                 int                      rc;
1963
1964                 OBD_ALLOC_PTR(hus);
1965                 if (hus == NULL)
1966                         RETURN(-ENOMEM);
1967
1968                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
1969                                              LUSTRE_OPC_ANY, hus);
1970                 if (op_data == NULL) {
1971                         OBD_FREE_PTR(hus);
1972                         RETURN(-ENOMEM);
1973                 }
1974
1975                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
1976                                    op_data, NULL);
1977
1978                 if (copy_to_user((void *)arg, hus, sizeof(*hus)))
1979                         rc = -EFAULT;
1980
1981                 ll_finish_md_op_data(op_data);
1982                 OBD_FREE_PTR(hus);
1983                 RETURN(rc);
1984         }
1985         case LL_IOC_HSM_STATE_SET: {
1986                 struct md_op_data       *op_data;
1987                 struct hsm_state_set    *hss;
1988                 int                      rc;
1989
1990                 OBD_ALLOC_PTR(hss);
1991                 if (hss == NULL)
1992                         RETURN(-ENOMEM);
1993                 if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
1994                         OBD_FREE_PTR(hss);
1995                         RETURN(-EFAULT);
1996                 }
1997
1998                 /* Non-root users are forbidden to set or clear flags which are
1999                  * NOT defined in HSM_USER_MASK. */
2000                 if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
2001                     && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
2002                         OBD_FREE_PTR(hss);
2003                         RETURN(-EPERM);
2004                 }
2005
2006                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2007                                              LUSTRE_OPC_ANY, hss);
2008                 if (op_data == NULL) {
2009                         OBD_FREE_PTR(hss);
2010                         RETURN(-ENOMEM);
2011                 }
2012
2013                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2014                                    op_data, NULL);
2015
2016                 ll_finish_md_op_data(op_data);
2017
2018                 OBD_FREE_PTR(hss);
2019                 RETURN(rc);
2020         }
2021         case LL_IOC_HSM_ACTION: {
2022                 struct md_op_data               *op_data;
2023                 struct hsm_current_action       *hca;
2024                 int                              rc;
2025
2026                 OBD_ALLOC_PTR(hca);
2027                 if (hca == NULL)
2028                         RETURN(-ENOMEM);
2029
2030                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2031                                              LUSTRE_OPC_ANY, hca);
2032                 if (op_data == NULL) {
2033                         OBD_FREE_PTR(hca);
2034                         RETURN(-ENOMEM);
2035                 }
2036
2037                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2038                                    op_data, NULL);
2039
2040                 if (cfs_copy_to_user((char *)arg, hca, sizeof(*hca)))
2041                         rc = -EFAULT;
2042
2043                 ll_finish_md_op_data(op_data);
2044                 OBD_FREE_PTR(hca);
2045                 RETURN(rc);
2046         }
2047         default: {
2048                 int err;
2049
2050                 if (LLIOC_STOP ==
2051                      ll_iocontrol_call(inode, file, cmd, arg, &err))
2052                         RETURN(err);
2053
2054                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2055                                      (void *)arg));
2056         }
2057         }
2058 }
2059
2060 #ifndef HAVE_FILE_LLSEEK_SIZE
2061 static inline loff_t
2062 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
2063 {
2064         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
2065                 return -EINVAL;
2066         if (offset > maxsize)
2067                 return -EINVAL;
2068
2069         if (offset != file->f_pos) {
2070                 file->f_pos = offset;
2071                 file->f_version = 0;
2072         }
2073         return offset;
2074 }
2075
2076 static loff_t
2077 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
2078                 loff_t maxsize, loff_t eof)
2079 {
2080         struct inode *inode = file->f_dentry->d_inode;
2081
2082         switch (origin) {
2083         case SEEK_END:
2084                 offset += eof;
2085                 break;
2086         case SEEK_CUR:
2087                 /*
2088                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
2089                  * position-querying operation.  Avoid rewriting the "same"
2090                  * f_pos value back to the file because a concurrent read(),
2091                  * write() or lseek() might have altered it
2092                  */
2093                 if (offset == 0)
2094                         return file->f_pos;
2095                 /*
2096                  * f_lock protects against read/modify/write race with other
2097                  * SEEK_CURs. Note that parallel writes and reads behave
2098                  * like SEEK_SET.
2099                  */
2100                 mutex_lock(&inode->i_mutex);
2101                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
2102                 mutex_unlock(&inode->i_mutex);
2103                 return offset;
2104         case SEEK_DATA:
2105                 /*
2106                  * In the generic case the entire file is data, so as long as
2107                  * offset isn't at the end of the file then the offset is data.
2108                  */
2109                 if (offset >= eof)
2110                         return -ENXIO;
2111                 break;
2112         case SEEK_HOLE:
2113                 /*
2114                  * There is a virtual hole at the end of the file, so as long as
2115                  * offset isn't i_size or larger, return i_size.
2116                  */
2117                 if (offset >= eof)
2118                         return -ENXIO;
2119                 offset = eof;
2120                 break;
2121         }
2122
2123         return llseek_execute(file, offset, maxsize);
2124 }
2125 #endif
2126
2127 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2128 {
2129         struct inode *inode = file->f_dentry->d_inode;
2130         loff_t retval, eof = 0;
2131
2132         ENTRY;
2133         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2134                            (origin == SEEK_CUR) ? file->f_pos : 0);
2135         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2136                inode->i_ino, inode->i_generation, inode, retval, retval,
2137                origin);
2138         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2139
2140         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2141                 retval = ll_glimpse_size(inode);
2142                 if (retval != 0)
2143                         RETURN(retval);
2144                 eof = i_size_read(inode);
2145         }
2146
2147         retval = generic_file_llseek_size(file, offset, origin,
2148                                           ll_file_maxbytes(inode), eof);
2149         RETURN(retval);
2150 }
2151
2152 int ll_flush(struct file *file, fl_owner_t id)
2153 {
2154         struct inode *inode = file->f_dentry->d_inode;
2155         struct ll_inode_info *lli = ll_i2info(inode);
2156         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2157         int rc, err;
2158
2159         LASSERT(!S_ISDIR(inode->i_mode));
2160
2161         /* catch async errors that were recorded back when async writeback
2162          * failed for pages in this mapping. */
2163         rc = lli->lli_async_rc;
2164         lli->lli_async_rc = 0;
2165         err = lov_read_and_clear_async_rc(lli->lli_clob);
2166         if (rc == 0)
2167                 rc = err;
2168
2169         /* The application has been told write failure already.
2170          * Do not report failure again. */
2171         if (fd->fd_write_failed)
2172                 return 0;
2173         return rc ? -EIO : 0;
2174 }
2175
2176 /**
2177  * Called to make sure a portion of file has been written out.
2178  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2179  *
2180  * Return how many pages have been written.
2181  */
2182 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2183                        enum cl_fsync_mode mode)
2184 {
2185         struct cl_env_nest nest;
2186         struct lu_env *env;
2187         struct cl_io *io;
2188         struct obd_capa *capa = NULL;
2189         struct cl_fsync_io *fio;
2190         int result;
2191         ENTRY;
2192
2193         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2194             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2195                 RETURN(-EINVAL);
2196
2197         env = cl_env_nested_get(&nest);
2198         if (IS_ERR(env))
2199                 RETURN(PTR_ERR(env));
2200
2201         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2202
2203         io = ccc_env_thread_io(env);
2204         io->ci_obj = cl_i2info(inode)->lli_clob;
2205         io->ci_ignore_layout = 1;
2206
2207         /* initialize parameters for sync */
2208         fio = &io->u.ci_fsync;
2209         fio->fi_capa = capa;
2210         fio->fi_start = start;
2211         fio->fi_end = end;
2212         fio->fi_fid = ll_inode2fid(inode);
2213         fio->fi_mode = mode;
2214         fio->fi_nr_written = 0;
2215
2216         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2217                 result = cl_io_loop(env, io);
2218         else
2219                 result = io->ci_result;
2220         if (result == 0)
2221                 result = fio->fi_nr_written;
2222         cl_io_fini(env, io);
2223         cl_env_nested_put(&nest, env);
2224
2225         capa_put(capa);
2226
2227         RETURN(result);
2228 }
2229
2230 #ifdef HAVE_FILE_FSYNC_4ARGS
2231 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2232 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2233 int ll_fsync(struct file *file, int data)
2234 #else
2235 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2236 #endif
2237 {
2238         struct inode *inode = file->f_dentry->d_inode;
2239         struct ll_inode_info *lli = ll_i2info(inode);
2240         struct ptlrpc_request *req;
2241         struct obd_capa *oc;
2242         int rc, err;
2243         ENTRY;
2244
2245         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2246                inode->i_generation, inode);
2247         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2248
2249 #ifdef HAVE_FILE_FSYNC_4ARGS
2250         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2251         mutex_lock(&inode->i_mutex);
2252 #else
2253         /* fsync's caller has already called _fdata{sync,write}, we want
2254          * that IO to finish before calling the osc and mdc sync methods */
2255         rc = filemap_fdatawait(inode->i_mapping);
2256 #endif
2257
2258         /* catch async errors that were recorded back when async writeback
2259          * failed for pages in this mapping. */
2260         if (!S_ISDIR(inode->i_mode)) {
2261                 err = lli->lli_async_rc;
2262                 lli->lli_async_rc = 0;
2263                 if (rc == 0)
2264                         rc = err;
2265                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2266                 if (rc == 0)
2267                         rc = err;
2268         }
2269
2270         oc = ll_mdscapa_get(inode);
2271         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2272                       &req);
2273         capa_put(oc);
2274         if (!rc)
2275                 rc = err;
2276         if (!err)
2277                 ptlrpc_req_finished(req);
2278
2279         if (data) {
2280                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2281
2282                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2283                                 CL_FSYNC_ALL);
2284                 if (rc == 0 && err < 0)
2285                         rc = err;
2286                 if (rc < 0)
2287                         fd->fd_write_failed = true;
2288                 else
2289                         fd->fd_write_failed = false;
2290         }
2291
2292 #ifdef HAVE_FILE_FSYNC_4ARGS
2293         mutex_unlock(&inode->i_mutex);
2294 #endif
2295         RETURN(rc);
2296 }
2297
2298 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2299 {
2300         struct inode *inode = file->f_dentry->d_inode;
2301         struct ll_sb_info *sbi = ll_i2sbi(inode);
2302         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2303                                            .ei_cb_cp =ldlm_flock_completion_ast,
2304                                            .ei_cbdata = file_lock };
2305         struct md_op_data *op_data;
2306         struct lustre_handle lockh = {0};
2307         ldlm_policy_data_t flock = {{0}};
2308         int flags = 0;
2309         int rc;
2310         int rc2 = 0;
2311         ENTRY;
2312
2313         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2314                inode->i_ino, file_lock);
2315
2316         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2317
2318         if (file_lock->fl_flags & FL_FLOCK) {
2319                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2320                 /* flocks are whole-file locks */
2321                 flock.l_flock.end = OFFSET_MAX;
2322                 /* For flocks owner is determined by the local file desctiptor*/
2323                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2324         } else if (file_lock->fl_flags & FL_POSIX) {
2325                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2326                 flock.l_flock.start = file_lock->fl_start;
2327                 flock.l_flock.end = file_lock->fl_end;
2328         } else {
2329                 RETURN(-EINVAL);
2330         }
2331         flock.l_flock.pid = file_lock->fl_pid;
2332
2333         /* Somewhat ugly workaround for svc lockd.
2334          * lockd installs custom fl_lmops->lm_compare_owner that checks
2335          * for the fl_owner to be the same (which it always is on local node
2336          * I guess between lockd processes) and then compares pid.
2337          * As such we assign pid to the owner field to make it all work,
2338          * conflict with normal locks is unlikely since pid space and
2339          * pointer space for current->files are not intersecting */
2340         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2341                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2342
2343         switch (file_lock->fl_type) {
2344         case F_RDLCK:
2345                 einfo.ei_mode = LCK_PR;
2346                 break;
2347         case F_UNLCK:
2348                 /* An unlock request may or may not have any relation to
2349                  * existing locks so we may not be able to pass a lock handle
2350                  * via a normal ldlm_lock_cancel() request. The request may even
2351                  * unlock a byte range in the middle of an existing lock. In
2352                  * order to process an unlock request we need all of the same
2353                  * information that is given with a normal read or write record
2354                  * lock request. To avoid creating another ldlm unlock (cancel)
2355                  * message we'll treat a LCK_NL flock request as an unlock. */
2356                 einfo.ei_mode = LCK_NL;
2357                 break;
2358         case F_WRLCK:
2359                 einfo.ei_mode = LCK_PW;
2360                 break;
2361         default:
2362                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2363                         file_lock->fl_type);
2364                 RETURN (-ENOTSUPP);
2365         }
2366
2367         switch (cmd) {
2368         case F_SETLKW:
2369 #ifdef F_SETLKW64
2370         case F_SETLKW64:
2371 #endif
2372                 flags = 0;
2373                 break;
2374         case F_SETLK:
2375 #ifdef F_SETLK64
2376         case F_SETLK64:
2377 #endif
2378                 flags = LDLM_FL_BLOCK_NOWAIT;
2379                 break;
2380         case F_GETLK:
2381 #ifdef F_GETLK64
2382         case F_GETLK64:
2383 #endif
2384                 flags = LDLM_FL_TEST_LOCK;
2385                 /* Save the old mode so that if the mode in the lock changes we
2386                  * can decrement the appropriate reader or writer refcount. */
2387                 file_lock->fl_type = einfo.ei_mode;
2388                 break;
2389         default:
2390                 CERROR("unknown fcntl lock command: %d\n", cmd);
2391                 RETURN (-EINVAL);
2392         }
2393
2394         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2395                                      LUSTRE_OPC_ANY, NULL);
2396         if (IS_ERR(op_data))
2397                 RETURN(PTR_ERR(op_data));
2398
2399         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2400                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2401                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2402
2403         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2404                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2405
2406         if ((file_lock->fl_flags & FL_FLOCK) &&
2407             (rc == 0 || file_lock->fl_type == F_UNLCK))
2408                 rc2  = flock_lock_file_wait(file, file_lock);
2409         if ((file_lock->fl_flags & FL_POSIX) &&
2410             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2411             !(flags & LDLM_FL_TEST_LOCK))
2412                 rc2  = posix_lock_file_wait(file, file_lock);
2413
2414         if (rc2 && file_lock->fl_type != F_UNLCK) {
2415                 einfo.ei_mode = LCK_NL;
2416                 md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2417                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2418                 rc = rc2;
2419         }
2420
2421         ll_finish_md_op_data(op_data);
2422
2423         RETURN(rc);
2424 }
2425
2426 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2427 {
2428         ENTRY;
2429
2430         RETURN(-ENOSYS);
2431 }
2432
2433 /**
2434  * test if some locks matching bits and l_req_mode are acquired
2435  * - bits can be in different locks
2436  * - if found clear the common lock bits in *bits
2437  * - the bits not found, are kept in *bits
2438  * \param inode [IN]
2439  * \param bits [IN] searched lock bits [IN]
2440  * \param l_req_mode [IN] searched lock mode
2441  * \retval boolean, true iff all bits are found
2442  */
2443 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2444 {
2445         struct lustre_handle lockh;
2446         ldlm_policy_data_t policy;
2447         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2448                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2449         struct lu_fid *fid;
2450         __u64 flags;
2451         int i;
2452         ENTRY;
2453
2454         if (!inode)
2455                RETURN(0);
2456
2457         fid = &ll_i2info(inode)->lli_fid;
2458         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2459                ldlm_lockname[mode]);
2460
2461         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2462         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2463                 policy.l_inodebits.bits = *bits & (1 << i);
2464                 if (policy.l_inodebits.bits == 0)
2465                         continue;
2466
2467                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2468                                   &policy, mode, &lockh)) {
2469                         struct ldlm_lock *lock;
2470
2471                         lock = ldlm_handle2lock(&lockh);
2472                         if (lock) {
2473                                 *bits &=
2474                                       ~(lock->l_policy_data.l_inodebits.bits);
2475                                 LDLM_LOCK_PUT(lock);
2476                         } else {
2477                                 *bits &= ~policy.l_inodebits.bits;
2478                         }
2479                 }
2480         }
2481         RETURN(*bits == 0);
2482 }
2483
2484 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2485                             struct lustre_handle *lockh, __u64 flags)
2486 {
2487         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2488         struct lu_fid *fid;
2489         ldlm_mode_t rc;
2490         ENTRY;
2491
2492         fid = &ll_i2info(inode)->lli_fid;
2493         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2494
2495         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2496                            fid, LDLM_IBITS, &policy,
2497                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2498         RETURN(rc);
2499 }
2500
2501 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2502 {
2503         /* Already unlinked. Just update nlink and return success */
2504         if (rc == -ENOENT) {
2505                 clear_nlink(inode);
2506                 /* This path cannot be hit for regular files unless in
2507                  * case of obscure races, so no need to to validate
2508                  * size. */
2509                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2510                         return 0;
2511         } else if (rc != 0) {
2512                 CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2513                        ll_get_fsname(inode->i_sb, NULL, 0),
2514                        PFID(ll_inode2fid(inode)), rc);
2515         }
2516
2517         return rc;
2518 }
2519
2520 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2521                              __u64 ibits)
2522 {
2523         struct inode *inode = dentry->d_inode;
2524         struct ptlrpc_request *req = NULL;
2525         struct obd_export *exp;
2526         int rc = 0;
2527         ENTRY;
2528
2529         LASSERT(inode != NULL);
2530
2531         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2532                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2533
2534         exp = ll_i2mdexp(inode);
2535
2536         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2537          *      But under CMD case, it caused some lock issues, should be fixed
2538          *      with new CMD ibits lock. See bug 12718 */
2539         if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
2540                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2541                 struct md_op_data *op_data;
2542
2543                 if (ibits == MDS_INODELOCK_LOOKUP)
2544                         oit.it_op = IT_LOOKUP;
2545
2546                 /* Call getattr by fid, so do not provide name at all. */
2547                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2548                                              dentry->d_inode, NULL, 0, 0,
2549                                              LUSTRE_OPC_ANY, NULL);
2550                 if (IS_ERR(op_data))
2551                         RETURN(PTR_ERR(op_data));
2552
2553                 oit.it_create_mode |= M_CHECK_STALE;
2554                 rc = md_intent_lock(exp, op_data, NULL, 0,
2555                                     /* we are not interested in name
2556                                        based lookup */
2557                                     &oit, 0, &req,
2558                                     ll_md_blocking_ast, 0);
2559                 ll_finish_md_op_data(op_data);
2560                 oit.it_create_mode &= ~M_CHECK_STALE;
2561                 if (rc < 0) {
2562                         rc = ll_inode_revalidate_fini(inode, rc);
2563                         GOTO (out, rc);
2564                 }
2565
2566                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2567                 if (rc != 0) {
2568                         ll_intent_release(&oit);
2569                         GOTO(out, rc);
2570                 }
2571
2572                 /* Unlinked? Unhash dentry, so it is not picked up later by
2573                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2574                    here to preserve get_cwd functionality on 2.6.
2575                    Bug 10503 */
2576                 if (!dentry->d_inode->i_nlink)
2577                         d_lustre_invalidate(dentry);
2578
2579                 ll_lookup_finish_locks(&oit, dentry);
2580         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2581                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2582                 obd_valid valid = OBD_MD_FLGETATTR;
2583                 struct md_op_data *op_data;
2584                 int ealen = 0;
2585
2586                 if (S_ISREG(inode->i_mode)) {
2587                         rc = ll_get_max_mdsize(sbi, &ealen);
2588                         if (rc)
2589                                 RETURN(rc);
2590                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2591                 }
2592
2593                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2594                                              0, ealen, LUSTRE_OPC_ANY,
2595                                              NULL);
2596                 if (IS_ERR(op_data))
2597                         RETURN(PTR_ERR(op_data));
2598
2599                 op_data->op_valid = valid;
2600                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2601                  * capa for this inode. Because we only keep capas of dirs
2602                  * fresh. */
2603                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2604                 ll_finish_md_op_data(op_data);
2605                 if (rc) {
2606                         rc = ll_inode_revalidate_fini(inode, rc);
2607                         RETURN(rc);
2608                 }
2609
2610                 rc = ll_prep_inode(&inode, req, NULL, NULL);
2611         }
2612 out:
2613         ptlrpc_req_finished(req);
2614         return rc;
2615 }
2616
2617 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2618                            __u64 ibits)
2619 {
2620         struct inode *inode = dentry->d_inode;
2621         int rc;
2622         ENTRY;
2623
2624         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2625         if (rc != 0)
2626                 RETURN(rc);
2627
2628         /* if object isn't regular file, don't validate size */
2629         if (!S_ISREG(inode->i_mode)) {
2630                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2631                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2632                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2633         } else {
2634                 rc = ll_glimpse_size(inode);
2635         }
2636         RETURN(rc);
2637 }
2638
2639 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2640                   struct lookup_intent *it, struct kstat *stat)
2641 {
2642         struct inode *inode = de->d_inode;
2643         struct ll_sb_info *sbi = ll_i2sbi(inode);
2644         struct ll_inode_info *lli = ll_i2info(inode);
2645         int res = 0;
2646
2647         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2648                                              MDS_INODELOCK_LOOKUP);
2649         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2650
2651         if (res)
2652                 return res;
2653
2654         stat->dev = inode->i_sb->s_dev;
2655         if (ll_need_32bit_api(sbi))
2656                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2657         else
2658                 stat->ino = inode->i_ino;
2659         stat->mode = inode->i_mode;
2660         stat->nlink = inode->i_nlink;
2661         stat->uid = inode->i_uid;
2662         stat->gid = inode->i_gid;
2663         stat->rdev = inode->i_rdev;
2664         stat->atime = inode->i_atime;
2665         stat->mtime = inode->i_mtime;
2666         stat->ctime = inode->i_ctime;
2667         stat->blksize = 1 << inode->i_blkbits;
2668
2669         stat->size = i_size_read(inode);
2670         stat->blocks = inode->i_blocks;
2671
2672         return 0;
2673 }
2674 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2675 {
2676         struct lookup_intent it = { .it_op = IT_GETATTR };
2677
2678         return ll_getattr_it(mnt, de, &it, stat);
2679 }
2680
2681 #ifdef HAVE_LINUX_FIEMAP_H
2682 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2683                 __u64 start, __u64 len)
2684 {
2685         int rc;
2686         size_t num_bytes;
2687         struct ll_user_fiemap *fiemap;
2688         unsigned int extent_count = fieinfo->fi_extents_max;
2689
2690         num_bytes = sizeof(*fiemap) + (extent_count *
2691                                        sizeof(struct ll_fiemap_extent));
2692         OBD_ALLOC_LARGE(fiemap, num_bytes);
2693
2694         if (fiemap == NULL)
2695                 RETURN(-ENOMEM);
2696
2697         fiemap->fm_flags = fieinfo->fi_flags;
2698         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2699         fiemap->fm_start = start;
2700         fiemap->fm_length = len;
2701         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2702                sizeof(struct ll_fiemap_extent));
2703
2704         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2705
2706         fieinfo->fi_flags = fiemap->fm_flags;
2707         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2708         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2709                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2710
2711         OBD_FREE_LARGE(fiemap, num_bytes);
2712         return rc;
2713 }
2714 #endif
2715
2716 struct posix_acl * ll_get_acl(struct inode *inode, int type)
2717 {
2718         struct ll_inode_info *lli = ll_i2info(inode);
2719         struct posix_acl *acl = NULL;
2720         ENTRY;
2721
2722         spin_lock(&lli->lli_lock);
2723         /* VFS' acl_permission_check->check_acl will release the refcount */
2724         acl = posix_acl_dup(lli->lli_posix_acl);
2725         spin_unlock(&lli->lli_lock);
2726
2727         RETURN(acl);
2728 }
2729
2730 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2731 static int
2732 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2733 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
2734 # else
2735 ll_check_acl(struct inode *inode, int mask)
2736 # endif
2737 {
2738 # ifdef CONFIG_FS_POSIX_ACL
2739         struct posix_acl *acl;
2740         int rc;
2741         ENTRY;
2742
2743 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2744         if (flags & IPERM_FLAG_RCU)
2745                 return -ECHILD;
2746 #  endif
2747         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
2748
2749         if (!acl)
2750                 RETURN(-EAGAIN);
2751
2752         rc = posix_acl_permission(inode, acl, mask);
2753         posix_acl_release(acl);
2754
2755         RETURN(rc);
2756 # else /* !CONFIG_FS_POSIX_ACL */
2757         return -EAGAIN;
2758 # endif /* CONFIG_FS_POSIX_ACL */
2759 }
2760 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2761
2762 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2763 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2764 #else
2765 # ifdef HAVE_INODE_PERMISION_2ARGS
2766 int ll_inode_permission(struct inode *inode, int mask)
2767 # else
2768 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2769 # endif
2770 #endif
2771 {
2772         int rc = 0;
2773         ENTRY;
2774
2775 #ifdef MAY_NOT_BLOCK
2776         if (mask & MAY_NOT_BLOCK)
2777                 return -ECHILD;
2778 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
2779         if (flags & IPERM_FLAG_RCU)
2780                 return -ECHILD;
2781 #endif
2782
2783        /* as root inode are NOT getting validated in lookup operation,
2784         * need to do it before permission check. */
2785
2786         if (inode == inode->i_sb->s_root->d_inode) {
2787                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2788
2789                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2790                                               MDS_INODELOCK_LOOKUP);
2791                 if (rc)
2792                         RETURN(rc);
2793         }
2794
2795         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2796                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2797
2798         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2799                 return lustre_check_remote_perm(inode, mask);
2800
2801         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2802         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
2803
2804         RETURN(rc);
2805 }
2806
2807 #ifdef HAVE_FILE_READV
2808 #define READ_METHOD readv
2809 #define READ_FUNCTION ll_file_readv
2810 #define WRITE_METHOD writev
2811 #define WRITE_FUNCTION ll_file_writev
2812 #else
2813 #define READ_METHOD aio_read
2814 #define READ_FUNCTION ll_file_aio_read
2815 #define WRITE_METHOD aio_write
2816 #define WRITE_FUNCTION ll_file_aio_write
2817 #endif
2818
2819 /* -o localflock - only provides locally consistent flock locks */
2820 struct file_operations ll_file_operations = {
2821         .read           = ll_file_read,
2822         .READ_METHOD    = READ_FUNCTION,
2823         .write          = ll_file_write,
2824         .WRITE_METHOD   = WRITE_FUNCTION,
2825         .unlocked_ioctl = ll_file_ioctl,
2826         .open           = ll_file_open,
2827         .release        = ll_file_release,
2828         .mmap           = ll_file_mmap,
2829         .llseek         = ll_file_seek,
2830 #ifdef HAVE_KERNEL_SENDFILE
2831         .sendfile       = ll_file_sendfile,
2832 #endif
2833 #ifdef HAVE_KERNEL_SPLICE_READ
2834         .splice_read    = ll_file_splice_read,
2835 #endif
2836         .fsync          = ll_fsync,
2837         .flush          = ll_flush
2838 };
2839
2840 struct file_operations ll_file_operations_flock = {
2841         .read           = ll_file_read,
2842         .READ_METHOD    = READ_FUNCTION,
2843         .write          = ll_file_write,
2844         .WRITE_METHOD   = WRITE_FUNCTION,
2845         .unlocked_ioctl = ll_file_ioctl,
2846         .open           = ll_file_open,
2847         .release        = ll_file_release,
2848         .mmap           = ll_file_mmap,
2849         .llseek         = ll_file_seek,
2850 #ifdef HAVE_KERNEL_SENDFILE
2851         .sendfile       = ll_file_sendfile,
2852 #endif
2853 #ifdef HAVE_KERNEL_SPLICE_READ
2854         .splice_read    = ll_file_splice_read,
2855 #endif
2856         .fsync          = ll_fsync,
2857         .flush          = ll_flush,
2858         .flock          = ll_file_flock,
2859         .lock           = ll_file_flock
2860 };
2861
2862 /* These are for -o noflock - to return ENOSYS on flock calls */
2863 struct file_operations ll_file_operations_noflock = {
2864         .read           = ll_file_read,
2865         .READ_METHOD    = READ_FUNCTION,
2866         .write          = ll_file_write,
2867         .WRITE_METHOD   = WRITE_FUNCTION,
2868         .unlocked_ioctl = ll_file_ioctl,
2869         .open           = ll_file_open,
2870         .release        = ll_file_release,
2871         .mmap           = ll_file_mmap,
2872         .llseek         = ll_file_seek,
2873 #ifdef HAVE_KERNEL_SENDFILE
2874         .sendfile       = ll_file_sendfile,
2875 #endif
2876 #ifdef HAVE_KERNEL_SPLICE_READ
2877         .splice_read    = ll_file_splice_read,
2878 #endif
2879         .fsync          = ll_fsync,
2880         .flush          = ll_flush,
2881         .flock          = ll_file_noflock,
2882         .lock           = ll_file_noflock
2883 };
2884
2885 struct inode_operations ll_file_inode_operations = {
2886         .setattr        = ll_setattr,
2887         .getattr        = ll_getattr,
2888         .permission     = ll_inode_permission,
2889         .setxattr       = ll_setxattr,
2890         .getxattr       = ll_getxattr,
2891         .listxattr      = ll_listxattr,
2892         .removexattr    = ll_removexattr,
2893 #ifdef  HAVE_LINUX_FIEMAP_H
2894         .fiemap         = ll_fiemap,
2895 #endif
2896 #ifdef HAVE_IOP_GET_ACL
2897         .get_acl        = ll_get_acl,
2898 #endif
2899 };
2900
2901 /* dynamic ioctl number support routins */
2902 static struct llioc_ctl_data {
2903         struct rw_semaphore     ioc_sem;
2904         cfs_list_t              ioc_head;
2905 } llioc = {
2906         __RWSEM_INITIALIZER(llioc.ioc_sem),
2907         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2908 };
2909
2910
2911 struct llioc_data {
2912         cfs_list_t              iocd_list;
2913         unsigned int            iocd_size;
2914         llioc_callback_t        iocd_cb;
2915         unsigned int            iocd_count;
2916         unsigned int            iocd_cmd[0];
2917 };
2918
2919 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2920 {
2921         unsigned int size;
2922         struct llioc_data *in_data = NULL;
2923         ENTRY;
2924
2925         if (cb == NULL || cmd == NULL ||
2926             count > LLIOC_MAX_CMD || count < 0)
2927                 RETURN(NULL);
2928
2929         size = sizeof(*in_data) + count * sizeof(unsigned int);
2930         OBD_ALLOC(in_data, size);
2931         if (in_data == NULL)
2932                 RETURN(NULL);
2933
2934         memset(in_data, 0, sizeof(*in_data));
2935         in_data->iocd_size = size;
2936         in_data->iocd_cb = cb;
2937         in_data->iocd_count = count;
2938         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2939
2940         down_write(&llioc.ioc_sem);
2941         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2942         up_write(&llioc.ioc_sem);
2943
2944         RETURN(in_data);
2945 }
2946
2947 void ll_iocontrol_unregister(void *magic)
2948 {
2949         struct llioc_data *tmp;
2950
2951         if (magic == NULL)
2952                 return;
2953
2954         down_write(&llioc.ioc_sem);
2955         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2956                 if (tmp == magic) {
2957                         unsigned int size = tmp->iocd_size;
2958
2959                         cfs_list_del(&tmp->iocd_list);
2960                         up_write(&llioc.ioc_sem);
2961
2962                         OBD_FREE(tmp, size);
2963                         return;
2964                 }
2965         }
2966         up_write(&llioc.ioc_sem);
2967
2968         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2969 }
2970
2971 EXPORT_SYMBOL(ll_iocontrol_register);
2972 EXPORT_SYMBOL(ll_iocontrol_unregister);
2973
2974 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2975                         unsigned int cmd, unsigned long arg, int *rcp)
2976 {
2977         enum llioc_iter ret = LLIOC_CONT;
2978         struct llioc_data *data;
2979         int rc = -EINVAL, i;
2980
2981         down_read(&llioc.ioc_sem);
2982         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2983                 for (i = 0; i < data->iocd_count; i++) {
2984                         if (cmd != data->iocd_cmd[i])
2985                                 continue;
2986
2987                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2988                         break;
2989                 }
2990
2991                 if (ret == LLIOC_STOP)
2992                         break;
2993         }
2994         up_read(&llioc.ioc_sem);
2995
2996         if (rcp)
2997                 *rcp = rc;
2998         return ret;
2999 }
3000
3001 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
3002 {
3003         struct ll_inode_info *lli = ll_i2info(inode);
3004         struct cl_env_nest nest;
3005         struct lu_env *env;
3006         int result;
3007         ENTRY;
3008
3009         if (lli->lli_clob == NULL)
3010                 RETURN(0);
3011
3012         env = cl_env_nested_get(&nest);
3013         if (IS_ERR(env))
3014                 RETURN(PTR_ERR(env));
3015
3016         result = cl_conf_set(env, lli->lli_clob, conf);
3017         cl_env_nested_put(&nest, env);
3018
3019         if (conf->coc_opc == OBJECT_CONF_SET) {
3020                 struct ldlm_lock *lock = conf->coc_lock;
3021
3022                 LASSERT(lock != NULL);
3023                 LASSERT(ldlm_has_layout(lock));
3024                 if (result == 0) {
3025                         /* it can only be allowed to match after layout is
3026                          * applied to inode otherwise false layout would be
3027                          * seen. Applying layout shoud happen before dropping
3028                          * the intent lock. */
3029                         ldlm_lock_allow_match(lock);
3030                 }
3031         }
3032         RETURN(result);
3033 }
3034
3035 /**
3036  * Apply the layout to the inode. Layout lock is held and will be released
3037  * in this function.
3038  */
3039 static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
3040                                 struct inode *inode, __u32 *gen, bool reconf)
3041 {
3042         struct ll_inode_info *lli = ll_i2info(inode);
3043         struct ll_sb_info    *sbi = ll_i2sbi(inode);
3044         struct ldlm_lock *lock;
3045         struct lustre_md md = { NULL };
3046         struct cl_object_conf conf;
3047         int rc = 0;
3048         bool lvb_ready;
3049         ENTRY;
3050
3051         LASSERT(lustre_handle_is_used(lockh));
3052
3053         lock = ldlm_handle2lock(lockh);
3054         LASSERT(lock != NULL);
3055         LASSERT(ldlm_has_layout(lock));
3056
3057         LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
3058                 inode, PFID(&lli->lli_fid), reconf);
3059
3060         lock_res_and_lock(lock);
3061         lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
3062         unlock_res_and_lock(lock);
3063         /* checking lvb_ready is racy but this is okay. The worst case is
3064          * that multi processes may configure the file on the same time. */
3065         if (lvb_ready || !reconf) {
3066                 LDLM_LOCK_PUT(lock);
3067
3068                 rc = -ENODATA;
3069                 if (lvb_ready) {
3070                         /* layout_gen must be valid if layout lock is not
3071                          * cancelled and stripe has already set */
3072                         *gen = lli->lli_layout_gen;
3073                         rc = 0;
3074                 }
3075                 ldlm_lock_decref(lockh, mode);
3076                 RETURN(rc);
3077         }
3078
3079         /* for layout lock, lmm is returned in lock's lvb.
3080          * lvb_data is immutable if the lock is held so it's safe to access it
3081          * without res lock. See the description in ldlm_lock_decref_internal()
3082          * for the condition to free lvb_data of layout lock */
3083         if (lock->l_lvb_data != NULL) {
3084                 rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3085                                   lock->l_lvb_data, lock->l_lvb_len);
3086                 if (rc >= 0) {
3087                         if (md.lsm != NULL)
3088                                 *gen = md.lsm->lsm_layout_gen;
3089                         rc = 0;
3090                 } else {
3091                         CERROR("%s: file "DFID" unpackmd error: %d\n",
3092                                 ll_get_fsname(inode->i_sb, NULL, 0),
3093                                 PFID(&lli->lli_fid), rc);
3094                 }
3095         }
3096         if (rc < 0) {
3097                 LDLM_LOCK_PUT(lock);
3098                 ldlm_lock_decref(lockh, mode);
3099                 RETURN(rc);
3100         }
3101
3102         /* set layout to file. Unlikely this will fail as old layout was
3103          * surely eliminated */
3104         memset(&conf, 0, sizeof conf);
3105         conf.coc_opc = OBJECT_CONF_SET;
3106         conf.coc_inode = inode;
3107         conf.coc_lock = lock;
3108         conf.u.coc_md = &md;
3109         rc = ll_layout_conf(inode, &conf);
3110         LDLM_LOCK_PUT(lock);
3111
3112         ldlm_lock_decref(lockh, mode);
3113
3114         if (md.lsm != NULL)
3115                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3116
3117         /* wait for IO to complete if it's still being used. */
3118         if (rc == -EBUSY) {
3119                 CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3120                         ll_get_fsname(inode->i_sb, NULL, 0),
3121                         inode, PFID(&lli->lli_fid));
3122
3123                 memset(&conf, 0, sizeof conf);
3124                 conf.coc_opc = OBJECT_CONF_WAIT;
3125                 conf.coc_inode = inode;
3126                 rc = ll_layout_conf(inode, &conf);
3127                 if (rc == 0)
3128                         rc = -EAGAIN;
3129
3130                 CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3131                         PFID(&lli->lli_fid), rc);
3132         }
3133
3134         RETURN(rc);
3135 }
3136
3137 /**
3138  * This function checks if there exists a LAYOUT lock on the client side,
3139  * or enqueues it if it doesn't have one in cache.
3140  *
3141  * This function will not hold layout lock so it may be revoked any time after
3142  * this function returns. Any operations depend on layout should be redone
3143  * in that case.
3144  *
3145  * This function should be called before lov_io_init() to get an uptodate
3146  * layout version, the caller should save the version number and after IO
3147  * is finished, this function should be called again to verify that layout
3148  * is not changed during IO time.
3149  */
3150 int ll_layout_refresh(struct inode *inode, __u32 *gen)
3151 {
3152         struct ll_inode_info  *lli = ll_i2info(inode);
3153         struct ll_sb_info     *sbi = ll_i2sbi(inode);
3154         struct md_op_data     *op_data;
3155         struct lookup_intent   it;
3156         struct lustre_handle   lockh;
3157         ldlm_mode_t            mode;
3158         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
3159                                            .ei_mode = LCK_CR,
3160                                            .ei_cb_bl = ll_md_blocking_ast,
3161                                            .ei_cb_cp = ldlm_completion_ast,
3162                                            .ei_cbdata = inode };
3163         int rc;
3164         ENTRY;
3165
3166         *gen = LL_LAYOUT_GEN_ZERO;
3167         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
3168                 RETURN(0);
3169
3170         /* sanity checks */
3171         LASSERT(fid_is_sane(ll_inode2fid(inode)));
3172         LASSERT(S_ISREG(inode->i_mode));
3173
3174         /* mostly layout lock is caching on the local side, so try to match
3175          * it before grabbing layout lock mutex. */
3176         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3177         if (mode != 0) { /* hit cached lock */
3178                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
3179                 if (rc == 0)
3180                         RETURN(0);
3181
3182                 /* better hold lli_layout_mutex to try again otherwise
3183                  * it will have starvation problem. */
3184         }
3185
3186         /* take layout lock mutex to enqueue layout lock exclusively. */
3187         mutex_lock(&lli->lli_layout_mutex);
3188
3189 again:
3190         /* try again. Maybe somebody else has done this. */
3191         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3192         if (mode != 0) { /* hit cached lock */
3193                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3194                 if (rc == -EAGAIN)
3195                         goto again;
3196
3197                 mutex_unlock(&lli->lli_layout_mutex);
3198                 RETURN(rc);
3199         }
3200
3201         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3202                         0, 0, LUSTRE_OPC_ANY, NULL);
3203         if (IS_ERR(op_data)) {
3204                 mutex_unlock(&lli->lli_layout_mutex);
3205                 RETURN(PTR_ERR(op_data));
3206         }
3207
3208         /* have to enqueue one */
3209         memset(&it, 0, sizeof(it));
3210         it.it_op = IT_LAYOUT;
3211         lockh.cookie = 0ULL;
3212
3213         LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3214                         ll_get_fsname(inode->i_sb, NULL, 0), inode,
3215                         PFID(&lli->lli_fid));
3216
3217         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3218                         NULL, 0, NULL, 0);
3219         if (it.d.lustre.it_data != NULL)
3220                 ptlrpc_req_finished(it.d.lustre.it_data);
3221         it.d.lustre.it_data = NULL;
3222
3223         ll_finish_md_op_data(op_data);
3224
3225         mode = it.d.lustre.it_lock_mode;
3226         it.d.lustre.it_lock_mode = 0;
3227         ll_intent_drop_lock(&it);
3228
3229         if (rc == 0) {
3230                 /* set lock data in case this is a new lock */
3231                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3232                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3233                 if (rc == -EAGAIN)
3234                         goto again;
3235         }
3236         mutex_unlock(&lli->lli_layout_mutex);
3237
3238         RETURN(rc);
3239 }