Whamcloud - gitweb
LU-2061 hsm: get list of HSM actions
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84
85         if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
86                 op_data->op_bias |= MDS_DATA_MODIFIED;
87 }
88
89 /**
90  * Closes the IO epoch and packs all the attributes into @op_data for
91  * the CLOSE rpc.
92  */
93 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
94                              struct obd_client_handle *och)
95 {
96         ENTRY;
97
98         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
99                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
100
101         if (!(och->och_flags & FMODE_WRITE))
102                 goto out;
103
104         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
105                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
106         else
107                 ll_ioepoch_close(inode, op_data, &och, 0);
108
109 out:
110         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
111         ll_prep_md_op_data(op_data, inode, NULL, NULL,
112                            0, 0, LUSTRE_OPC_ANY, NULL);
113         EXIT;
114 }
115
116 static int ll_close_inode_openhandle(struct obd_export *md_exp,
117                                      struct inode *inode,
118                                      struct obd_client_handle *och)
119 {
120         struct obd_export *exp = ll_i2mdexp(inode);
121         struct md_op_data *op_data;
122         struct ptlrpc_request *req = NULL;
123         struct obd_device *obd = class_exp2obd(exp);
124         int epoch_close = 1;
125         int rc;
126         ENTRY;
127
128         if (obd == NULL) {
129                 /*
130                  * XXX: in case of LMV, is this correct to access
131                  * ->exp_handle?
132                  */
133                 CERROR("Invalid MDC connection handle "LPX64"\n",
134                        ll_i2mdexp(inode)->exp_handle.h_cookie);
135                 GOTO(out, rc = 0);
136         }
137
138         OBD_ALLOC_PTR(op_data);
139         if (op_data == NULL)
140                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
141
142         ll_prepare_close(inode, op_data, och);
143         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
144         rc = md_close(md_exp, op_data, och->och_mod, &req);
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(epoch_close);
148                 /* MDS has instructed us to obtain Size-on-MDS attribute from
149                  * OSTs and send setattr to back to MDS. */
150                 rc = ll_som_update(inode, op_data);
151                 if (rc) {
152                         CERROR("inode %lu mdc Size-on-MDS update failed: "
153                                "rc = %d\n", inode->i_ino, rc);
154                         rc = 0;
155                 }
156         } else if (rc) {
157                 CERROR("inode %lu mdc close failed: rc = %d\n",
158                        inode->i_ino, rc);
159         }
160
161         /* DATA_MODIFIED flag was successfully sent on close, cancel data
162          * modification flag. */
163         if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
164                 struct ll_inode_info *lli = ll_i2info(inode);
165
166                 spin_lock(&lli->lli_lock);
167                 lli->lli_flags &= ~LLIF_DATA_MODIFIED;
168                 spin_unlock(&lli->lli_lock);
169         }
170
171         ll_finish_md_op_data(op_data);
172
173         if (rc == 0) {
174                 rc = ll_objects_destroy(req, inode);
175                 if (rc)
176                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
177                                inode->i_ino, rc);
178         }
179
180         EXIT;
181 out:
182
183         if (exp_connect_som(exp) && !epoch_close &&
184             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
185                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
186         } else {
187                 md_clear_open_replay_data(md_exp, och);
188                 /* Free @och if it is not waiting for DONE_WRITING. */
189                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
190                 OBD_FREE_PTR(och);
191         }
192         if (req) /* This is close request */
193                 ptlrpc_req_finished(req);
194         return rc;
195 }
196
197 int ll_md_real_close(struct inode *inode, int flags)
198 {
199         struct ll_inode_info *lli = ll_i2info(inode);
200         struct obd_client_handle **och_p;
201         struct obd_client_handle *och;
202         __u64 *och_usecount;
203         int rc = 0;
204         ENTRY;
205
206         if (flags & FMODE_WRITE) {
207                 och_p = &lli->lli_mds_write_och;
208                 och_usecount = &lli->lli_open_fd_write_count;
209         } else if (flags & FMODE_EXEC) {
210                 och_p = &lli->lli_mds_exec_och;
211                 och_usecount = &lli->lli_open_fd_exec_count;
212         } else {
213                 LASSERT(flags & FMODE_READ);
214                 och_p = &lli->lli_mds_read_och;
215                 och_usecount = &lli->lli_open_fd_read_count;
216         }
217
218         mutex_lock(&lli->lli_och_mutex);
219         if (*och_usecount) { /* There are still users of this handle, so
220                                 skip freeing it. */
221                 mutex_unlock(&lli->lli_och_mutex);
222                 RETURN(0);
223         }
224         och=*och_p;
225         *och_p = NULL;
226         mutex_unlock(&lli->lli_och_mutex);
227
228         if (och) { /* There might be a race and somebody have freed this och
229                       already */
230                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
231                                                inode, och);
232         }
233
234         RETURN(rc);
235 }
236
237 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
238                 struct file *file)
239 {
240         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
241         struct ll_inode_info *lli = ll_i2info(inode);
242         int rc = 0;
243         ENTRY;
244
245         /* clear group lock, if present */
246         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
247                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
248
249         /* Let's see if we have good enough OPEN lock on the file and if
250            we can skip talking to MDS */
251         if (file->f_dentry->d_inode) { /* Can this ever be false? */
252                 int lockmode;
253                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
254                 struct lustre_handle lockh;
255                 struct inode *inode = file->f_dentry->d_inode;
256                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
257
258                 mutex_lock(&lli->lli_och_mutex);
259                 if (fd->fd_omode & FMODE_WRITE) {
260                         lockmode = LCK_CW;
261                         LASSERT(lli->lli_open_fd_write_count);
262                         lli->lli_open_fd_write_count--;
263                 } else if (fd->fd_omode & FMODE_EXEC) {
264                         lockmode = LCK_PR;
265                         LASSERT(lli->lli_open_fd_exec_count);
266                         lli->lli_open_fd_exec_count--;
267                 } else {
268                         lockmode = LCK_CR;
269                         LASSERT(lli->lli_open_fd_read_count);
270                         lli->lli_open_fd_read_count--;
271                 }
272                 mutex_unlock(&lli->lli_och_mutex);
273
274                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
275                                    LDLM_IBITS, &policy, lockmode,
276                                    &lockh)) {
277                         rc = ll_md_real_close(file->f_dentry->d_inode,
278                                               fd->fd_omode);
279                 }
280         } else {
281                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
282                        file, file->f_dentry, file->f_dentry->d_name.name);
283         }
284
285         LUSTRE_FPRIVATE(file) = NULL;
286         ll_file_data_put(fd);
287         ll_capa_close(inode);
288
289         RETURN(rc);
290 }
291
292 /* While this returns an error code, fput() the caller does not, so we need
293  * to make every effort to clean up all of our state here.  Also, applications
294  * rarely check close errors and even if an error is returned they will not
295  * re-try the close call.
296  */
297 int ll_file_release(struct inode *inode, struct file *file)
298 {
299         struct ll_file_data *fd;
300         struct ll_sb_info *sbi = ll_i2sbi(inode);
301         struct ll_inode_info *lli = ll_i2info(inode);
302         int rc;
303         ENTRY;
304
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
327         /* The last ref on @file, maybe not the the owner pid of statahead.
328          * Different processes can open the same dir, "ll_opendir_key" means:
329          * it is me that should stop the statahead thread. */
330         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
331             lli->lli_opendir_pid != 0)
332                 ll_stop_statahead(inode, lli->lli_opendir_key);
333
334         if (inode->i_sb->s_root == file->f_dentry) {
335                 LUSTRE_FPRIVATE(file) = NULL;
336                 ll_file_data_put(fd);
337                 RETURN(0);
338         }
339
340         if (!S_ISDIR(inode->i_mode)) {
341                 lov_read_and_clear_async_rc(lli->lli_clob);
342                 lli->lli_async_rc = 0;
343         }
344
345         rc = ll_md_close(sbi->ll_md_exp, inode, file);
346
347         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
348                 libcfs_debug_dumplog();
349
350         RETURN(rc);
351 }
352
353 static int ll_intent_file_open(struct file *file, void *lmm,
354                                int lmmsize, struct lookup_intent *itp)
355 {
356         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
357         struct dentry *parent = file->f_dentry->d_parent;
358         const char *name = file->f_dentry->d_name.name;
359         const int len = file->f_dentry->d_name.len;
360         struct md_op_data *op_data;
361         struct ptlrpc_request *req;
362         __u32 opc = LUSTRE_OPC_ANY;
363         int rc;
364         ENTRY;
365
366         if (!parent)
367                 RETURN(-ENOENT);
368
369         /* Usually we come here only for NFSD, and we want open lock.
370            But we can also get here with pre 2.6.15 patchless kernels, and in
371            that case that lock is also ok */
372         /* We can also get here if there was cached open handle in revalidate_it
373          * but it disappeared while we were getting from there to ll_file_open.
374          * But this means this file was closed and immediatelly opened which
375          * makes a good candidate for using OPEN lock */
376         /* If lmmsize & lmm are not 0, we are just setting stripe info
377          * parameters. No need for the open lock */
378         if (lmm == NULL && lmmsize == 0) {
379                 itp->it_flags |= MDS_OPEN_LOCK;
380                 if (itp->it_flags & FMODE_WRITE)
381                         opc = LUSTRE_OPC_CREATE;
382         }
383
384         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
385                                       file->f_dentry->d_inode, name, len,
386                                       O_RDWR, opc, NULL);
387         if (IS_ERR(op_data))
388                 RETURN(PTR_ERR(op_data));
389
390         itp->it_flags |= MDS_OPEN_BY_FID;
391         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
392                             0 /*unused */, &req, ll_md_blocking_ast, 0);
393         ll_finish_md_op_data(op_data);
394         if (rc == -ESTALE) {
395                 /* reason for keep own exit path - don`t flood log
396                 * with messages with -ESTALE errors.
397                 */
398                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
399                      it_open_error(DISP_OPEN_OPEN, itp))
400                         GOTO(out, rc);
401                 ll_release_openhandle(file->f_dentry, itp);
402                 GOTO(out, rc);
403         }
404
405         if (it_disposition(itp, DISP_LOOKUP_NEG))
406                 GOTO(out, rc = -ENOENT);
407
408         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
409                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
410                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
411                 GOTO(out, rc);
412         }
413
414         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
415         if (!rc && itp->d.lustre.it_lock_mode)
416                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
417                                  itp, NULL);
418
419 out:
420         ptlrpc_req_finished(itp->d.lustre.it_data);
421         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
422         ll_intent_drop_lock(itp);
423
424         RETURN(rc);
425 }
426
427 /**
428  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
429  * not believe attributes if a few ioepoch holders exist. Attributes for
430  * previous ioepoch if new one is opened are also skipped by MDS.
431  */
432 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
433 {
434         if (ioepoch && lli->lli_ioepoch != ioepoch) {
435                 lli->lli_ioepoch = ioepoch;
436                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
437                        ioepoch, PFID(&lli->lli_fid));
438         }
439 }
440
441 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
442                        struct lookup_intent *it, struct obd_client_handle *och)
443 {
444         struct ptlrpc_request *req = it->d.lustre.it_data;
445         struct mdt_body *body;
446
447         LASSERT(och);
448
449         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
450         LASSERT(body != NULL);                      /* reply already checked out */
451
452         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
453         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
454         och->och_fid = lli->lli_fid;
455         och->och_flags = it->it_flags;
456         ll_ioepoch_open(lli, body->ioepoch);
457
458         return md_set_open_replay_data(md_exp, och, req);
459 }
460
461 int ll_local_open(struct file *file, struct lookup_intent *it,
462                   struct ll_file_data *fd, struct obd_client_handle *och)
463 {
464         struct inode *inode = file->f_dentry->d_inode;
465         struct ll_inode_info *lli = ll_i2info(inode);
466         ENTRY;
467
468         LASSERT(!LUSTRE_FPRIVATE(file));
469
470         LASSERT(fd != NULL);
471
472         if (och) {
473                 struct ptlrpc_request *req = it->d.lustre.it_data;
474                 struct mdt_body *body;
475                 int rc;
476
477                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
478                 if (rc)
479                         RETURN(rc);
480
481                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
482                 if ((it->it_flags & FMODE_WRITE) &&
483                     (body->valid & OBD_MD_FLSIZE))
484                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
485                                lli->lli_ioepoch, PFID(&lli->lli_fid));
486         }
487
488         LUSTRE_FPRIVATE(file) = fd;
489         ll_readahead_init(inode, &fd->fd_ras);
490         fd->fd_omode = it->it_flags;
491         RETURN(0);
492 }
493
494 /* Open a file, and (for the very first open) create objects on the OSTs at
495  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
496  * creation or open until ll_lov_setstripe() ioctl is called.
497  *
498  * If we already have the stripe MD locally then we don't request it in
499  * md_open(), by passing a lmm_size = 0.
500  *
501  * It is up to the application to ensure no other processes open this file
502  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
503  * used.  We might be able to avoid races of that sort by getting lli_open_sem
504  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
505  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
506  */
507 int ll_file_open(struct inode *inode, struct file *file)
508 {
509         struct ll_inode_info *lli = ll_i2info(inode);
510         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
511                                           .it_flags = file->f_flags };
512         struct obd_client_handle **och_p = NULL;
513         __u64 *och_usecount = NULL;
514         struct ll_file_data *fd;
515         int rc = 0, opendir_set = 0;
516         ENTRY;
517
518         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
519                inode->i_generation, inode, file->f_flags);
520
521         it = file->private_data; /* XXX: compat macro */
522         file->private_data = NULL; /* prevent ll_local_open assertion */
523
524         fd = ll_file_data_get();
525         if (fd == NULL)
526                 GOTO(out_och_free, rc = -ENOMEM);
527
528         fd->fd_file = file;
529         if (S_ISDIR(inode->i_mode)) {
530                 spin_lock(&lli->lli_sa_lock);
531                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
532                     lli->lli_opendir_pid == 0) {
533                         lli->lli_opendir_key = fd;
534                         lli->lli_opendir_pid = cfs_curproc_pid();
535                         opendir_set = 1;
536                 }
537                 spin_unlock(&lli->lli_sa_lock);
538         }
539
540         if (inode->i_sb->s_root == file->f_dentry) {
541                 LUSTRE_FPRIVATE(file) = fd;
542                 RETURN(0);
543         }
544
545         if (!it || !it->d.lustre.it_disposition) {
546                 /* Convert f_flags into access mode. We cannot use file->f_mode,
547                  * because everything but O_ACCMODE mask was stripped from
548                  * there */
549                 if ((oit.it_flags + 1) & O_ACCMODE)
550                         oit.it_flags++;
551                 if (file->f_flags & O_TRUNC)
552                         oit.it_flags |= FMODE_WRITE;
553
554                 /* kernel only call f_op->open in dentry_open.  filp_open calls
555                  * dentry_open after call to open_namei that checks permissions.
556                  * Only nfsd_open call dentry_open directly without checking
557                  * permissions and because of that this code below is safe. */
558                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
559                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
560
561                 /* We do not want O_EXCL here, presumably we opened the file
562                  * already? XXX - NFS implications? */
563                 oit.it_flags &= ~O_EXCL;
564
565                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
566                  * created if necessary, then "IT_CREAT" should be set to keep
567                  * consistent with it */
568                 if (oit.it_flags & O_CREAT)
569                         oit.it_op |= IT_CREAT;
570
571                 it = &oit;
572         }
573
574 restart:
575         /* Let's see if we have file open on MDS already. */
576         if (it->it_flags & FMODE_WRITE) {
577                 och_p = &lli->lli_mds_write_och;
578                 och_usecount = &lli->lli_open_fd_write_count;
579         } else if (it->it_flags & FMODE_EXEC) {
580                 och_p = &lli->lli_mds_exec_och;
581                 och_usecount = &lli->lli_open_fd_exec_count;
582          } else {
583                 och_p = &lli->lli_mds_read_och;
584                 och_usecount = &lli->lli_open_fd_read_count;
585         }
586
587         mutex_lock(&lli->lli_och_mutex);
588         if (*och_p) { /* Open handle is present */
589                 if (it_disposition(it, DISP_OPEN_OPEN)) {
590                         /* Well, there's extra open request that we do not need,
591                            let's close it somehow. This will decref request. */
592                         rc = it_open_error(DISP_OPEN_OPEN, it);
593                         if (rc) {
594                                 mutex_unlock(&lli->lli_och_mutex);
595                                 GOTO(out_openerr, rc);
596                         }
597
598                         ll_release_openhandle(file->f_dentry, it);
599                 }
600                 (*och_usecount)++;
601
602                 rc = ll_local_open(file, it, fd, NULL);
603                 if (rc) {
604                         (*och_usecount)--;
605                         mutex_unlock(&lli->lli_och_mutex);
606                         GOTO(out_openerr, rc);
607                 }
608         } else {
609                 LASSERT(*och_usecount == 0);
610                 if (!it->d.lustre.it_disposition) {
611                         /* We cannot just request lock handle now, new ELC code
612                            means that one of other OPEN locks for this file
613                            could be cancelled, and since blocking ast handler
614                            would attempt to grab och_mutex as well, that would
615                            result in a deadlock */
616                         mutex_unlock(&lli->lli_och_mutex);
617                         it->it_create_mode |= M_CHECK_STALE;
618                         rc = ll_intent_file_open(file, NULL, 0, it);
619                         it->it_create_mode &= ~M_CHECK_STALE;
620                         if (rc)
621                                 GOTO(out_openerr, rc);
622
623                         goto restart;
624                 }
625                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
626                 if (!*och_p)
627                         GOTO(out_och_free, rc = -ENOMEM);
628
629                 (*och_usecount)++;
630
631                 /* md_intent_lock() didn't get a request ref if there was an
632                  * open error, so don't do cleanup on the request here
633                  * (bug 3430) */
634                 /* XXX (green): Should not we bail out on any error here, not
635                  * just open error? */
636                 rc = it_open_error(DISP_OPEN_OPEN, it);
637                 if (rc)
638                         GOTO(out_och_free, rc);
639
640                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
641
642                 rc = ll_local_open(file, it, fd, *och_p);
643                 if (rc)
644                         GOTO(out_och_free, rc);
645         }
646         mutex_unlock(&lli->lli_och_mutex);
647         fd = NULL;
648
649         /* Must do this outside lli_och_mutex lock to prevent deadlock where
650            different kind of OPEN lock for this same inode gets cancelled
651            by ldlm_cancel_lru */
652         if (!S_ISREG(inode->i_mode))
653                 GOTO(out_och_free, rc);
654
655         ll_capa_open(inode);
656
657         if (!lli->lli_has_smd) {
658                 if (file->f_flags & O_LOV_DELAY_CREATE ||
659                     !(file->f_mode & FMODE_WRITE)) {
660                         CDEBUG(D_INODE, "object creation was delayed\n");
661                         GOTO(out_och_free, rc);
662                 }
663         }
664         file->f_flags &= ~O_LOV_DELAY_CREATE;
665         GOTO(out_och_free, rc);
666
667 out_och_free:
668         if (rc) {
669                 if (och_p && *och_p) {
670                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
671                         *och_p = NULL; /* OBD_FREE writes some magic there */
672                         (*och_usecount)--;
673                 }
674                 mutex_unlock(&lli->lli_och_mutex);
675
676 out_openerr:
677                 if (opendir_set != 0)
678                         ll_stop_statahead(inode, lli->lli_opendir_key);
679                 if (fd != NULL)
680                         ll_file_data_put(fd);
681         } else {
682                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
683         }
684
685         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
686                 ptlrpc_req_finished(it->d.lustre.it_data);
687                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
688         }
689
690         return rc;
691 }
692
693 /* Fills the obdo with the attributes for the lsm */
694 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
695                           struct obd_capa *capa, struct obdo *obdo,
696                           __u64 ioepoch, int sync)
697 {
698         struct ptlrpc_request_set *set;
699         struct obd_info            oinfo = { { { 0 } } };
700         int                        rc;
701
702         ENTRY;
703
704         LASSERT(lsm != NULL);
705
706         oinfo.oi_md = lsm;
707         oinfo.oi_oa = obdo;
708         oinfo.oi_oa->o_id = lsm->lsm_object_id;
709         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
710         oinfo.oi_oa->o_mode = S_IFREG;
711         oinfo.oi_oa->o_ioepoch = ioepoch;
712         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
713                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
714                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
715                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
716                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
717                                OBD_MD_FLDATAVERSION;
718         oinfo.oi_capa = capa;
719         if (sync) {
720                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
721                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
722         }
723
724         set = ptlrpc_prep_set();
725         if (set == NULL) {
726                 CERROR("can't allocate ptlrpc set\n");
727                 rc = -ENOMEM;
728         } else {
729                 rc = obd_getattr_async(exp, &oinfo, set);
730                 if (rc == 0)
731                         rc = ptlrpc_set_wait(set);
732                 ptlrpc_set_destroy(set);
733         }
734         if (rc == 0)
735                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
736                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
737                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
738                                          OBD_MD_FLDATAVERSION);
739         RETURN(rc);
740 }
741
742 /**
743   * Performs the getattr on the inode and updates its fields.
744   * If @sync != 0, perform the getattr under the server-side lock.
745   */
746 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
747                      __u64 ioepoch, int sync)
748 {
749         struct obd_capa      *capa = ll_mdscapa_get(inode);
750         struct lov_stripe_md *lsm;
751         int rc;
752         ENTRY;
753
754         lsm = ccc_inode_lsm_get(inode);
755         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
756                             capa, obdo, ioepoch, sync);
757         capa_put(capa);
758         if (rc == 0) {
759                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
760                 CDEBUG(D_INODE,
761                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
762                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
763                        (unsigned long long)inode->i_blocks,
764                        (unsigned long)ll_inode_blksize(inode));
765         }
766         ccc_inode_lsm_put(inode, lsm);
767         RETURN(rc);
768 }
769
770 int ll_merge_lvb(struct inode *inode)
771 {
772         struct ll_inode_info *lli = ll_i2info(inode);
773         struct ll_sb_info *sbi = ll_i2sbi(inode);
774         struct lov_stripe_md *lsm;
775         struct ost_lvb lvb;
776         int rc = 0;
777
778         ENTRY;
779
780         lsm = ccc_inode_lsm_get(inode);
781         ll_inode_size_lock(inode);
782         inode_init_lvb(inode, &lvb);
783
784         /* merge timestamps the most resently obtained from mds with
785            timestamps obtained from osts */
786         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
787         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
788         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
789         if (lsm != NULL) {
790                 rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
791                 cl_isize_write_nolock(inode, lvb.lvb_size);
792
793                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
794                                 PFID(&lli->lli_fid), lvb.lvb_size);
795                 inode->i_blocks = lvb.lvb_blocks;
796         }
797         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
798         LTIME_S(inode->i_atime) = lvb.lvb_atime;
799         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
800         ll_inode_size_unlock(inode);
801         ccc_inode_lsm_put(inode, lsm);
802
803         RETURN(rc);
804 }
805
806 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
807                      lstat_t *st)
808 {
809         struct obdo obdo = { 0 };
810         int rc;
811
812         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
813         if (rc == 0) {
814                 st->st_size   = obdo.o_size;
815                 st->st_blocks = obdo.o_blocks;
816                 st->st_mtime  = obdo.o_mtime;
817                 st->st_atime  = obdo.o_atime;
818                 st->st_ctime  = obdo.o_ctime;
819         }
820         return rc;
821 }
822
823 void ll_io_init(struct cl_io *io, const struct file *file, int write)
824 {
825         struct inode *inode = file->f_dentry->d_inode;
826
827         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
828         if (write) {
829                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
830                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
831         }
832         io->ci_obj     = ll_i2info(inode)->lli_clob;
833         io->ci_lockreq = CILR_MAYBE;
834         if (ll_file_nolock(file)) {
835                 io->ci_lockreq = CILR_NEVER;
836                 io->ci_no_srvlock = 1;
837         } else if (file->f_flags & O_APPEND) {
838                 io->ci_lockreq = CILR_MANDATORY;
839         }
840 }
841
842 static ssize_t
843 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
844                    struct file *file, enum cl_io_type iot,
845                    loff_t *ppos, size_t count)
846 {
847         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
848         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
849         struct cl_io         *io;
850         ssize_t               result;
851         ENTRY;
852
853         io = ccc_env_thread_io(env);
854         ll_io_init(io, file, iot == CIT_WRITE);
855
856         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
857                 struct vvp_io *vio = vvp_env_io(env);
858                 struct ccc_io *cio = ccc_env_io(env);
859                 int write_mutex_locked = 0;
860
861                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
862                 vio->cui_io_subtype = args->via_io_subtype;
863
864                 switch (vio->cui_io_subtype) {
865                 case IO_NORMAL:
866                         cio->cui_iov = args->u.normal.via_iov;
867                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
868                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
869 #ifndef HAVE_FILE_WRITEV
870                         cio->cui_iocb = args->u.normal.via_iocb;
871 #endif
872                         if ((iot == CIT_WRITE) &&
873                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
874                                 if (mutex_lock_interruptible(&lli->
875                                                                lli_write_mutex))
876                                         GOTO(out, result = -ERESTARTSYS);
877                                 write_mutex_locked = 1;
878                         } else if (iot == CIT_READ) {
879                                 down_read(&lli->lli_trunc_sem);
880                         }
881                         break;
882                 case IO_SENDFILE:
883                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
884                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
885                         break;
886                 case IO_SPLICE:
887                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
888                         vio->u.splice.cui_flags = args->u.splice.via_flags;
889                         break;
890                 default:
891                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
892                         LBUG();
893                 }
894                 result = cl_io_loop(env, io);
895                 if (write_mutex_locked)
896                         mutex_unlock(&lli->lli_write_mutex);
897                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
898                         up_read(&lli->lli_trunc_sem);
899         } else {
900                 /* cl_io_rw_init() handled IO */
901                 result = io->ci_result;
902         }
903
904         if (io->ci_nob > 0) {
905                 result = io->ci_nob;
906                 *ppos = io->u.ci_wr.wr.crw_pos;
907         }
908         GOTO(out, result);
909 out:
910         cl_io_fini(env, io);
911
912         if (iot == CIT_READ) {
913                 if (result >= 0)
914                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
915                                            LPROC_LL_READ_BYTES, result);
916         } else if (iot == CIT_WRITE) {
917                 if (result >= 0) {
918                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
919                                            LPROC_LL_WRITE_BYTES, result);
920                         fd->fd_write_failed = false;
921                 } else {
922                         fd->fd_write_failed = true;
923                 }
924         }
925
926         return result;
927 }
928
929
930 /*
931  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
932  */
933 static int ll_file_get_iov_count(const struct iovec *iov,
934                                  unsigned long *nr_segs, size_t *count)
935 {
936         size_t cnt = 0;
937         unsigned long seg;
938
939         for (seg = 0; seg < *nr_segs; seg++) {
940                 const struct iovec *iv = &iov[seg];
941
942                 /*
943                  * If any segment has a negative length, or the cumulative
944                  * length ever wraps negative then return -EINVAL.
945                  */
946                 cnt += iv->iov_len;
947                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
948                         return -EINVAL;
949                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
950                         continue;
951                 if (seg == 0)
952                         return -EFAULT;
953                 *nr_segs = seg;
954                 cnt -= iv->iov_len;   /* This segment is no good */
955                 break;
956         }
957         *count = cnt;
958         return 0;
959 }
960
961 #ifdef HAVE_FILE_READV
962 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
963                               unsigned long nr_segs, loff_t *ppos)
964 {
965         struct lu_env      *env;
966         struct vvp_io_args *args;
967         size_t              count;
968         ssize_t             result;
969         int                 refcheck;
970         ENTRY;
971
972         result = ll_file_get_iov_count(iov, &nr_segs, &count);
973         if (result)
974                 RETURN(result);
975
976         env = cl_env_get(&refcheck);
977         if (IS_ERR(env))
978                 RETURN(PTR_ERR(env));
979
980         args = vvp_env_args(env, IO_NORMAL);
981         args->u.normal.via_iov = (struct iovec *)iov;
982         args->u.normal.via_nrsegs = nr_segs;
983
984         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
985         cl_env_put(env, &refcheck);
986         RETURN(result);
987 }
988
989 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
990                             loff_t *ppos)
991 {
992         struct lu_env *env;
993         struct iovec  *local_iov;
994         ssize_t        result;
995         int            refcheck;
996         ENTRY;
997
998         env = cl_env_get(&refcheck);
999         if (IS_ERR(env))
1000                 RETURN(PTR_ERR(env));
1001
1002         local_iov = &vvp_env_info(env)->vti_local_iov;
1003         local_iov->iov_base = (void __user *)buf;
1004         local_iov->iov_len = count;
1005         result = ll_file_readv(file, local_iov, 1, ppos);
1006         cl_env_put(env, &refcheck);
1007         RETURN(result);
1008 }
1009
1010 #else
1011 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1012                                 unsigned long nr_segs, loff_t pos)
1013 {
1014         struct lu_env      *env;
1015         struct vvp_io_args *args;
1016         size_t              count;
1017         ssize_t             result;
1018         int                 refcheck;
1019         ENTRY;
1020
1021         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1022         if (result)
1023                 RETURN(result);
1024
1025         env = cl_env_get(&refcheck);
1026         if (IS_ERR(env))
1027                 RETURN(PTR_ERR(env));
1028
1029         args = vvp_env_args(env, IO_NORMAL);
1030         args->u.normal.via_iov = (struct iovec *)iov;
1031         args->u.normal.via_nrsegs = nr_segs;
1032         args->u.normal.via_iocb = iocb;
1033
1034         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1035                                     &iocb->ki_pos, count);
1036         cl_env_put(env, &refcheck);
1037         RETURN(result);
1038 }
1039
1040 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1041                             loff_t *ppos)
1042 {
1043         struct lu_env *env;
1044         struct iovec  *local_iov;
1045         struct kiocb  *kiocb;
1046         ssize_t        result;
1047         int            refcheck;
1048         ENTRY;
1049
1050         env = cl_env_get(&refcheck);
1051         if (IS_ERR(env))
1052                 RETURN(PTR_ERR(env));
1053
1054         local_iov = &vvp_env_info(env)->vti_local_iov;
1055         kiocb = &vvp_env_info(env)->vti_kiocb;
1056         local_iov->iov_base = (void __user *)buf;
1057         local_iov->iov_len = count;
1058         init_sync_kiocb(kiocb, file);
1059         kiocb->ki_pos = *ppos;
1060         kiocb->ki_left = count;
1061
1062         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1063         *ppos = kiocb->ki_pos;
1064
1065         cl_env_put(env, &refcheck);
1066         RETURN(result);
1067 }
1068 #endif
1069
1070 /*
1071  * Write to a file (through the page cache).
1072  */
1073 #ifdef HAVE_FILE_WRITEV
1074 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1075                               unsigned long nr_segs, loff_t *ppos)
1076 {
1077         struct lu_env      *env;
1078         struct vvp_io_args *args;
1079         size_t              count;
1080         ssize_t             result;
1081         int                 refcheck;
1082         ENTRY;
1083
1084         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1085         if (result)
1086                 RETURN(result);
1087
1088         env = cl_env_get(&refcheck);
1089         if (IS_ERR(env))
1090                 RETURN(PTR_ERR(env));
1091
1092         args = vvp_env_args(env, IO_NORMAL);
1093         args->u.normal.via_iov = (struct iovec *)iov;
1094         args->u.normal.via_nrsegs = nr_segs;
1095
1096         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1097         cl_env_put(env, &refcheck);
1098         RETURN(result);
1099 }
1100
1101 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1102                              loff_t *ppos)
1103 {
1104         struct lu_env    *env;
1105         struct iovec     *local_iov;
1106         ssize_t           result;
1107         int               refcheck;
1108         ENTRY;
1109
1110         env = cl_env_get(&refcheck);
1111         if (IS_ERR(env))
1112                 RETURN(PTR_ERR(env));
1113
1114         local_iov = &vvp_env_info(env)->vti_local_iov;
1115         local_iov->iov_base = (void __user *)buf;
1116         local_iov->iov_len = count;
1117
1118         result = ll_file_writev(file, local_iov, 1, ppos);
1119         cl_env_put(env, &refcheck);
1120         RETURN(result);
1121 }
1122
1123 #else /* AIO stuff */
1124 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1125                                  unsigned long nr_segs, loff_t pos)
1126 {
1127         struct lu_env      *env;
1128         struct vvp_io_args *args;
1129         size_t              count;
1130         ssize_t             result;
1131         int                 refcheck;
1132         ENTRY;
1133
1134         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1135         if (result)
1136                 RETURN(result);
1137
1138         env = cl_env_get(&refcheck);
1139         if (IS_ERR(env))
1140                 RETURN(PTR_ERR(env));
1141
1142         args = vvp_env_args(env, IO_NORMAL);
1143         args->u.normal.via_iov = (struct iovec *)iov;
1144         args->u.normal.via_nrsegs = nr_segs;
1145         args->u.normal.via_iocb = iocb;
1146
1147         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1148                                   &iocb->ki_pos, count);
1149         cl_env_put(env, &refcheck);
1150         RETURN(result);
1151 }
1152
1153 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1154                              loff_t *ppos)
1155 {
1156         struct lu_env *env;
1157         struct iovec  *local_iov;
1158         struct kiocb  *kiocb;
1159         ssize_t        result;
1160         int            refcheck;
1161         ENTRY;
1162
1163         env = cl_env_get(&refcheck);
1164         if (IS_ERR(env))
1165                 RETURN(PTR_ERR(env));
1166
1167         local_iov = &vvp_env_info(env)->vti_local_iov;
1168         kiocb = &vvp_env_info(env)->vti_kiocb;
1169         local_iov->iov_base = (void __user *)buf;
1170         local_iov->iov_len = count;
1171         init_sync_kiocb(kiocb, file);
1172         kiocb->ki_pos = *ppos;
1173         kiocb->ki_left = count;
1174
1175         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1176         *ppos = kiocb->ki_pos;
1177
1178         cl_env_put(env, &refcheck);
1179         RETURN(result);
1180 }
1181 #endif
1182
1183
1184 #ifdef HAVE_KERNEL_SENDFILE
1185 /*
1186  * Send file content (through pagecache) somewhere with helper
1187  */
1188 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1189                                 read_actor_t actor, void *target)
1190 {
1191         struct lu_env      *env;
1192         struct vvp_io_args *args;
1193         ssize_t             result;
1194         int                 refcheck;
1195         ENTRY;
1196
1197         env = cl_env_get(&refcheck);
1198         if (IS_ERR(env))
1199                 RETURN(PTR_ERR(env));
1200
1201         args = vvp_env_args(env, IO_SENDFILE);
1202         args->u.sendfile.via_target = target;
1203         args->u.sendfile.via_actor = actor;
1204
1205         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1206         cl_env_put(env, &refcheck);
1207         RETURN(result);
1208 }
1209 #endif
1210
1211 #ifdef HAVE_KERNEL_SPLICE_READ
1212 /*
1213  * Send file content (through pagecache) somewhere with helper
1214  */
1215 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1216                                    struct pipe_inode_info *pipe, size_t count,
1217                                    unsigned int flags)
1218 {
1219         struct lu_env      *env;
1220         struct vvp_io_args *args;
1221         ssize_t             result;
1222         int                 refcheck;
1223         ENTRY;
1224
1225         env = cl_env_get(&refcheck);
1226         if (IS_ERR(env))
1227                 RETURN(PTR_ERR(env));
1228
1229         args = vvp_env_args(env, IO_SPLICE);
1230         args->u.splice.via_pipe = pipe;
1231         args->u.splice.via_flags = flags;
1232
1233         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1234         cl_env_put(env, &refcheck);
1235         RETURN(result);
1236 }
1237 #endif
1238
1239 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1240                            obd_count ost_idx)
1241 {
1242         struct obd_export *exp = ll_i2dtexp(inode);
1243         struct obd_trans_info oti = { 0 };
1244         struct obdo *oa = NULL;
1245         int lsm_size;
1246         int rc = 0;
1247         struct lov_stripe_md *lsm = NULL, *lsm2;
1248         ENTRY;
1249
1250         OBDO_ALLOC(oa);
1251         if (oa == NULL)
1252                 RETURN(-ENOMEM);
1253
1254         lsm = ccc_inode_lsm_get(inode);
1255         if (lsm == NULL)
1256                 GOTO(out, rc = -ENOENT);
1257
1258         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1259                    (lsm->lsm_stripe_count));
1260
1261         OBD_ALLOC_LARGE(lsm2, lsm_size);
1262         if (lsm2 == NULL)
1263                 GOTO(out, rc = -ENOMEM);
1264
1265         oa->o_id = id;
1266         oa->o_seq = seq;
1267         oa->o_nlink = ost_idx;
1268         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1269         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1270         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1271                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1272         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1273         memcpy(lsm2, lsm, lsm_size);
1274         ll_inode_size_lock(inode);
1275         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1276         ll_inode_size_unlock(inode);
1277
1278         OBD_FREE_LARGE(lsm2, lsm_size);
1279         GOTO(out, rc);
1280 out:
1281         ccc_inode_lsm_put(inode, lsm);
1282         OBDO_FREE(oa);
1283         return rc;
1284 }
1285
1286 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1287 {
1288         struct ll_recreate_obj ucreat;
1289         ENTRY;
1290
1291         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1292                 RETURN(-EPERM);
1293
1294         if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1295                            sizeof(ucreat)))
1296                 RETURN(-EFAULT);
1297
1298         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1299                                ucreat.lrc_ost_idx));
1300 }
1301
1302 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1303 {
1304         struct lu_fid   fid;
1305         obd_id          id;
1306         obd_count       ost_idx;
1307         ENTRY;
1308
1309         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1310                 RETURN(-EPERM);
1311
1312         if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1313                 RETURN(-EFAULT);
1314
1315         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1316         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1317         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1318 }
1319
1320 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1321                              int flags, struct lov_user_md *lum, int lum_size)
1322 {
1323         struct lov_stripe_md *lsm = NULL;
1324         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1325         int rc = 0;
1326         ENTRY;
1327
1328         lsm = ccc_inode_lsm_get(inode);
1329         if (lsm != NULL) {
1330                 ccc_inode_lsm_put(inode, lsm);
1331                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1332                        inode->i_ino);
1333                 RETURN(-EEXIST);
1334         }
1335
1336         ll_inode_size_lock(inode);
1337         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1338         if (rc)
1339                 GOTO(out, rc);
1340         rc = oit.d.lustre.it_status;
1341         if (rc < 0)
1342                 GOTO(out_req_free, rc);
1343
1344         ll_release_openhandle(file->f_dentry, &oit);
1345
1346  out:
1347         ll_inode_size_unlock(inode);
1348         ll_intent_release(&oit);
1349         ccc_inode_lsm_put(inode, lsm);
1350         RETURN(rc);
1351 out_req_free:
1352         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1353         goto out;
1354 }
1355
1356 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1357                              struct lov_mds_md **lmmp, int *lmm_size,
1358                              struct ptlrpc_request **request)
1359 {
1360         struct ll_sb_info *sbi = ll_i2sbi(inode);
1361         struct mdt_body  *body;
1362         struct lov_mds_md *lmm = NULL;
1363         struct ptlrpc_request *req = NULL;
1364         struct md_op_data *op_data;
1365         int rc, lmmsize;
1366
1367         rc = ll_get_max_mdsize(sbi, &lmmsize);
1368         if (rc)
1369                 RETURN(rc);
1370
1371         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1372                                      strlen(filename), lmmsize,
1373                                      LUSTRE_OPC_ANY, NULL);
1374         if (IS_ERR(op_data))
1375                 RETURN(PTR_ERR(op_data));
1376
1377         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1378         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1379         ll_finish_md_op_data(op_data);
1380         if (rc < 0) {
1381                 CDEBUG(D_INFO, "md_getattr_name failed "
1382                        "on %s: rc %d\n", filename, rc);
1383                 GOTO(out, rc);
1384         }
1385
1386         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1387         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1388
1389         lmmsize = body->eadatasize;
1390
1391         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1392                         lmmsize == 0) {
1393                 GOTO(out, rc = -ENODATA);
1394         }
1395
1396         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1397         LASSERT(lmm != NULL);
1398
1399         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1400             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1401                 GOTO(out, rc = -EPROTO);
1402         }
1403
1404         /*
1405          * This is coming from the MDS, so is probably in
1406          * little endian.  We convert it to host endian before
1407          * passing it to userspace.
1408          */
1409         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1410                 /* if function called for directory - we should
1411                  * avoid swab not existent lsm objects */
1412                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1413                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1414                         if (S_ISREG(body->mode))
1415                                 lustre_swab_lov_user_md_objects(
1416                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1417                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1418                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1419                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1420                         if (S_ISREG(body->mode))
1421                                 lustre_swab_lov_user_md_objects(
1422                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1423                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1424                 }
1425         }
1426
1427 out:
1428         *lmmp = lmm;
1429         *lmm_size = lmmsize;
1430         *request = req;
1431         return rc;
1432 }
1433
1434 static int ll_lov_setea(struct inode *inode, struct file *file,
1435                             unsigned long arg)
1436 {
1437         int                      flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1438         struct lov_user_md      *lump;
1439         int                      lum_size = sizeof(struct lov_user_md) +
1440                                             sizeof(struct lov_user_ost_data);
1441         int                      rc;
1442         ENTRY;
1443
1444         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1445                 RETURN(-EPERM);
1446
1447         OBD_ALLOC_LARGE(lump, lum_size);
1448         if (lump == NULL)
1449                 RETURN(-ENOMEM);
1450
1451         if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1452                 OBD_FREE_LARGE(lump, lum_size);
1453                 RETURN(-EFAULT);
1454         }
1455
1456         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1457
1458         OBD_FREE_LARGE(lump, lum_size);
1459         RETURN(rc);
1460 }
1461
1462 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1463                             unsigned long arg)
1464 {
1465         struct lov_user_md_v3    lumv3;
1466         struct lov_user_md_v1   *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1467         struct lov_user_md_v1   *lumv1p = (struct lov_user_md_v1 *)arg;
1468         struct lov_user_md_v3   *lumv3p = (struct lov_user_md_v3 *)arg;
1469         int                      lum_size, rc;
1470         int                      flags = FMODE_WRITE;
1471         ENTRY;
1472
1473         /* first try with v1 which is smaller than v3 */
1474         lum_size = sizeof(struct lov_user_md_v1);
1475         if (copy_from_user(lumv1, lumv1p, lum_size))
1476                 RETURN(-EFAULT);
1477
1478         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1479                 lum_size = sizeof(struct lov_user_md_v3);
1480                 if (copy_from_user(&lumv3, lumv3p, lum_size))
1481                         RETURN(-EFAULT);
1482         }
1483
1484         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1485         if (rc == 0) {
1486                 struct lov_stripe_md *lsm;
1487                 __u32 gen;
1488
1489                 put_user(0, &lumv1p->lmm_stripe_count);
1490
1491                 ll_layout_refresh(inode, &gen);
1492                 lsm = ccc_inode_lsm_get(inode);
1493                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1494                                    0, lsm, (void *)arg);
1495                 ccc_inode_lsm_put(inode, lsm);
1496         }
1497         RETURN(rc);
1498 }
1499
1500 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1501 {
1502         struct lov_stripe_md *lsm;
1503         int rc = -ENODATA;
1504         ENTRY;
1505
1506         lsm = ccc_inode_lsm_get(inode);
1507         if (lsm != NULL)
1508                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1509                                    lsm, (void *)arg);
1510         ccc_inode_lsm_put(inode, lsm);
1511         RETURN(rc);
1512 }
1513
1514 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1515 {
1516         struct ll_inode_info   *lli = ll_i2info(inode);
1517         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1518         struct ccc_grouplock    grouplock;
1519         int                     rc;
1520         ENTRY;
1521
1522         if (ll_file_nolock(file))
1523                 RETURN(-EOPNOTSUPP);
1524
1525         spin_lock(&lli->lli_lock);
1526         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1527                 CWARN("group lock already existed with gid %lu\n",
1528                       fd->fd_grouplock.cg_gid);
1529                 spin_unlock(&lli->lli_lock);
1530                 RETURN(-EINVAL);
1531         }
1532         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1533         spin_unlock(&lli->lli_lock);
1534
1535         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1536                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1537         if (rc)
1538                 RETURN(rc);
1539
1540         spin_lock(&lli->lli_lock);
1541         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1542                 spin_unlock(&lli->lli_lock);
1543                 CERROR("another thread just won the race\n");
1544                 cl_put_grouplock(&grouplock);
1545                 RETURN(-EINVAL);
1546         }
1547
1548         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1549         fd->fd_grouplock = grouplock;
1550         spin_unlock(&lli->lli_lock);
1551
1552         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1553         RETURN(0);
1554 }
1555
1556 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1557 {
1558         struct ll_inode_info   *lli = ll_i2info(inode);
1559         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1560         struct ccc_grouplock    grouplock;
1561         ENTRY;
1562
1563         spin_lock(&lli->lli_lock);
1564         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1565                 spin_unlock(&lli->lli_lock);
1566                 CWARN("no group lock held\n");
1567                 RETURN(-EINVAL);
1568         }
1569         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1570
1571         if (fd->fd_grouplock.cg_gid != arg) {
1572                 CWARN("group lock %lu doesn't match current id %lu\n",
1573                        arg, fd->fd_grouplock.cg_gid);
1574                 spin_unlock(&lli->lli_lock);
1575                 RETURN(-EINVAL);
1576         }
1577
1578         grouplock = fd->fd_grouplock;
1579         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1580         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1581         spin_unlock(&lli->lli_lock);
1582
1583         cl_put_grouplock(&grouplock);
1584         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1585         RETURN(0);
1586 }
1587
1588 /**
1589  * Close inode open handle
1590  *
1591  * \param dentry [in]     dentry which contains the inode
1592  * \param it     [in,out] intent which contains open info and result
1593  *
1594  * \retval 0     success
1595  * \retval <0    failure
1596  */
1597 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1598 {
1599         struct inode *inode = dentry->d_inode;
1600         struct obd_client_handle *och;
1601         int rc;
1602         ENTRY;
1603
1604         LASSERT(inode);
1605
1606         /* Root ? Do nothing. */
1607         if (dentry->d_inode->i_sb->s_root == dentry)
1608                 RETURN(0);
1609
1610         /* No open handle to close? Move away */
1611         if (!it_disposition(it, DISP_OPEN_OPEN))
1612                 RETURN(0);
1613
1614         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1615
1616         OBD_ALLOC(och, sizeof(*och));
1617         if (!och)
1618                 GOTO(out, rc = -ENOMEM);
1619
1620         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1621                     ll_i2info(inode), it, och);
1622
1623         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1624                                        inode, och);
1625  out:
1626         /* this one is in place of ll_file_open */
1627         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1628                 ptlrpc_req_finished(it->d.lustre.it_data);
1629                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1630         }
1631         RETURN(rc);
1632 }
1633
1634 /**
1635  * Get size for inode for which FIEMAP mapping is requested.
1636  * Make the FIEMAP get_info call and returns the result.
1637  */
1638 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1639               int num_bytes)
1640 {
1641         struct obd_export *exp = ll_i2dtexp(inode);
1642         struct lov_stripe_md *lsm = NULL;
1643         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1644         int vallen = num_bytes;
1645         int rc;
1646         ENTRY;
1647
1648         /* Checks for fiemap flags */
1649         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1650                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1651                 return -EBADR;
1652         }
1653
1654         /* Check for FIEMAP_FLAG_SYNC */
1655         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1656                 rc = filemap_fdatawrite(inode->i_mapping);
1657                 if (rc)
1658                         return rc;
1659         }
1660
1661         lsm = ccc_inode_lsm_get(inode);
1662         if (lsm == NULL)
1663                 return -ENOENT;
1664
1665         /* If the stripe_count > 1 and the application does not understand
1666          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1667          */
1668         if (lsm->lsm_stripe_count > 1 &&
1669             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1670                 GOTO(out, rc = -EOPNOTSUPP);
1671
1672         fm_key.oa.o_id = lsm->lsm_object_id;
1673         fm_key.oa.o_seq = lsm->lsm_object_seq;
1674         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1675
1676         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1677         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1678         /* If filesize is 0, then there would be no objects for mapping */
1679         if (fm_key.oa.o_size == 0) {
1680                 fiemap->fm_mapped_extents = 0;
1681                 GOTO(out, rc = 0);
1682         }
1683
1684         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1685
1686         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1687                           fiemap, lsm);
1688         if (rc)
1689                 CERROR("obd_get_info failed: rc = %d\n", rc);
1690
1691 out:
1692         ccc_inode_lsm_put(inode, lsm);
1693         RETURN(rc);
1694 }
1695
1696 int ll_fid2path(struct inode *inode, void *arg)
1697 {
1698         struct obd_export       *exp = ll_i2mdexp(inode);
1699         struct getinfo_fid2path *gfout, *gfin;
1700         int                      outsize, rc;
1701         ENTRY;
1702
1703         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1704             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1705                 RETURN(-EPERM);
1706
1707         /* Need to get the buflen */
1708         OBD_ALLOC_PTR(gfin);
1709         if (gfin == NULL)
1710                 RETURN(-ENOMEM);
1711         if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1712                 OBD_FREE_PTR(gfin);
1713                 RETURN(-EFAULT);
1714         }
1715
1716         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1717         OBD_ALLOC(gfout, outsize);
1718         if (gfout == NULL) {
1719                 OBD_FREE_PTR(gfin);
1720                 RETURN(-ENOMEM);
1721         }
1722         memcpy(gfout, gfin, sizeof(*gfout));
1723         OBD_FREE_PTR(gfin);
1724
1725         /* Call mdc_iocontrol */
1726         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1727         if (rc)
1728                 GOTO(gf_free, rc);
1729         if (copy_to_user(arg, gfout, outsize))
1730                 rc = -EFAULT;
1731
1732 gf_free:
1733         OBD_FREE(gfout, outsize);
1734         RETURN(rc);
1735 }
1736
1737 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1738 {
1739         struct ll_user_fiemap *fiemap_s;
1740         size_t num_bytes, ret_bytes;
1741         unsigned int extent_count;
1742         int rc = 0;
1743
1744         /* Get the extent count so we can calculate the size of
1745          * required fiemap buffer */
1746         if (get_user(extent_count,
1747             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1748                 RETURN(-EFAULT);
1749         num_bytes = sizeof(*fiemap_s) + (extent_count *
1750                                          sizeof(struct ll_fiemap_extent));
1751
1752         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1753         if (fiemap_s == NULL)
1754                 RETURN(-ENOMEM);
1755
1756         /* get the fiemap value */
1757         if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1758                            sizeof(*fiemap_s)))
1759                 GOTO(error, rc = -EFAULT);
1760
1761         /* If fm_extent_count is non-zero, read the first extent since
1762          * it is used to calculate end_offset and device from previous
1763          * fiemap call. */
1764         if (extent_count) {
1765                 if (copy_from_user(&fiemap_s->fm_extents[0],
1766                     (char __user *)arg + sizeof(*fiemap_s),
1767                     sizeof(struct ll_fiemap_extent)))
1768                         GOTO(error, rc = -EFAULT);
1769         }
1770
1771         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1772         if (rc)
1773                 GOTO(error, rc);
1774
1775         ret_bytes = sizeof(struct ll_user_fiemap);
1776
1777         if (extent_count != 0)
1778                 ret_bytes += (fiemap_s->fm_mapped_extents *
1779                                  sizeof(struct ll_fiemap_extent));
1780
1781         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1782                 rc = -EFAULT;
1783
1784 error:
1785         OBD_FREE_LARGE(fiemap_s, num_bytes);
1786         RETURN(rc);
1787 }
1788
1789 /*
1790  * Read the data_version for inode.
1791  *
1792  * This value is computed using stripe object version on OST.
1793  * Version is computed using server side locking.
1794  *
1795  * @param extent_lock  Take extent lock. Not needed if a process is already
1796  *                     holding the OST object group locks.
1797  */
1798 int ll_data_version(struct inode *inode, __u64 *data_version,
1799                     int extent_lock)
1800 {
1801         struct lov_stripe_md    *lsm = NULL;
1802         struct ll_sb_info       *sbi = ll_i2sbi(inode);
1803         struct obdo             *obdo = NULL;
1804         int                      rc;
1805         ENTRY;
1806
1807         /* If no stripe, we consider version is 0. */
1808         lsm = ccc_inode_lsm_get(inode);
1809         if (lsm == NULL) {
1810                 *data_version = 0;
1811                 CDEBUG(D_INODE, "No object for inode\n");
1812                 RETURN(0);
1813         }
1814
1815         OBD_ALLOC_PTR(obdo);
1816         if (obdo == NULL) {
1817                 ccc_inode_lsm_put(inode, lsm);
1818                 RETURN(-ENOMEM);
1819         }
1820
1821         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1822         if (!rc) {
1823                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1824                         rc = -EOPNOTSUPP;
1825                 else
1826                         *data_version = obdo->o_data_version;
1827         }
1828
1829         OBD_FREE_PTR(obdo);
1830         ccc_inode_lsm_put(inode, lsm);
1831
1832         RETURN(rc);
1833 }
1834
1835 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1836 {
1837         struct inode *inode = file->f_dentry->d_inode;
1838         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1839         int flags;
1840
1841         ENTRY;
1842
1843         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1844                inode->i_generation, inode, cmd);
1845         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1846
1847         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1848         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1849                 RETURN(-ENOTTY);
1850
1851         switch(cmd) {
1852         case LL_IOC_GETFLAGS:
1853                 /* Get the current value of the file flags */
1854                 return put_user(fd->fd_flags, (int *)arg);
1855         case LL_IOC_SETFLAGS:
1856         case LL_IOC_CLRFLAGS:
1857                 /* Set or clear specific file flags */
1858                 /* XXX This probably needs checks to ensure the flags are
1859                  *     not abused, and to handle any flag side effects.
1860                  */
1861                 if (get_user(flags, (int *) arg))
1862                         RETURN(-EFAULT);
1863
1864                 if (cmd == LL_IOC_SETFLAGS) {
1865                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1866                             !(file->f_flags & O_DIRECT)) {
1867                                 CERROR("%s: unable to disable locking on "
1868                                        "non-O_DIRECT file\n", current->comm);
1869                                 RETURN(-EINVAL);
1870                         }
1871
1872                         fd->fd_flags |= flags;
1873                 } else {
1874                         fd->fd_flags &= ~flags;
1875                 }
1876                 RETURN(0);
1877         case LL_IOC_LOV_SETSTRIPE:
1878                 RETURN(ll_lov_setstripe(inode, file, arg));
1879         case LL_IOC_LOV_SETEA:
1880                 RETURN(ll_lov_setea(inode, file, arg));
1881         case LL_IOC_LOV_GETSTRIPE:
1882                 RETURN(ll_lov_getstripe(inode, arg));
1883         case LL_IOC_RECREATE_OBJ:
1884                 RETURN(ll_lov_recreate_obj(inode, arg));
1885         case LL_IOC_RECREATE_FID:
1886                 RETURN(ll_lov_recreate_fid(inode, arg));
1887         case FSFILT_IOC_FIEMAP:
1888                 RETURN(ll_ioctl_fiemap(inode, arg));
1889         case FSFILT_IOC_GETFLAGS:
1890         case FSFILT_IOC_SETFLAGS:
1891                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1892         case FSFILT_IOC_GETVERSION_OLD:
1893         case FSFILT_IOC_GETVERSION:
1894                 RETURN(put_user(inode->i_generation, (int *)arg));
1895         case LL_IOC_GROUP_LOCK:
1896                 RETURN(ll_get_grouplock(inode, file, arg));
1897         case LL_IOC_GROUP_UNLOCK:
1898                 RETURN(ll_put_grouplock(inode, file, arg));
1899         case IOC_OBD_STATFS:
1900                 RETURN(ll_obd_statfs(inode, (void *)arg));
1901
1902         /* We need to special case any other ioctls we want to handle,
1903          * to send them to the MDS/OST as appropriate and to properly
1904          * network encode the arg field.
1905         case FSFILT_IOC_SETVERSION_OLD:
1906         case FSFILT_IOC_SETVERSION:
1907         */
1908         case LL_IOC_FLUSHCTX:
1909                 RETURN(ll_flush_ctx(inode));
1910         case LL_IOC_PATH2FID: {
1911                 if (copy_to_user((void *)arg, ll_inode2fid(inode),
1912                                  sizeof(struct lu_fid)))
1913                         RETURN(-EFAULT);
1914
1915                 RETURN(0);
1916         }
1917         case OBD_IOC_FID2PATH:
1918                 RETURN(ll_fid2path(inode, (void *)arg));
1919         case LL_IOC_DATA_VERSION: {
1920                 struct ioc_data_version idv;
1921                 int                     rc;
1922
1923                 if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
1924                         RETURN(-EFAULT);
1925
1926                 rc = ll_data_version(inode, &idv.idv_version,
1927                                 !(idv.idv_flags & LL_DV_NOFLUSH));
1928
1929                 if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
1930                         RETURN(-EFAULT);
1931
1932                 RETURN(rc);
1933         }
1934
1935         case LL_IOC_GET_MDTIDX: {
1936                 int mdtidx;
1937
1938                 mdtidx = ll_get_mdt_idx(inode);
1939                 if (mdtidx < 0)
1940                         RETURN(mdtidx);
1941
1942                 if (put_user((int)mdtidx, (int*)arg))
1943                         RETURN(-EFAULT);
1944
1945                 RETURN(0);
1946         }
1947         case OBD_IOC_GETDTNAME:
1948         case OBD_IOC_GETMDNAME:
1949                 RETURN(ll_get_obd_name(inode, cmd, arg));
1950         case LL_IOC_HSM_STATE_GET: {
1951                 struct md_op_data       *op_data;
1952                 struct hsm_user_state   *hus;
1953                 int                      rc;
1954
1955                 OBD_ALLOC_PTR(hus);
1956                 if (hus == NULL)
1957                         RETURN(-ENOMEM);
1958
1959                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
1960                                              LUSTRE_OPC_ANY, hus);
1961                 if (op_data == NULL) {
1962                         OBD_FREE_PTR(hus);
1963                         RETURN(-ENOMEM);
1964                 }
1965
1966                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
1967                                    op_data, NULL);
1968
1969                 if (copy_to_user((void *)arg, hus, sizeof(*hus)))
1970                         rc = -EFAULT;
1971
1972                 ll_finish_md_op_data(op_data);
1973                 OBD_FREE_PTR(hus);
1974                 RETURN(rc);
1975         }
1976         case LL_IOC_HSM_STATE_SET: {
1977                 struct md_op_data       *op_data;
1978                 struct hsm_state_set    *hss;
1979                 int                      rc;
1980
1981                 OBD_ALLOC_PTR(hss);
1982                 if (hss == NULL)
1983                         RETURN(-ENOMEM);
1984                 if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
1985                         OBD_FREE_PTR(hss);
1986                         RETURN(-EFAULT);
1987                 }
1988
1989                 /* Non-root users are forbidden to set or clear flags which are
1990                  * NOT defined in HSM_USER_MASK. */
1991                 if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
1992                     && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
1993                         OBD_FREE_PTR(hss);
1994                         RETURN(-EPERM);
1995                 }
1996
1997                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
1998                                              LUSTRE_OPC_ANY, hss);
1999                 if (op_data == NULL) {
2000                         OBD_FREE_PTR(hss);
2001                         RETURN(-ENOMEM);
2002                 }
2003
2004                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2005                                    op_data, NULL);
2006
2007                 ll_finish_md_op_data(op_data);
2008
2009                 OBD_FREE_PTR(hss);
2010                 RETURN(rc);
2011         }
2012         case LL_IOC_HSM_ACTION: {
2013                 struct md_op_data               *op_data;
2014                 struct hsm_current_action       *hca;
2015                 int                              rc;
2016
2017                 OBD_ALLOC_PTR(hca);
2018                 if (hca == NULL)
2019                         RETURN(-ENOMEM);
2020
2021                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2022                                              LUSTRE_OPC_ANY, hca);
2023                 if (op_data == NULL) {
2024                         OBD_FREE_PTR(hca);
2025                         RETURN(-ENOMEM);
2026                 }
2027
2028                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2029                                    op_data, NULL);
2030
2031                 if (cfs_copy_to_user((char *)arg, hca, sizeof(*hca)))
2032                         rc = -EFAULT;
2033
2034                 ll_finish_md_op_data(op_data);
2035                 OBD_FREE_PTR(hca);
2036                 RETURN(rc);
2037         }
2038         default: {
2039                 int err;
2040
2041                 if (LLIOC_STOP ==
2042                      ll_iocontrol_call(inode, file, cmd, arg, &err))
2043                         RETURN(err);
2044
2045                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2046                                      (void *)arg));
2047         }
2048         }
2049 }
2050
2051 #ifndef HAVE_FILE_LLSEEK_SIZE
2052 static inline loff_t
2053 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
2054 {
2055         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
2056                 return -EINVAL;
2057         if (offset > maxsize)
2058                 return -EINVAL;
2059
2060         if (offset != file->f_pos) {
2061                 file->f_pos = offset;
2062                 file->f_version = 0;
2063         }
2064         return offset;
2065 }
2066
2067 static loff_t
2068 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
2069                 loff_t maxsize, loff_t eof)
2070 {
2071         struct inode *inode = file->f_dentry->d_inode;
2072
2073         switch (origin) {
2074         case SEEK_END:
2075                 offset += eof;
2076                 break;
2077         case SEEK_CUR:
2078                 /*
2079                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
2080                  * position-querying operation.  Avoid rewriting the "same"
2081                  * f_pos value back to the file because a concurrent read(),
2082                  * write() or lseek() might have altered it
2083                  */
2084                 if (offset == 0)
2085                         return file->f_pos;
2086                 /*
2087                  * f_lock protects against read/modify/write race with other
2088                  * SEEK_CURs. Note that parallel writes and reads behave
2089                  * like SEEK_SET.
2090                  */
2091                 mutex_lock(&inode->i_mutex);
2092                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
2093                 mutex_unlock(&inode->i_mutex);
2094                 return offset;
2095         case SEEK_DATA:
2096                 /*
2097                  * In the generic case the entire file is data, so as long as
2098                  * offset isn't at the end of the file then the offset is data.
2099                  */
2100                 if (offset >= eof)
2101                         return -ENXIO;
2102                 break;
2103         case SEEK_HOLE:
2104                 /*
2105                  * There is a virtual hole at the end of the file, so as long as
2106                  * offset isn't i_size or larger, return i_size.
2107                  */
2108                 if (offset >= eof)
2109                         return -ENXIO;
2110                 offset = eof;
2111                 break;
2112         }
2113
2114         return llseek_execute(file, offset, maxsize);
2115 }
2116 #endif
2117
2118 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2119 {
2120         struct inode *inode = file->f_dentry->d_inode;
2121         loff_t retval, eof = 0;
2122
2123         ENTRY;
2124         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2125                            (origin == SEEK_CUR) ? file->f_pos : 0);
2126         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2127                inode->i_ino, inode->i_generation, inode, retval, retval,
2128                origin);
2129         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2130
2131         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2132                 retval = ll_glimpse_size(inode);
2133                 if (retval != 0)
2134                         RETURN(retval);
2135                 eof = i_size_read(inode);
2136         }
2137
2138         retval = generic_file_llseek_size(file, offset, origin,
2139                                           ll_file_maxbytes(inode), eof);
2140         RETURN(retval);
2141 }
2142
2143 int ll_flush(struct file *file, fl_owner_t id)
2144 {
2145         struct inode *inode = file->f_dentry->d_inode;
2146         struct ll_inode_info *lli = ll_i2info(inode);
2147         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2148         int rc, err;
2149
2150         LASSERT(!S_ISDIR(inode->i_mode));
2151
2152         /* catch async errors that were recorded back when async writeback
2153          * failed for pages in this mapping. */
2154         rc = lli->lli_async_rc;
2155         lli->lli_async_rc = 0;
2156         err = lov_read_and_clear_async_rc(lli->lli_clob);
2157         if (rc == 0)
2158                 rc = err;
2159
2160         /* The application has been told write failure already.
2161          * Do not report failure again. */
2162         if (fd->fd_write_failed)
2163                 return 0;
2164         return rc ? -EIO : 0;
2165 }
2166
2167 /**
2168  * Called to make sure a portion of file has been written out.
2169  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2170  *
2171  * Return how many pages have been written.
2172  */
2173 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2174                        enum cl_fsync_mode mode)
2175 {
2176         struct cl_env_nest nest;
2177         struct lu_env *env;
2178         struct cl_io *io;
2179         struct obd_capa *capa = NULL;
2180         struct cl_fsync_io *fio;
2181         int result;
2182         ENTRY;
2183
2184         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2185             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2186                 RETURN(-EINVAL);
2187
2188         env = cl_env_nested_get(&nest);
2189         if (IS_ERR(env))
2190                 RETURN(PTR_ERR(env));
2191
2192         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2193
2194         io = ccc_env_thread_io(env);
2195         io->ci_obj = cl_i2info(inode)->lli_clob;
2196         io->ci_ignore_layout = 1;
2197
2198         /* initialize parameters for sync */
2199         fio = &io->u.ci_fsync;
2200         fio->fi_capa = capa;
2201         fio->fi_start = start;
2202         fio->fi_end = end;
2203         fio->fi_fid = ll_inode2fid(inode);
2204         fio->fi_mode = mode;
2205         fio->fi_nr_written = 0;
2206
2207         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2208                 result = cl_io_loop(env, io);
2209         else
2210                 result = io->ci_result;
2211         if (result == 0)
2212                 result = fio->fi_nr_written;
2213         cl_io_fini(env, io);
2214         cl_env_nested_put(&nest, env);
2215
2216         capa_put(capa);
2217
2218         RETURN(result);
2219 }
2220
2221 #ifdef HAVE_FILE_FSYNC_4ARGS
2222 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2223 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2224 int ll_fsync(struct file *file, int data)
2225 #else
2226 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2227 #endif
2228 {
2229         struct inode *inode = file->f_dentry->d_inode;
2230         struct ll_inode_info *lli = ll_i2info(inode);
2231         struct ptlrpc_request *req;
2232         struct obd_capa *oc;
2233         int rc, err;
2234         ENTRY;
2235
2236         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2237                inode->i_generation, inode);
2238         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2239
2240 #ifdef HAVE_FILE_FSYNC_4ARGS
2241         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2242         mutex_lock(&inode->i_mutex);
2243 #else
2244         /* fsync's caller has already called _fdata{sync,write}, we want
2245          * that IO to finish before calling the osc and mdc sync methods */
2246         rc = filemap_fdatawait(inode->i_mapping);
2247 #endif
2248
2249         /* catch async errors that were recorded back when async writeback
2250          * failed for pages in this mapping. */
2251         if (!S_ISDIR(inode->i_mode)) {
2252                 err = lli->lli_async_rc;
2253                 lli->lli_async_rc = 0;
2254                 if (rc == 0)
2255                         rc = err;
2256                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2257                 if (rc == 0)
2258                         rc = err;
2259         }
2260
2261         oc = ll_mdscapa_get(inode);
2262         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2263                       &req);
2264         capa_put(oc);
2265         if (!rc)
2266                 rc = err;
2267         if (!err)
2268                 ptlrpc_req_finished(req);
2269
2270         if (data) {
2271                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2272
2273                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2274                                 CL_FSYNC_ALL);
2275                 if (rc == 0 && err < 0)
2276                         rc = err;
2277                 if (rc < 0)
2278                         fd->fd_write_failed = true;
2279                 else
2280                         fd->fd_write_failed = false;
2281         }
2282
2283 #ifdef HAVE_FILE_FSYNC_4ARGS
2284         mutex_unlock(&inode->i_mutex);
2285 #endif
2286         RETURN(rc);
2287 }
2288
2289 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2290 {
2291         struct inode *inode = file->f_dentry->d_inode;
2292         struct ll_sb_info *sbi = ll_i2sbi(inode);
2293         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2294                                            .ei_cb_cp =ldlm_flock_completion_ast,
2295                                            .ei_cbdata = file_lock };
2296         struct md_op_data *op_data;
2297         struct lustre_handle lockh = {0};
2298         ldlm_policy_data_t flock = {{0}};
2299         int flags = 0;
2300         int rc;
2301         int rc2 = 0;
2302         ENTRY;
2303
2304         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2305                inode->i_ino, file_lock);
2306
2307         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2308
2309         if (file_lock->fl_flags & FL_FLOCK) {
2310                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2311                 /* flocks are whole-file locks */
2312                 flock.l_flock.end = OFFSET_MAX;
2313                 /* For flocks owner is determined by the local file desctiptor*/
2314                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2315         } else if (file_lock->fl_flags & FL_POSIX) {
2316                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2317                 flock.l_flock.start = file_lock->fl_start;
2318                 flock.l_flock.end = file_lock->fl_end;
2319         } else {
2320                 RETURN(-EINVAL);
2321         }
2322         flock.l_flock.pid = file_lock->fl_pid;
2323
2324         /* Somewhat ugly workaround for svc lockd.
2325          * lockd installs custom fl_lmops->lm_compare_owner that checks
2326          * for the fl_owner to be the same (which it always is on local node
2327          * I guess between lockd processes) and then compares pid.
2328          * As such we assign pid to the owner field to make it all work,
2329          * conflict with normal locks is unlikely since pid space and
2330          * pointer space for current->files are not intersecting */
2331         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2332                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2333
2334         switch (file_lock->fl_type) {
2335         case F_RDLCK:
2336                 einfo.ei_mode = LCK_PR;
2337                 break;
2338         case F_UNLCK:
2339                 /* An unlock request may or may not have any relation to
2340                  * existing locks so we may not be able to pass a lock handle
2341                  * via a normal ldlm_lock_cancel() request. The request may even
2342                  * unlock a byte range in the middle of an existing lock. In
2343                  * order to process an unlock request we need all of the same
2344                  * information that is given with a normal read or write record
2345                  * lock request. To avoid creating another ldlm unlock (cancel)
2346                  * message we'll treat a LCK_NL flock request as an unlock. */
2347                 einfo.ei_mode = LCK_NL;
2348                 break;
2349         case F_WRLCK:
2350                 einfo.ei_mode = LCK_PW;
2351                 break;
2352         default:
2353                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2354                         file_lock->fl_type);
2355                 RETURN (-ENOTSUPP);
2356         }
2357
2358         switch (cmd) {
2359         case F_SETLKW:
2360 #ifdef F_SETLKW64
2361         case F_SETLKW64:
2362 #endif
2363                 flags = 0;
2364                 break;
2365         case F_SETLK:
2366 #ifdef F_SETLK64
2367         case F_SETLK64:
2368 #endif
2369                 flags = LDLM_FL_BLOCK_NOWAIT;
2370                 break;
2371         case F_GETLK:
2372 #ifdef F_GETLK64
2373         case F_GETLK64:
2374 #endif
2375                 flags = LDLM_FL_TEST_LOCK;
2376                 /* Save the old mode so that if the mode in the lock changes we
2377                  * can decrement the appropriate reader or writer refcount. */
2378                 file_lock->fl_type = einfo.ei_mode;
2379                 break;
2380         default:
2381                 CERROR("unknown fcntl lock command: %d\n", cmd);
2382                 RETURN (-EINVAL);
2383         }
2384
2385         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2386                                      LUSTRE_OPC_ANY, NULL);
2387         if (IS_ERR(op_data))
2388                 RETURN(PTR_ERR(op_data));
2389
2390         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2391                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2392                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2393
2394         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2395                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2396
2397         if ((file_lock->fl_flags & FL_FLOCK) &&
2398             (rc == 0 || file_lock->fl_type == F_UNLCK))
2399                 rc2  = flock_lock_file_wait(file, file_lock);
2400         if ((file_lock->fl_flags & FL_POSIX) &&
2401             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2402             !(flags & LDLM_FL_TEST_LOCK))
2403                 rc2  = posix_lock_file_wait(file, file_lock);
2404
2405         if (rc2 && file_lock->fl_type != F_UNLCK) {
2406                 einfo.ei_mode = LCK_NL;
2407                 md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2408                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2409                 rc = rc2;
2410         }
2411
2412         ll_finish_md_op_data(op_data);
2413
2414         RETURN(rc);
2415 }
2416
2417 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2418 {
2419         ENTRY;
2420
2421         RETURN(-ENOSYS);
2422 }
2423
2424 /**
2425  * test if some locks matching bits and l_req_mode are acquired
2426  * - bits can be in different locks
2427  * - if found clear the common lock bits in *bits
2428  * - the bits not found, are kept in *bits
2429  * \param inode [IN]
2430  * \param bits [IN] searched lock bits [IN]
2431  * \param l_req_mode [IN] searched lock mode
2432  * \retval boolean, true iff all bits are found
2433  */
2434 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2435 {
2436         struct lustre_handle lockh;
2437         ldlm_policy_data_t policy;
2438         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2439                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2440         struct lu_fid *fid;
2441         __u64 flags;
2442         int i;
2443         ENTRY;
2444
2445         if (!inode)
2446                RETURN(0);
2447
2448         fid = &ll_i2info(inode)->lli_fid;
2449         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2450                ldlm_lockname[mode]);
2451
2452         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2453         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2454                 policy.l_inodebits.bits = *bits & (1 << i);
2455                 if (policy.l_inodebits.bits == 0)
2456                         continue;
2457
2458                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2459                                   &policy, mode, &lockh)) {
2460                         struct ldlm_lock *lock;
2461
2462                         lock = ldlm_handle2lock(&lockh);
2463                         if (lock) {
2464                                 *bits &=
2465                                       ~(lock->l_policy_data.l_inodebits.bits);
2466                                 LDLM_LOCK_PUT(lock);
2467                         } else {
2468                                 *bits &= ~policy.l_inodebits.bits;
2469                         }
2470                 }
2471         }
2472         RETURN(*bits == 0);
2473 }
2474
2475 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2476                             struct lustre_handle *lockh, __u64 flags)
2477 {
2478         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2479         struct lu_fid *fid;
2480         ldlm_mode_t rc;
2481         ENTRY;
2482
2483         fid = &ll_i2info(inode)->lli_fid;
2484         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2485
2486         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2487                            fid, LDLM_IBITS, &policy,
2488                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2489         RETURN(rc);
2490 }
2491
2492 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2493 {
2494         /* Already unlinked. Just update nlink and return success */
2495         if (rc == -ENOENT) {
2496                 clear_nlink(inode);
2497                 /* This path cannot be hit for regular files unless in
2498                  * case of obscure races, so no need to to validate
2499                  * size. */
2500                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2501                         return 0;
2502         } else if (rc != 0) {
2503                 CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2504                        ll_get_fsname(inode->i_sb, NULL, 0),
2505                        PFID(ll_inode2fid(inode)), rc);
2506         }
2507
2508         return rc;
2509 }
2510
2511 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2512                              __u64 ibits)
2513 {
2514         struct inode *inode = dentry->d_inode;
2515         struct ptlrpc_request *req = NULL;
2516         struct obd_export *exp;
2517         int rc = 0;
2518         ENTRY;
2519
2520         LASSERT(inode != NULL);
2521
2522         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2523                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2524
2525         exp = ll_i2mdexp(inode);
2526
2527         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2528          *      But under CMD case, it caused some lock issues, should be fixed
2529          *      with new CMD ibits lock. See bug 12718 */
2530         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2531                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2532                 struct md_op_data *op_data;
2533
2534                 if (ibits == MDS_INODELOCK_LOOKUP)
2535                         oit.it_op = IT_LOOKUP;
2536
2537                 /* Call getattr by fid, so do not provide name at all. */
2538                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2539                                              dentry->d_inode, NULL, 0, 0,
2540                                              LUSTRE_OPC_ANY, NULL);
2541                 if (IS_ERR(op_data))
2542                         RETURN(PTR_ERR(op_data));
2543
2544                 oit.it_create_mode |= M_CHECK_STALE;
2545                 rc = md_intent_lock(exp, op_data, NULL, 0,
2546                                     /* we are not interested in name
2547                                        based lookup */
2548                                     &oit, 0, &req,
2549                                     ll_md_blocking_ast, 0);
2550                 ll_finish_md_op_data(op_data);
2551                 oit.it_create_mode &= ~M_CHECK_STALE;
2552                 if (rc < 0) {
2553                         rc = ll_inode_revalidate_fini(inode, rc);
2554                         GOTO (out, rc);
2555                 }
2556
2557                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2558                 if (rc != 0) {
2559                         ll_intent_release(&oit);
2560                         GOTO(out, rc);
2561                 }
2562
2563                 /* Unlinked? Unhash dentry, so it is not picked up later by
2564                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2565                    here to preserve get_cwd functionality on 2.6.
2566                    Bug 10503 */
2567                 if (!dentry->d_inode->i_nlink)
2568                         d_lustre_invalidate(dentry);
2569
2570                 ll_lookup_finish_locks(&oit, dentry);
2571         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2572                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2573                 obd_valid valid = OBD_MD_FLGETATTR;
2574                 struct md_op_data *op_data;
2575                 int ealen = 0;
2576
2577                 if (S_ISREG(inode->i_mode)) {
2578                         rc = ll_get_max_mdsize(sbi, &ealen);
2579                         if (rc)
2580                                 RETURN(rc);
2581                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2582                 }
2583
2584                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2585                                              0, ealen, LUSTRE_OPC_ANY,
2586                                              NULL);
2587                 if (IS_ERR(op_data))
2588                         RETURN(PTR_ERR(op_data));
2589
2590                 op_data->op_valid = valid;
2591                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2592                  * capa for this inode. Because we only keep capas of dirs
2593                  * fresh. */
2594                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2595                 ll_finish_md_op_data(op_data);
2596                 if (rc) {
2597                         rc = ll_inode_revalidate_fini(inode, rc);
2598                         RETURN(rc);
2599                 }
2600
2601                 rc = ll_prep_inode(&inode, req, NULL, NULL);
2602         }
2603 out:
2604         ptlrpc_req_finished(req);
2605         return rc;
2606 }
2607
2608 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2609                            __u64 ibits)
2610 {
2611         struct inode *inode = dentry->d_inode;
2612         int rc;
2613         ENTRY;
2614
2615         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2616         if (rc != 0)
2617                 RETURN(rc);
2618
2619         /* if object isn't regular file, don't validate size */
2620         if (!S_ISREG(inode->i_mode)) {
2621                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2622                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2623                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2624         } else {
2625                 rc = ll_glimpse_size(inode);
2626         }
2627         RETURN(rc);
2628 }
2629
2630 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2631                   struct lookup_intent *it, struct kstat *stat)
2632 {
2633         struct inode *inode = de->d_inode;
2634         struct ll_sb_info *sbi = ll_i2sbi(inode);
2635         struct ll_inode_info *lli = ll_i2info(inode);
2636         int res = 0;
2637
2638         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2639                                              MDS_INODELOCK_LOOKUP);
2640         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2641
2642         if (res)
2643                 return res;
2644
2645         stat->dev = inode->i_sb->s_dev;
2646         if (ll_need_32bit_api(sbi))
2647                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2648         else
2649                 stat->ino = inode->i_ino;
2650         stat->mode = inode->i_mode;
2651         stat->nlink = inode->i_nlink;
2652         stat->uid = inode->i_uid;
2653         stat->gid = inode->i_gid;
2654         stat->rdev = inode->i_rdev;
2655         stat->atime = inode->i_atime;
2656         stat->mtime = inode->i_mtime;
2657         stat->ctime = inode->i_ctime;
2658         stat->blksize = 1 << inode->i_blkbits;
2659
2660         stat->size = i_size_read(inode);
2661         stat->blocks = inode->i_blocks;
2662
2663         return 0;
2664 }
2665 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2666 {
2667         struct lookup_intent it = { .it_op = IT_GETATTR };
2668
2669         return ll_getattr_it(mnt, de, &it, stat);
2670 }
2671
2672 #ifdef HAVE_LINUX_FIEMAP_H
2673 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2674                 __u64 start, __u64 len)
2675 {
2676         int rc;
2677         size_t num_bytes;
2678         struct ll_user_fiemap *fiemap;
2679         unsigned int extent_count = fieinfo->fi_extents_max;
2680
2681         num_bytes = sizeof(*fiemap) + (extent_count *
2682                                        sizeof(struct ll_fiemap_extent));
2683         OBD_ALLOC_LARGE(fiemap, num_bytes);
2684
2685         if (fiemap == NULL)
2686                 RETURN(-ENOMEM);
2687
2688         fiemap->fm_flags = fieinfo->fi_flags;
2689         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2690         fiemap->fm_start = start;
2691         fiemap->fm_length = len;
2692         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2693                sizeof(struct ll_fiemap_extent));
2694
2695         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2696
2697         fieinfo->fi_flags = fiemap->fm_flags;
2698         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2699         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2700                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2701
2702         OBD_FREE_LARGE(fiemap, num_bytes);
2703         return rc;
2704 }
2705 #endif
2706
2707 struct posix_acl * ll_get_acl(struct inode *inode, int type)
2708 {
2709         struct ll_inode_info *lli = ll_i2info(inode);
2710         struct posix_acl *acl = NULL;
2711         ENTRY;
2712
2713         spin_lock(&lli->lli_lock);
2714         /* VFS' acl_permission_check->check_acl will release the refcount */
2715         acl = posix_acl_dup(lli->lli_posix_acl);
2716         spin_unlock(&lli->lli_lock);
2717
2718         RETURN(acl);
2719 }
2720
2721 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2722 static int
2723 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2724 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
2725 # else
2726 ll_check_acl(struct inode *inode, int mask)
2727 # endif
2728 {
2729 # ifdef CONFIG_FS_POSIX_ACL
2730         struct posix_acl *acl;
2731         int rc;
2732         ENTRY;
2733
2734 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2735         if (flags & IPERM_FLAG_RCU)
2736                 return -ECHILD;
2737 #  endif
2738         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
2739
2740         if (!acl)
2741                 RETURN(-EAGAIN);
2742
2743         rc = posix_acl_permission(inode, acl, mask);
2744         posix_acl_release(acl);
2745
2746         RETURN(rc);
2747 # else /* !CONFIG_FS_POSIX_ACL */
2748         return -EAGAIN;
2749 # endif /* CONFIG_FS_POSIX_ACL */
2750 }
2751 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2752
2753 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2754 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2755 #else
2756 # ifdef HAVE_INODE_PERMISION_2ARGS
2757 int ll_inode_permission(struct inode *inode, int mask)
2758 # else
2759 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2760 # endif
2761 #endif
2762 {
2763         int rc = 0;
2764         ENTRY;
2765
2766 #ifdef MAY_NOT_BLOCK
2767         if (mask & MAY_NOT_BLOCK)
2768                 return -ECHILD;
2769 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
2770         if (flags & IPERM_FLAG_RCU)
2771                 return -ECHILD;
2772 #endif
2773
2774        /* as root inode are NOT getting validated in lookup operation,
2775         * need to do it before permission check. */
2776
2777         if (inode == inode->i_sb->s_root->d_inode) {
2778                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2779
2780                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2781                                               MDS_INODELOCK_LOOKUP);
2782                 if (rc)
2783                         RETURN(rc);
2784         }
2785
2786         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2787                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2788
2789         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2790                 return lustre_check_remote_perm(inode, mask);
2791
2792         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2793         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
2794
2795         RETURN(rc);
2796 }
2797
2798 #ifdef HAVE_FILE_READV
2799 #define READ_METHOD readv
2800 #define READ_FUNCTION ll_file_readv
2801 #define WRITE_METHOD writev
2802 #define WRITE_FUNCTION ll_file_writev
2803 #else
2804 #define READ_METHOD aio_read
2805 #define READ_FUNCTION ll_file_aio_read
2806 #define WRITE_METHOD aio_write
2807 #define WRITE_FUNCTION ll_file_aio_write
2808 #endif
2809
2810 /* -o localflock - only provides locally consistent flock locks */
2811 struct file_operations ll_file_operations = {
2812         .read           = ll_file_read,
2813         .READ_METHOD    = READ_FUNCTION,
2814         .write          = ll_file_write,
2815         .WRITE_METHOD   = WRITE_FUNCTION,
2816         .unlocked_ioctl = ll_file_ioctl,
2817         .open           = ll_file_open,
2818         .release        = ll_file_release,
2819         .mmap           = ll_file_mmap,
2820         .llseek         = ll_file_seek,
2821 #ifdef HAVE_KERNEL_SENDFILE
2822         .sendfile       = ll_file_sendfile,
2823 #endif
2824 #ifdef HAVE_KERNEL_SPLICE_READ
2825         .splice_read    = ll_file_splice_read,
2826 #endif
2827         .fsync          = ll_fsync,
2828         .flush          = ll_flush
2829 };
2830
2831 struct file_operations ll_file_operations_flock = {
2832         .read           = ll_file_read,
2833         .READ_METHOD    = READ_FUNCTION,
2834         .write          = ll_file_write,
2835         .WRITE_METHOD   = WRITE_FUNCTION,
2836         .unlocked_ioctl = ll_file_ioctl,
2837         .open           = ll_file_open,
2838         .release        = ll_file_release,
2839         .mmap           = ll_file_mmap,
2840         .llseek         = ll_file_seek,
2841 #ifdef HAVE_KERNEL_SENDFILE
2842         .sendfile       = ll_file_sendfile,
2843 #endif
2844 #ifdef HAVE_KERNEL_SPLICE_READ
2845         .splice_read    = ll_file_splice_read,
2846 #endif
2847         .fsync          = ll_fsync,
2848         .flush          = ll_flush,
2849         .flock          = ll_file_flock,
2850         .lock           = ll_file_flock
2851 };
2852
2853 /* These are for -o noflock - to return ENOSYS on flock calls */
2854 struct file_operations ll_file_operations_noflock = {
2855         .read           = ll_file_read,
2856         .READ_METHOD    = READ_FUNCTION,
2857         .write          = ll_file_write,
2858         .WRITE_METHOD   = WRITE_FUNCTION,
2859         .unlocked_ioctl = ll_file_ioctl,
2860         .open           = ll_file_open,
2861         .release        = ll_file_release,
2862         .mmap           = ll_file_mmap,
2863         .llseek         = ll_file_seek,
2864 #ifdef HAVE_KERNEL_SENDFILE
2865         .sendfile       = ll_file_sendfile,
2866 #endif
2867 #ifdef HAVE_KERNEL_SPLICE_READ
2868         .splice_read    = ll_file_splice_read,
2869 #endif
2870         .fsync          = ll_fsync,
2871         .flush          = ll_flush,
2872         .flock          = ll_file_noflock,
2873         .lock           = ll_file_noflock
2874 };
2875
2876 struct inode_operations ll_file_inode_operations = {
2877         .setattr        = ll_setattr,
2878         .getattr        = ll_getattr,
2879         .permission     = ll_inode_permission,
2880         .setxattr       = ll_setxattr,
2881         .getxattr       = ll_getxattr,
2882         .listxattr      = ll_listxattr,
2883         .removexattr    = ll_removexattr,
2884 #ifdef  HAVE_LINUX_FIEMAP_H
2885         .fiemap         = ll_fiemap,
2886 #endif
2887 #ifdef HAVE_IOP_GET_ACL
2888         .get_acl        = ll_get_acl,
2889 #endif
2890 };
2891
2892 /* dynamic ioctl number support routins */
2893 static struct llioc_ctl_data {
2894         struct rw_semaphore     ioc_sem;
2895         cfs_list_t              ioc_head;
2896 } llioc = {
2897         __RWSEM_INITIALIZER(llioc.ioc_sem),
2898         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2899 };
2900
2901
2902 struct llioc_data {
2903         cfs_list_t              iocd_list;
2904         unsigned int            iocd_size;
2905         llioc_callback_t        iocd_cb;
2906         unsigned int            iocd_count;
2907         unsigned int            iocd_cmd[0];
2908 };
2909
2910 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2911 {
2912         unsigned int size;
2913         struct llioc_data *in_data = NULL;
2914         ENTRY;
2915
2916         if (cb == NULL || cmd == NULL ||
2917             count > LLIOC_MAX_CMD || count < 0)
2918                 RETURN(NULL);
2919
2920         size = sizeof(*in_data) + count * sizeof(unsigned int);
2921         OBD_ALLOC(in_data, size);
2922         if (in_data == NULL)
2923                 RETURN(NULL);
2924
2925         memset(in_data, 0, sizeof(*in_data));
2926         in_data->iocd_size = size;
2927         in_data->iocd_cb = cb;
2928         in_data->iocd_count = count;
2929         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2930
2931         down_write(&llioc.ioc_sem);
2932         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2933         up_write(&llioc.ioc_sem);
2934
2935         RETURN(in_data);
2936 }
2937
2938 void ll_iocontrol_unregister(void *magic)
2939 {
2940         struct llioc_data *tmp;
2941
2942         if (magic == NULL)
2943                 return;
2944
2945         down_write(&llioc.ioc_sem);
2946         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2947                 if (tmp == magic) {
2948                         unsigned int size = tmp->iocd_size;
2949
2950                         cfs_list_del(&tmp->iocd_list);
2951                         up_write(&llioc.ioc_sem);
2952
2953                         OBD_FREE(tmp, size);
2954                         return;
2955                 }
2956         }
2957         up_write(&llioc.ioc_sem);
2958
2959         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2960 }
2961
2962 EXPORT_SYMBOL(ll_iocontrol_register);
2963 EXPORT_SYMBOL(ll_iocontrol_unregister);
2964
2965 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2966                         unsigned int cmd, unsigned long arg, int *rcp)
2967 {
2968         enum llioc_iter ret = LLIOC_CONT;
2969         struct llioc_data *data;
2970         int rc = -EINVAL, i;
2971
2972         down_read(&llioc.ioc_sem);
2973         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2974                 for (i = 0; i < data->iocd_count; i++) {
2975                         if (cmd != data->iocd_cmd[i])
2976                                 continue;
2977
2978                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2979                         break;
2980                 }
2981
2982                 if (ret == LLIOC_STOP)
2983                         break;
2984         }
2985         up_read(&llioc.ioc_sem);
2986
2987         if (rcp)
2988                 *rcp = rc;
2989         return ret;
2990 }
2991
2992 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2993 {
2994         struct ll_inode_info *lli = ll_i2info(inode);
2995         struct cl_env_nest nest;
2996         struct lu_env *env;
2997         int result;
2998         ENTRY;
2999
3000         if (lli->lli_clob == NULL)
3001                 RETURN(0);
3002
3003         env = cl_env_nested_get(&nest);
3004         if (IS_ERR(env))
3005                 RETURN(PTR_ERR(env));
3006
3007         result = cl_conf_set(env, lli->lli_clob, conf);
3008         cl_env_nested_put(&nest, env);
3009
3010         if (conf->coc_opc == OBJECT_CONF_SET) {
3011                 struct ldlm_lock *lock = conf->coc_lock;
3012
3013                 LASSERT(lock != NULL);
3014                 LASSERT(ldlm_has_layout(lock));
3015                 if (result == 0) {
3016                         /* it can only be allowed to match after layout is
3017                          * applied to inode otherwise false layout would be
3018                          * seen. Applying layout shoud happen before dropping
3019                          * the intent lock. */
3020                         ldlm_lock_allow_match(lock);
3021                 }
3022         }
3023         RETURN(result);
3024 }
3025
3026 /**
3027  * Apply the layout to the inode. Layout lock is held and will be released
3028  * in this function.
3029  */
3030 static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
3031                                 struct inode *inode, __u32 *gen, bool reconf)
3032 {
3033         struct ll_inode_info *lli = ll_i2info(inode);
3034         struct ll_sb_info    *sbi = ll_i2sbi(inode);
3035         struct ldlm_lock *lock;
3036         struct lustre_md md = { NULL };
3037         struct cl_object_conf conf;
3038         int rc = 0;
3039         bool lvb_ready;
3040         ENTRY;
3041
3042         LASSERT(lustre_handle_is_used(lockh));
3043
3044         lock = ldlm_handle2lock(lockh);
3045         LASSERT(lock != NULL);
3046         LASSERT(ldlm_has_layout(lock));
3047
3048         LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
3049                 inode, PFID(&lli->lli_fid), reconf);
3050
3051         lock_res_and_lock(lock);
3052         lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
3053         unlock_res_and_lock(lock);
3054         /* checking lvb_ready is racy but this is okay. The worst case is
3055          * that multi processes may configure the file on the same time. */
3056         if (lvb_ready || !reconf) {
3057                 LDLM_LOCK_PUT(lock);
3058
3059                 rc = -ENODATA;
3060                 if (lvb_ready) {
3061                         /* layout_gen must be valid if layout lock is not
3062                          * cancelled and stripe has already set */
3063                         *gen = lli->lli_layout_gen;
3064                         rc = 0;
3065                 }
3066                 ldlm_lock_decref(lockh, mode);
3067                 RETURN(rc);
3068         }
3069
3070         /* for layout lock, lmm is returned in lock's lvb.
3071          * lvb_data is immutable if the lock is held so it's safe to access it
3072          * without res lock. See the description in ldlm_lock_decref_internal()
3073          * for the condition to free lvb_data of layout lock */
3074         if (lock->l_lvb_data != NULL) {
3075                 rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3076                                   lock->l_lvb_data, lock->l_lvb_len);
3077                 if (rc >= 0) {
3078                         if (md.lsm != NULL)
3079                                 *gen = md.lsm->lsm_layout_gen;
3080                         rc = 0;
3081                 } else {
3082                         CERROR("%s: file "DFID" unpackmd error: %d\n",
3083                                 ll_get_fsname(inode->i_sb, NULL, 0),
3084                                 PFID(&lli->lli_fid), rc);
3085                 }
3086         }
3087         if (rc < 0) {
3088                 LDLM_LOCK_PUT(lock);
3089                 ldlm_lock_decref(lockh, mode);
3090                 RETURN(rc);
3091         }
3092
3093         /* set layout to file. Unlikely this will fail as old layout was
3094          * surely eliminated */
3095         memset(&conf, 0, sizeof conf);
3096         conf.coc_opc = OBJECT_CONF_SET;
3097         conf.coc_inode = inode;
3098         conf.coc_lock = lock;
3099         conf.u.coc_md = &md;
3100         rc = ll_layout_conf(inode, &conf);
3101         LDLM_LOCK_PUT(lock);
3102
3103         ldlm_lock_decref(lockh, mode);
3104
3105         if (md.lsm != NULL)
3106                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3107
3108         /* wait for IO to complete if it's still being used. */
3109         if (rc == -EBUSY) {
3110                 CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3111                         ll_get_fsname(inode->i_sb, NULL, 0),
3112                         inode, PFID(&lli->lli_fid));
3113
3114                 memset(&conf, 0, sizeof conf);
3115                 conf.coc_opc = OBJECT_CONF_WAIT;
3116                 conf.coc_inode = inode;
3117                 rc = ll_layout_conf(inode, &conf);
3118                 if (rc == 0)
3119                         rc = -EAGAIN;
3120
3121                 CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3122                         PFID(&lli->lli_fid), rc);
3123         }
3124
3125         RETURN(rc);
3126 }
3127
3128 /**
3129  * This function checks if there exists a LAYOUT lock on the client side,
3130  * or enqueues it if it doesn't have one in cache.
3131  *
3132  * This function will not hold layout lock so it may be revoked any time after
3133  * this function returns. Any operations depend on layout should be redone
3134  * in that case.
3135  *
3136  * This function should be called before lov_io_init() to get an uptodate
3137  * layout version, the caller should save the version number and after IO
3138  * is finished, this function should be called again to verify that layout
3139  * is not changed during IO time.
3140  */
3141 int ll_layout_refresh(struct inode *inode, __u32 *gen)
3142 {
3143         struct ll_inode_info  *lli = ll_i2info(inode);
3144         struct ll_sb_info     *sbi = ll_i2sbi(inode);
3145         struct md_op_data     *op_data;
3146         struct lookup_intent   it;
3147         struct lustre_handle   lockh;
3148         ldlm_mode_t            mode;
3149         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
3150                                            .ei_mode = LCK_CR,
3151                                            .ei_cb_bl = ll_md_blocking_ast,
3152                                            .ei_cb_cp = ldlm_completion_ast,
3153                                            .ei_cbdata = inode };
3154         int rc;
3155         ENTRY;
3156
3157         *gen = LL_LAYOUT_GEN_ZERO;
3158         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
3159                 RETURN(0);
3160
3161         /* sanity checks */
3162         LASSERT(fid_is_sane(ll_inode2fid(inode)));
3163         LASSERT(S_ISREG(inode->i_mode));
3164
3165         /* mostly layout lock is caching on the local side, so try to match
3166          * it before grabbing layout lock mutex. */
3167         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3168         if (mode != 0) { /* hit cached lock */
3169                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
3170                 if (rc == 0)
3171                         RETURN(0);
3172
3173                 /* better hold lli_layout_mutex to try again otherwise
3174                  * it will have starvation problem. */
3175         }
3176
3177         /* take layout lock mutex to enqueue layout lock exclusively. */
3178         mutex_lock(&lli->lli_layout_mutex);
3179
3180 again:
3181         /* try again. Maybe somebody else has done this. */
3182         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3183         if (mode != 0) { /* hit cached lock */
3184                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3185                 if (rc == -EAGAIN)
3186                         goto again;
3187
3188                 mutex_unlock(&lli->lli_layout_mutex);
3189                 RETURN(rc);
3190         }
3191
3192         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3193                         0, 0, LUSTRE_OPC_ANY, NULL);
3194         if (IS_ERR(op_data)) {
3195                 mutex_unlock(&lli->lli_layout_mutex);
3196                 RETURN(PTR_ERR(op_data));
3197         }
3198
3199         /* have to enqueue one */
3200         memset(&it, 0, sizeof(it));
3201         it.it_op = IT_LAYOUT;
3202         lockh.cookie = 0ULL;
3203
3204         LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3205                         ll_get_fsname(inode->i_sb, NULL, 0), inode,
3206                         PFID(&lli->lli_fid));
3207
3208         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3209                         NULL, 0, NULL, 0);
3210         if (it.d.lustre.it_data != NULL)
3211                 ptlrpc_req_finished(it.d.lustre.it_data);
3212         it.d.lustre.it_data = NULL;
3213
3214         ll_finish_md_op_data(op_data);
3215
3216         mode = it.d.lustre.it_lock_mode;
3217         it.d.lustre.it_lock_mode = 0;
3218         ll_intent_drop_lock(&it);
3219
3220         if (rc == 0) {
3221                 /* set lock data in case this is a new lock */
3222                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3223                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3224                 if (rc == -EAGAIN)
3225                         goto again;
3226         }
3227         mutex_unlock(&lli->lli_layout_mutex);
3228
3229         RETURN(rc);
3230 }