Whamcloud - gitweb
LU-359 llite: no close error if application has known failure
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84 }
85
86 /**
87  * Closes the IO epoch and packs all the attributes into @op_data for
88  * the CLOSE rpc.
89  */
90 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
91                              struct obd_client_handle *och)
92 {
93         ENTRY;
94
95         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
96                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
97
98         if (!(och->och_flags & FMODE_WRITE))
99                 goto out;
100
101         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
102                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
103         else
104                 ll_ioepoch_close(inode, op_data, &och, 0);
105
106 out:
107         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
108         ll_prep_md_op_data(op_data, inode, NULL, NULL,
109                            0, 0, LUSTRE_OPC_ANY, NULL);
110         EXIT;
111 }
112
113 static int ll_close_inode_openhandle(struct obd_export *md_exp,
114                                      struct inode *inode,
115                                      struct obd_client_handle *och)
116 {
117         struct obd_export *exp = ll_i2mdexp(inode);
118         struct md_op_data *op_data;
119         struct ptlrpc_request *req = NULL;
120         struct obd_device *obd = class_exp2obd(exp);
121         int epoch_close = 1;
122         int rc;
123         ENTRY;
124
125         if (obd == NULL) {
126                 /*
127                  * XXX: in case of LMV, is this correct to access
128                  * ->exp_handle?
129                  */
130                 CERROR("Invalid MDC connection handle "LPX64"\n",
131                        ll_i2mdexp(inode)->exp_handle.h_cookie);
132                 GOTO(out, rc = 0);
133         }
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc == -EAGAIN) {
143                 /* This close must have the epoch closed. */
144                 LASSERT(epoch_close);
145                 /* MDS has instructed us to obtain Size-on-MDS attribute from
146                  * OSTs and send setattr to back to MDS. */
147                 rc = ll_som_update(inode, op_data);
148                 if (rc) {
149                         CERROR("inode %lu mdc Size-on-MDS update failed: "
150                                "rc = %d\n", inode->i_ino, rc);
151                         rc = 0;
152                 }
153         } else if (rc) {
154                 CERROR("inode %lu mdc close failed: rc = %d\n",
155                        inode->i_ino, rc);
156         }
157         ll_finish_md_op_data(op_data);
158
159         if (rc == 0) {
160                 rc = ll_objects_destroy(req, inode);
161                 if (rc)
162                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
163                                inode->i_ino, rc);
164         }
165
166         EXIT;
167 out:
168
169         if (exp_connect_som(exp) && !epoch_close &&
170             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
171                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
172         } else {
173                 md_clear_open_replay_data(md_exp, och);
174                 /* Free @och if it is not waiting for DONE_WRITING. */
175                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
176                 OBD_FREE_PTR(och);
177         }
178         if (req) /* This is close request */
179                 ptlrpc_req_finished(req);
180         return rc;
181 }
182
183 int ll_md_real_close(struct inode *inode, int flags)
184 {
185         struct ll_inode_info *lli = ll_i2info(inode);
186         struct obd_client_handle **och_p;
187         struct obd_client_handle *och;
188         __u64 *och_usecount;
189         int rc = 0;
190         ENTRY;
191
192         if (flags & FMODE_WRITE) {
193                 och_p = &lli->lli_mds_write_och;
194                 och_usecount = &lli->lli_open_fd_write_count;
195         } else if (flags & FMODE_EXEC) {
196                 och_p = &lli->lli_mds_exec_och;
197                 och_usecount = &lli->lli_open_fd_exec_count;
198         } else {
199                 LASSERT(flags & FMODE_READ);
200                 och_p = &lli->lli_mds_read_och;
201                 och_usecount = &lli->lli_open_fd_read_count;
202         }
203
204         cfs_mutex_lock(&lli->lli_och_mutex);
205         if (*och_usecount) { /* There are still users of this handle, so
206                                 skip freeing it. */
207                 cfs_mutex_unlock(&lli->lli_och_mutex);
208                 RETURN(0);
209         }
210         och=*och_p;
211         *och_p = NULL;
212         cfs_mutex_unlock(&lli->lli_och_mutex);
213
214         if (och) { /* There might be a race and somebody have freed this och
215                       already */
216                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
217                                                inode, och);
218         }
219
220         RETURN(rc);
221 }
222
223 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
224                 struct file *file)
225 {
226         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
227         struct ll_inode_info *lli = ll_i2info(inode);
228         int rc = 0;
229         ENTRY;
230
231         /* clear group lock, if present */
232         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
233                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
234
235         /* Let's see if we have good enough OPEN lock on the file and if
236            we can skip talking to MDS */
237         if (file->f_dentry->d_inode) { /* Can this ever be false? */
238                 int lockmode;
239                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
240                 struct lustre_handle lockh;
241                 struct inode *inode = file->f_dentry->d_inode;
242                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
243
244                 cfs_mutex_lock(&lli->lli_och_mutex);
245                 if (fd->fd_omode & FMODE_WRITE) {
246                         lockmode = LCK_CW;
247                         LASSERT(lli->lli_open_fd_write_count);
248                         lli->lli_open_fd_write_count--;
249                 } else if (fd->fd_omode & FMODE_EXEC) {
250                         lockmode = LCK_PR;
251                         LASSERT(lli->lli_open_fd_exec_count);
252                         lli->lli_open_fd_exec_count--;
253                 } else {
254                         lockmode = LCK_CR;
255                         LASSERT(lli->lli_open_fd_read_count);
256                         lli->lli_open_fd_read_count--;
257                 }
258                 cfs_mutex_unlock(&lli->lli_och_mutex);
259
260                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
261                                    LDLM_IBITS, &policy, lockmode,
262                                    &lockh)) {
263                         rc = ll_md_real_close(file->f_dentry->d_inode,
264                                               fd->fd_omode);
265                 }
266         } else {
267                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
268                        file, file->f_dentry, file->f_dentry->d_name.name);
269         }
270
271         LUSTRE_FPRIVATE(file) = NULL;
272         ll_file_data_put(fd);
273         ll_capa_close(inode);
274
275         RETURN(rc);
276 }
277
278 /* While this returns an error code, fput() the caller does not, so we need
279  * to make every effort to clean up all of our state here.  Also, applications
280  * rarely check close errors and even if an error is returned they will not
281  * re-try the close call.
282  */
283 int ll_file_release(struct inode *inode, struct file *file)
284 {
285         struct ll_file_data *fd;
286         struct ll_sb_info *sbi = ll_i2sbi(inode);
287         struct ll_inode_info *lli = ll_i2info(inode);
288         int rc;
289         ENTRY;
290
291         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
292                inode->i_generation, inode);
293
294 #ifdef CONFIG_FS_POSIX_ACL
295         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
296             inode == inode->i_sb->s_root->d_inode) {
297                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
298
299                 LASSERT(fd != NULL);
300                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
301                         fd->fd_flags &= ~LL_FILE_RMTACL;
302                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
303                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
304                 }
305         }
306 #endif
307
308         if (inode->i_sb->s_root != file->f_dentry)
309                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
310         fd = LUSTRE_FPRIVATE(file);
311         LASSERT(fd != NULL);
312
313         /* The last ref on @file, maybe not the the owner pid of statahead.
314          * Different processes can open the same dir, "ll_opendir_key" means:
315          * it is me that should stop the statahead thread. */
316         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
317             lli->lli_opendir_pid != 0)
318                 ll_stop_statahead(inode, lli->lli_opendir_key);
319
320         if (inode->i_sb->s_root == file->f_dentry) {
321                 LUSTRE_FPRIVATE(file) = NULL;
322                 ll_file_data_put(fd);
323                 RETURN(0);
324         }
325
326         if (!S_ISDIR(inode->i_mode)) {
327                 lov_read_and_clear_async_rc(lli->lli_clob);
328                 lli->lli_async_rc = 0;
329         }
330
331         rc = ll_md_close(sbi->ll_md_exp, inode, file);
332
333         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
334                 libcfs_debug_dumplog();
335
336         RETURN(rc);
337 }
338
339 static int ll_intent_file_open(struct file *file, void *lmm,
340                                int lmmsize, struct lookup_intent *itp)
341 {
342         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
343         struct dentry *parent = file->f_dentry->d_parent;
344         const char *name = file->f_dentry->d_name.name;
345         const int len = file->f_dentry->d_name.len;
346         struct md_op_data *op_data;
347         struct ptlrpc_request *req;
348         __u32 opc = LUSTRE_OPC_ANY;
349         int rc;
350         ENTRY;
351
352         if (!parent)
353                 RETURN(-ENOENT);
354
355         /* Usually we come here only for NFSD, and we want open lock.
356            But we can also get here with pre 2.6.15 patchless kernels, and in
357            that case that lock is also ok */
358         /* We can also get here if there was cached open handle in revalidate_it
359          * but it disappeared while we were getting from there to ll_file_open.
360          * But this means this file was closed and immediatelly opened which
361          * makes a good candidate for using OPEN lock */
362         /* If lmmsize & lmm are not 0, we are just setting stripe info
363          * parameters. No need for the open lock */
364         if (lmm == NULL && lmmsize == 0) {
365                 itp->it_flags |= MDS_OPEN_LOCK;
366                 if (itp->it_flags & FMODE_WRITE)
367                         opc = LUSTRE_OPC_CREATE;
368         }
369
370         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
371                                       file->f_dentry->d_inode, name, len,
372                                       O_RDWR, opc, NULL);
373         if (IS_ERR(op_data))
374                 RETURN(PTR_ERR(op_data));
375
376         itp->it_flags |= MDS_OPEN_BY_FID;
377         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
378                             0 /*unused */, &req, ll_md_blocking_ast, 0);
379         ll_finish_md_op_data(op_data);
380         if (rc == -ESTALE) {
381                 /* reason for keep own exit path - don`t flood log
382                 * with messages with -ESTALE errors.
383                 */
384                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
385                      it_open_error(DISP_OPEN_OPEN, itp))
386                         GOTO(out, rc);
387                 ll_release_openhandle(file->f_dentry, itp);
388                 GOTO(out, rc);
389         }
390
391         if (it_disposition(itp, DISP_LOOKUP_NEG))
392                 GOTO(out, rc = -ENOENT);
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
401         if (!rc && itp->d.lustre.it_lock_mode)
402                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
403                                  itp, NULL);
404
405 out:
406         ptlrpc_req_finished(itp->d.lustre.it_data);
407         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
408         ll_intent_drop_lock(itp);
409
410         RETURN(rc);
411 }
412
413 /**
414  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
415  * not believe attributes if a few ioepoch holders exist. Attributes for
416  * previous ioepoch if new one is opened are also skipped by MDS.
417  */
418 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
419 {
420         if (ioepoch && lli->lli_ioepoch != ioepoch) {
421                 lli->lli_ioepoch = ioepoch;
422                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
423                        ioepoch, PFID(&lli->lli_fid));
424         }
425 }
426
427 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
428                        struct lookup_intent *it, struct obd_client_handle *och)
429 {
430         struct ptlrpc_request *req = it->d.lustre.it_data;
431         struct mdt_body *body;
432
433         LASSERT(och);
434
435         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
436         LASSERT(body != NULL);                      /* reply already checked out */
437
438         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
439         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
440         och->och_fid = lli->lli_fid;
441         och->och_flags = it->it_flags;
442         ll_ioepoch_open(lli, body->ioepoch);
443
444         return md_set_open_replay_data(md_exp, och, req);
445 }
446
447 int ll_local_open(struct file *file, struct lookup_intent *it,
448                   struct ll_file_data *fd, struct obd_client_handle *och)
449 {
450         struct inode *inode = file->f_dentry->d_inode;
451         struct ll_inode_info *lli = ll_i2info(inode);
452         ENTRY;
453
454         LASSERT(!LUSTRE_FPRIVATE(file));
455
456         LASSERT(fd != NULL);
457
458         if (och) {
459                 struct ptlrpc_request *req = it->d.lustre.it_data;
460                 struct mdt_body *body;
461                 int rc;
462
463                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
464                 if (rc)
465                         RETURN(rc);
466
467                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
468                 if ((it->it_flags & FMODE_WRITE) &&
469                     (body->valid & OBD_MD_FLSIZE))
470                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
471                                lli->lli_ioepoch, PFID(&lli->lli_fid));
472         }
473
474         LUSTRE_FPRIVATE(file) = fd;
475         ll_readahead_init(inode, &fd->fd_ras);
476         fd->fd_omode = it->it_flags;
477         RETURN(0);
478 }
479
480 /* Open a file, and (for the very first open) create objects on the OSTs at
481  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
482  * creation or open until ll_lov_setstripe() ioctl is called.
483  *
484  * If we already have the stripe MD locally then we don't request it in
485  * md_open(), by passing a lmm_size = 0.
486  *
487  * It is up to the application to ensure no other processes open this file
488  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
489  * used.  We might be able to avoid races of that sort by getting lli_open_sem
490  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
491  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
492  */
493 int ll_file_open(struct inode *inode, struct file *file)
494 {
495         struct ll_inode_info *lli = ll_i2info(inode);
496         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
497                                           .it_flags = file->f_flags };
498         struct obd_client_handle **och_p = NULL;
499         __u64 *och_usecount = NULL;
500         struct ll_file_data *fd;
501         int rc = 0, opendir_set = 0;
502         ENTRY;
503
504         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
505                inode->i_generation, inode, file->f_flags);
506
507         it = file->private_data; /* XXX: compat macro */
508         file->private_data = NULL; /* prevent ll_local_open assertion */
509
510         fd = ll_file_data_get();
511         if (fd == NULL)
512                 GOTO(out_och_free, rc = -ENOMEM);
513
514         fd->fd_file = file;
515         if (S_ISDIR(inode->i_mode)) {
516                 cfs_spin_lock(&lli->lli_sa_lock);
517                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
518                     lli->lli_opendir_pid == 0) {
519                         lli->lli_opendir_key = fd;
520                         lli->lli_opendir_pid = cfs_curproc_pid();
521                         opendir_set = 1;
522                 }
523                 cfs_spin_unlock(&lli->lli_sa_lock);
524         }
525
526         if (inode->i_sb->s_root == file->f_dentry) {
527                 LUSTRE_FPRIVATE(file) = fd;
528                 RETURN(0);
529         }
530
531         if (!it || !it->d.lustre.it_disposition) {
532                 /* Convert f_flags into access mode. We cannot use file->f_mode,
533                  * because everything but O_ACCMODE mask was stripped from
534                  * there */
535                 if ((oit.it_flags + 1) & O_ACCMODE)
536                         oit.it_flags++;
537                 if (file->f_flags & O_TRUNC)
538                         oit.it_flags |= FMODE_WRITE;
539
540                 /* kernel only call f_op->open in dentry_open.  filp_open calls
541                  * dentry_open after call to open_namei that checks permissions.
542                  * Only nfsd_open call dentry_open directly without checking
543                  * permissions and because of that this code below is safe. */
544                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
545                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
546
547                 /* We do not want O_EXCL here, presumably we opened the file
548                  * already? XXX - NFS implications? */
549                 oit.it_flags &= ~O_EXCL;
550
551                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
552                  * created if necessary, then "IT_CREAT" should be set to keep
553                  * consistent with it */
554                 if (oit.it_flags & O_CREAT)
555                         oit.it_op |= IT_CREAT;
556
557                 it = &oit;
558         }
559
560 restart:
561         /* Let's see if we have file open on MDS already. */
562         if (it->it_flags & FMODE_WRITE) {
563                 och_p = &lli->lli_mds_write_och;
564                 och_usecount = &lli->lli_open_fd_write_count;
565         } else if (it->it_flags & FMODE_EXEC) {
566                 och_p = &lli->lli_mds_exec_och;
567                 och_usecount = &lli->lli_open_fd_exec_count;
568          } else {
569                 och_p = &lli->lli_mds_read_och;
570                 och_usecount = &lli->lli_open_fd_read_count;
571         }
572
573         cfs_mutex_lock(&lli->lli_och_mutex);
574         if (*och_p) { /* Open handle is present */
575                 if (it_disposition(it, DISP_OPEN_OPEN)) {
576                         /* Well, there's extra open request that we do not need,
577                            let's close it somehow. This will decref request. */
578                         rc = it_open_error(DISP_OPEN_OPEN, it);
579                         if (rc) {
580                                 cfs_mutex_unlock(&lli->lli_och_mutex);
581                                 GOTO(out_openerr, rc);
582                         }
583
584                         ll_release_openhandle(file->f_dentry, it);
585                 }
586                 (*och_usecount)++;
587
588                 rc = ll_local_open(file, it, fd, NULL);
589                 if (rc) {
590                         (*och_usecount)--;
591                         cfs_mutex_unlock(&lli->lli_och_mutex);
592                         GOTO(out_openerr, rc);
593                 }
594         } else {
595                 LASSERT(*och_usecount == 0);
596                 if (!it->d.lustre.it_disposition) {
597                         /* We cannot just request lock handle now, new ELC code
598                            means that one of other OPEN locks for this file
599                            could be cancelled, and since blocking ast handler
600                            would attempt to grab och_mutex as well, that would
601                            result in a deadlock */
602                         cfs_mutex_unlock(&lli->lli_och_mutex);
603                         it->it_create_mode |= M_CHECK_STALE;
604                         rc = ll_intent_file_open(file, NULL, 0, it);
605                         it->it_create_mode &= ~M_CHECK_STALE;
606                         if (rc)
607                                 GOTO(out_openerr, rc);
608
609                         goto restart;
610                 }
611                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
612                 if (!*och_p)
613                         GOTO(out_och_free, rc = -ENOMEM);
614
615                 (*och_usecount)++;
616
617                 /* md_intent_lock() didn't get a request ref if there was an
618                  * open error, so don't do cleanup on the request here
619                  * (bug 3430) */
620                 /* XXX (green): Should not we bail out on any error here, not
621                  * just open error? */
622                 rc = it_open_error(DISP_OPEN_OPEN, it);
623                 if (rc)
624                         GOTO(out_och_free, rc);
625
626                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
627
628                 rc = ll_local_open(file, it, fd, *och_p);
629                 if (rc)
630                         GOTO(out_och_free, rc);
631         }
632         cfs_mutex_unlock(&lli->lli_och_mutex);
633         fd = NULL;
634
635         /* Must do this outside lli_och_mutex lock to prevent deadlock where
636            different kind of OPEN lock for this same inode gets cancelled
637            by ldlm_cancel_lru */
638         if (!S_ISREG(inode->i_mode))
639                 GOTO(out_och_free, rc);
640
641         ll_capa_open(inode);
642
643         if (!lli->lli_has_smd) {
644                 if (file->f_flags & O_LOV_DELAY_CREATE ||
645                     !(file->f_mode & FMODE_WRITE)) {
646                         CDEBUG(D_INODE, "object creation was delayed\n");
647                         GOTO(out_och_free, rc);
648                 }
649         }
650         file->f_flags &= ~O_LOV_DELAY_CREATE;
651         GOTO(out_och_free, rc);
652
653 out_och_free:
654         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
655                 ptlrpc_req_finished(it->d.lustre.it_data);
656                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
657         }
658
659         if (rc) {
660                 if (och_p && *och_p) {
661                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
662                         *och_p = NULL; /* OBD_FREE writes some magic there */
663                         (*och_usecount)--;
664                 }
665                 cfs_mutex_unlock(&lli->lli_och_mutex);
666
667 out_openerr:
668                 if (opendir_set != 0)
669                         ll_stop_statahead(inode, lli->lli_opendir_key);
670                 if (fd != NULL)
671                         ll_file_data_put(fd);
672         } else {
673                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
674         }
675
676         return rc;
677 }
678
679 /* Fills the obdo with the attributes for the lsm */
680 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
681                           struct obd_capa *capa, struct obdo *obdo,
682                           __u64 ioepoch, int sync)
683 {
684         struct ptlrpc_request_set *set;
685         struct obd_info            oinfo = { { { 0 } } };
686         int                        rc;
687
688         ENTRY;
689
690         LASSERT(lsm != NULL);
691
692         oinfo.oi_md = lsm;
693         oinfo.oi_oa = obdo;
694         oinfo.oi_oa->o_id = lsm->lsm_object_id;
695         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
696         oinfo.oi_oa->o_mode = S_IFREG;
697         oinfo.oi_oa->o_ioepoch = ioepoch;
698         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
699                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
700                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
701                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
702                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
703                                OBD_MD_FLDATAVERSION;
704         oinfo.oi_capa = capa;
705         if (sync) {
706                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
707                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
708         }
709
710         set = ptlrpc_prep_set();
711         if (set == NULL) {
712                 CERROR("can't allocate ptlrpc set\n");
713                 rc = -ENOMEM;
714         } else {
715                 rc = obd_getattr_async(exp, &oinfo, set);
716                 if (rc == 0)
717                         rc = ptlrpc_set_wait(set);
718                 ptlrpc_set_destroy(set);
719         }
720         if (rc == 0)
721                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
722                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
723                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
724                                          OBD_MD_FLDATAVERSION);
725         RETURN(rc);
726 }
727
728 /**
729   * Performs the getattr on the inode and updates its fields.
730   * If @sync != 0, perform the getattr under the server-side lock.
731   */
732 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
733                      __u64 ioepoch, int sync)
734 {
735         struct obd_capa      *capa = ll_mdscapa_get(inode);
736         struct lov_stripe_md *lsm;
737         int rc;
738         ENTRY;
739
740         lsm = ccc_inode_lsm_get(inode);
741         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
742                             capa, obdo, ioepoch, sync);
743         capa_put(capa);
744         if (rc == 0) {
745                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
746                 CDEBUG(D_INODE,
747                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
748                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
749                        (unsigned long long)inode->i_blocks,
750                        (unsigned long)ll_inode_blksize(inode));
751         }
752         ccc_inode_lsm_put(inode, lsm);
753         RETURN(rc);
754 }
755
756 int ll_merge_lvb(struct inode *inode)
757 {
758         struct ll_inode_info *lli = ll_i2info(inode);
759         struct ll_sb_info *sbi = ll_i2sbi(inode);
760         struct lov_stripe_md *lsm;
761         struct ost_lvb lvb;
762         int rc = 0;
763
764         ENTRY;
765
766         lsm = ccc_inode_lsm_get(inode);
767         ll_inode_size_lock(inode);
768         inode_init_lvb(inode, &lvb);
769
770         /* merge timestamps the most resently obtained from mds with
771            timestamps obtained from osts */
772         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
773         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
774         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
775         if (lsm != NULL) {
776                 rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
777                 cl_isize_write_nolock(inode, lvb.lvb_size);
778
779                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
780                                 PFID(&lli->lli_fid), lvb.lvb_size);
781                 inode->i_blocks = lvb.lvb_blocks;
782
783                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
784                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
785                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
786         }
787         ll_inode_size_unlock(inode);
788         ccc_inode_lsm_put(inode, lsm);
789
790         RETURN(rc);
791 }
792
793 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
794                      lstat_t *st)
795 {
796         struct obdo obdo = { 0 };
797         int rc;
798
799         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
800         if (rc == 0) {
801                 st->st_size   = obdo.o_size;
802                 st->st_blocks = obdo.o_blocks;
803                 st->st_mtime  = obdo.o_mtime;
804                 st->st_atime  = obdo.o_atime;
805                 st->st_ctime  = obdo.o_ctime;
806         }
807         return rc;
808 }
809
810 void ll_io_init(struct cl_io *io, const struct file *file, int write)
811 {
812         struct inode *inode = file->f_dentry->d_inode;
813
814         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
815         if (write) {
816                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
817                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
818         }
819         io->ci_obj     = ll_i2info(inode)->lli_clob;
820         io->ci_lockreq = CILR_MAYBE;
821         if (ll_file_nolock(file)) {
822                 io->ci_lockreq = CILR_NEVER;
823                 io->ci_no_srvlock = 1;
824         } else if (file->f_flags & O_APPEND) {
825                 io->ci_lockreq = CILR_MANDATORY;
826         }
827 }
828
829 static ssize_t
830 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
831                    struct file *file, enum cl_io_type iot,
832                    loff_t *ppos, size_t count)
833 {
834         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
835         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
836         struct cl_io         *io;
837         ssize_t               result;
838         ENTRY;
839
840         io = ccc_env_thread_io(env);
841         ll_io_init(io, file, iot == CIT_WRITE);
842
843         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
844                 struct vvp_io *vio = vvp_env_io(env);
845                 struct ccc_io *cio = ccc_env_io(env);
846                 int write_mutex_locked = 0;
847
848                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
849                 vio->cui_io_subtype = args->via_io_subtype;
850
851                 switch (vio->cui_io_subtype) {
852                 case IO_NORMAL:
853                         cio->cui_iov = args->u.normal.via_iov;
854                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
855                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
856 #ifndef HAVE_FILE_WRITEV
857                         cio->cui_iocb = args->u.normal.via_iocb;
858 #endif
859                         if ((iot == CIT_WRITE) &&
860                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
861                                 if (cfs_mutex_lock_interruptible(&lli->
862                                                                lli_write_mutex))
863                                         GOTO(out, result = -ERESTARTSYS);
864                                 write_mutex_locked = 1;
865                         } else if (iot == CIT_READ) {
866                                 cfs_down_read(&lli->lli_trunc_sem);
867                         }
868                         break;
869                 case IO_SENDFILE:
870                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
871                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
872                         break;
873                 case IO_SPLICE:
874                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
875                         vio->u.splice.cui_flags = args->u.splice.via_flags;
876                         break;
877                 default:
878                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
879                         LBUG();
880                 }
881                 result = cl_io_loop(env, io);
882                 if (write_mutex_locked)
883                         cfs_mutex_unlock(&lli->lli_write_mutex);
884                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
885                         cfs_up_read(&lli->lli_trunc_sem);
886         } else {
887                 /* cl_io_rw_init() handled IO */
888                 result = io->ci_result;
889         }
890
891         if (io->ci_nob > 0) {
892                 result = io->ci_nob;
893                 *ppos = io->u.ci_wr.wr.crw_pos;
894         }
895         GOTO(out, result);
896 out:
897         cl_io_fini(env, io);
898
899         if (iot == CIT_READ) {
900                 if (result >= 0)
901                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
902                                            LPROC_LL_READ_BYTES, result);
903         } else if (iot == CIT_WRITE) {
904                 if (result >= 0) {
905                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
906                                            LPROC_LL_WRITE_BYTES, result);
907                         fd->fd_write_failed = false;
908                 } else {
909                         fd->fd_write_failed = true;
910                 }
911         }
912
913         return result;
914 }
915
916
917 /*
918  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
919  */
920 static int ll_file_get_iov_count(const struct iovec *iov,
921                                  unsigned long *nr_segs, size_t *count)
922 {
923         size_t cnt = 0;
924         unsigned long seg;
925
926         for (seg = 0; seg < *nr_segs; seg++) {
927                 const struct iovec *iv = &iov[seg];
928
929                 /*
930                  * If any segment has a negative length, or the cumulative
931                  * length ever wraps negative then return -EINVAL.
932                  */
933                 cnt += iv->iov_len;
934                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
935                         return -EINVAL;
936                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
937                         continue;
938                 if (seg == 0)
939                         return -EFAULT;
940                 *nr_segs = seg;
941                 cnt -= iv->iov_len;   /* This segment is no good */
942                 break;
943         }
944         *count = cnt;
945         return 0;
946 }
947
948 #ifdef HAVE_FILE_READV
949 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
950                               unsigned long nr_segs, loff_t *ppos)
951 {
952         struct lu_env      *env;
953         struct vvp_io_args *args;
954         size_t              count;
955         ssize_t             result;
956         int                 refcheck;
957         ENTRY;
958
959         result = ll_file_get_iov_count(iov, &nr_segs, &count);
960         if (result)
961                 RETURN(result);
962
963         env = cl_env_get(&refcheck);
964         if (IS_ERR(env))
965                 RETURN(PTR_ERR(env));
966
967         args = vvp_env_args(env, IO_NORMAL);
968         args->u.normal.via_iov = (struct iovec *)iov;
969         args->u.normal.via_nrsegs = nr_segs;
970
971         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
972         cl_env_put(env, &refcheck);
973         RETURN(result);
974 }
975
976 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
977                             loff_t *ppos)
978 {
979         struct lu_env *env;
980         struct iovec  *local_iov;
981         ssize_t        result;
982         int            refcheck;
983         ENTRY;
984
985         env = cl_env_get(&refcheck);
986         if (IS_ERR(env))
987                 RETURN(PTR_ERR(env));
988
989         local_iov = &vvp_env_info(env)->vti_local_iov;
990         local_iov->iov_base = (void __user *)buf;
991         local_iov->iov_len = count;
992         result = ll_file_readv(file, local_iov, 1, ppos);
993         cl_env_put(env, &refcheck);
994         RETURN(result);
995 }
996
997 #else
998 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
999                                 unsigned long nr_segs, loff_t pos)
1000 {
1001         struct lu_env      *env;
1002         struct vvp_io_args *args;
1003         size_t              count;
1004         ssize_t             result;
1005         int                 refcheck;
1006         ENTRY;
1007
1008         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1009         if (result)
1010                 RETURN(result);
1011
1012         env = cl_env_get(&refcheck);
1013         if (IS_ERR(env))
1014                 RETURN(PTR_ERR(env));
1015
1016         args = vvp_env_args(env, IO_NORMAL);
1017         args->u.normal.via_iov = (struct iovec *)iov;
1018         args->u.normal.via_nrsegs = nr_segs;
1019         args->u.normal.via_iocb = iocb;
1020
1021         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1022                                     &iocb->ki_pos, count);
1023         cl_env_put(env, &refcheck);
1024         RETURN(result);
1025 }
1026
1027 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1028                             loff_t *ppos)
1029 {
1030         struct lu_env *env;
1031         struct iovec  *local_iov;
1032         struct kiocb  *kiocb;
1033         ssize_t        result;
1034         int            refcheck;
1035         ENTRY;
1036
1037         env = cl_env_get(&refcheck);
1038         if (IS_ERR(env))
1039                 RETURN(PTR_ERR(env));
1040
1041         local_iov = &vvp_env_info(env)->vti_local_iov;
1042         kiocb = &vvp_env_info(env)->vti_kiocb;
1043         local_iov->iov_base = (void __user *)buf;
1044         local_iov->iov_len = count;
1045         init_sync_kiocb(kiocb, file);
1046         kiocb->ki_pos = *ppos;
1047         kiocb->ki_left = count;
1048
1049         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1050         *ppos = kiocb->ki_pos;
1051
1052         cl_env_put(env, &refcheck);
1053         RETURN(result);
1054 }
1055 #endif
1056
1057 /*
1058  * Write to a file (through the page cache).
1059  */
1060 #ifdef HAVE_FILE_WRITEV
1061 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1062                               unsigned long nr_segs, loff_t *ppos)
1063 {
1064         struct lu_env      *env;
1065         struct vvp_io_args *args;
1066         size_t              count;
1067         ssize_t             result;
1068         int                 refcheck;
1069         ENTRY;
1070
1071         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1072         if (result)
1073                 RETURN(result);
1074
1075         env = cl_env_get(&refcheck);
1076         if (IS_ERR(env))
1077                 RETURN(PTR_ERR(env));
1078
1079         args = vvp_env_args(env, IO_NORMAL);
1080         args->u.normal.via_iov = (struct iovec *)iov;
1081         args->u.normal.via_nrsegs = nr_segs;
1082
1083         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1084         cl_env_put(env, &refcheck);
1085         RETURN(result);
1086 }
1087
1088 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1089                              loff_t *ppos)
1090 {
1091         struct lu_env    *env;
1092         struct iovec     *local_iov;
1093         ssize_t           result;
1094         int               refcheck;
1095         ENTRY;
1096
1097         env = cl_env_get(&refcheck);
1098         if (IS_ERR(env))
1099                 RETURN(PTR_ERR(env));
1100
1101         local_iov = &vvp_env_info(env)->vti_local_iov;
1102         local_iov->iov_base = (void __user *)buf;
1103         local_iov->iov_len = count;
1104
1105         result = ll_file_writev(file, local_iov, 1, ppos);
1106         cl_env_put(env, &refcheck);
1107         RETURN(result);
1108 }
1109
1110 #else /* AIO stuff */
1111 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1112                                  unsigned long nr_segs, loff_t pos)
1113 {
1114         struct lu_env      *env;
1115         struct vvp_io_args *args;
1116         size_t              count;
1117         ssize_t             result;
1118         int                 refcheck;
1119         ENTRY;
1120
1121         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1122         if (result)
1123                 RETURN(result);
1124
1125         env = cl_env_get(&refcheck);
1126         if (IS_ERR(env))
1127                 RETURN(PTR_ERR(env));
1128
1129         args = vvp_env_args(env, IO_NORMAL);
1130         args->u.normal.via_iov = (struct iovec *)iov;
1131         args->u.normal.via_nrsegs = nr_segs;
1132         args->u.normal.via_iocb = iocb;
1133
1134         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1135                                   &iocb->ki_pos, count);
1136         cl_env_put(env, &refcheck);
1137         RETURN(result);
1138 }
1139
1140 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1141                              loff_t *ppos)
1142 {
1143         struct lu_env *env;
1144         struct iovec  *local_iov;
1145         struct kiocb  *kiocb;
1146         ssize_t        result;
1147         int            refcheck;
1148         ENTRY;
1149
1150         env = cl_env_get(&refcheck);
1151         if (IS_ERR(env))
1152                 RETURN(PTR_ERR(env));
1153
1154         local_iov = &vvp_env_info(env)->vti_local_iov;
1155         kiocb = &vvp_env_info(env)->vti_kiocb;
1156         local_iov->iov_base = (void __user *)buf;
1157         local_iov->iov_len = count;
1158         init_sync_kiocb(kiocb, file);
1159         kiocb->ki_pos = *ppos;
1160         kiocb->ki_left = count;
1161
1162         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1163         *ppos = kiocb->ki_pos;
1164
1165         cl_env_put(env, &refcheck);
1166         RETURN(result);
1167 }
1168 #endif
1169
1170
1171 #ifdef HAVE_KERNEL_SENDFILE
1172 /*
1173  * Send file content (through pagecache) somewhere with helper
1174  */
1175 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1176                                 read_actor_t actor, void *target)
1177 {
1178         struct lu_env      *env;
1179         struct vvp_io_args *args;
1180         ssize_t             result;
1181         int                 refcheck;
1182         ENTRY;
1183
1184         env = cl_env_get(&refcheck);
1185         if (IS_ERR(env))
1186                 RETURN(PTR_ERR(env));
1187
1188         args = vvp_env_args(env, IO_SENDFILE);
1189         args->u.sendfile.via_target = target;
1190         args->u.sendfile.via_actor = actor;
1191
1192         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1193         cl_env_put(env, &refcheck);
1194         RETURN(result);
1195 }
1196 #endif
1197
1198 #ifdef HAVE_KERNEL_SPLICE_READ
1199 /*
1200  * Send file content (through pagecache) somewhere with helper
1201  */
1202 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1203                                    struct pipe_inode_info *pipe, size_t count,
1204                                    unsigned int flags)
1205 {
1206         struct lu_env      *env;
1207         struct vvp_io_args *args;
1208         ssize_t             result;
1209         int                 refcheck;
1210         ENTRY;
1211
1212         env = cl_env_get(&refcheck);
1213         if (IS_ERR(env))
1214                 RETURN(PTR_ERR(env));
1215
1216         args = vvp_env_args(env, IO_SPLICE);
1217         args->u.splice.via_pipe = pipe;
1218         args->u.splice.via_flags = flags;
1219
1220         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1221         cl_env_put(env, &refcheck);
1222         RETURN(result);
1223 }
1224 #endif
1225
1226 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1227                            obd_count ost_idx)
1228 {
1229         struct obd_export *exp = ll_i2dtexp(inode);
1230         struct obd_trans_info oti = { 0 };
1231         struct obdo *oa = NULL;
1232         int lsm_size;
1233         int rc = 0;
1234         struct lov_stripe_md *lsm = NULL, *lsm2;
1235         ENTRY;
1236
1237         OBDO_ALLOC(oa);
1238         if (oa == NULL)
1239                 RETURN(-ENOMEM);
1240
1241         lsm = ccc_inode_lsm_get(inode);
1242         if (lsm == NULL)
1243                 GOTO(out, rc = -ENOENT);
1244
1245         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1246                    (lsm->lsm_stripe_count));
1247
1248         OBD_ALLOC_LARGE(lsm2, lsm_size);
1249         if (lsm2 == NULL)
1250                 GOTO(out, rc = -ENOMEM);
1251
1252         oa->o_id = id;
1253         oa->o_seq = seq;
1254         oa->o_nlink = ost_idx;
1255         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1256         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1257         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1258                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1259         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1260         memcpy(lsm2, lsm, lsm_size);
1261         ll_inode_size_lock(inode);
1262         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1263         ll_inode_size_unlock(inode);
1264
1265         OBD_FREE_LARGE(lsm2, lsm_size);
1266         GOTO(out, rc);
1267 out:
1268         ccc_inode_lsm_put(inode, lsm);
1269         OBDO_FREE(oa);
1270         return rc;
1271 }
1272
1273 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1274 {
1275         struct ll_recreate_obj ucreat;
1276         ENTRY;
1277
1278         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1279                 RETURN(-EPERM);
1280
1281         if (cfs_copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1282                                sizeof(struct ll_recreate_obj)))
1283                 RETURN(-EFAULT);
1284
1285         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1286                                ucreat.lrc_ost_idx));
1287 }
1288
1289 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1290 {
1291         struct lu_fid fid;
1292         obd_id id;
1293         obd_count ost_idx;
1294         ENTRY;
1295
1296         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1297                 RETURN(-EPERM);
1298
1299         if (cfs_copy_from_user(&fid, (struct lu_fid *)arg,
1300                                sizeof(struct lu_fid)))
1301                 RETURN(-EFAULT);
1302
1303         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1304         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1305         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1306 }
1307
1308 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1309                              int flags, struct lov_user_md *lum, int lum_size)
1310 {
1311         struct lov_stripe_md *lsm = NULL;
1312         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1313         int rc = 0;
1314         ENTRY;
1315
1316         lsm = ccc_inode_lsm_get(inode);
1317         if (lsm != NULL) {
1318                 ccc_inode_lsm_put(inode, lsm);
1319                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1320                        inode->i_ino);
1321                 RETURN(-EEXIST);
1322         }
1323
1324         ll_inode_size_lock(inode);
1325         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1326         if (rc)
1327                 GOTO(out, rc);
1328         rc = oit.d.lustre.it_status;
1329         if (rc < 0)
1330                 GOTO(out_req_free, rc);
1331
1332         ll_release_openhandle(file->f_dentry, &oit);
1333
1334  out:
1335         ll_inode_size_unlock(inode);
1336         ll_intent_release(&oit);
1337         ccc_inode_lsm_put(inode, lsm);
1338         RETURN(rc);
1339 out_req_free:
1340         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1341         goto out;
1342 }
1343
1344 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1345                              struct lov_mds_md **lmmp, int *lmm_size,
1346                              struct ptlrpc_request **request)
1347 {
1348         struct ll_sb_info *sbi = ll_i2sbi(inode);
1349         struct mdt_body  *body;
1350         struct lov_mds_md *lmm = NULL;
1351         struct ptlrpc_request *req = NULL;
1352         struct md_op_data *op_data;
1353         int rc, lmmsize;
1354
1355         rc = ll_get_max_mdsize(sbi, &lmmsize);
1356         if (rc)
1357                 RETURN(rc);
1358
1359         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1360                                      strlen(filename), lmmsize,
1361                                      LUSTRE_OPC_ANY, NULL);
1362         if (IS_ERR(op_data))
1363                 RETURN(PTR_ERR(op_data));
1364
1365         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1366         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1367         ll_finish_md_op_data(op_data);
1368         if (rc < 0) {
1369                 CDEBUG(D_INFO, "md_getattr_name failed "
1370                        "on %s: rc %d\n", filename, rc);
1371                 GOTO(out, rc);
1372         }
1373
1374         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1375         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1376
1377         lmmsize = body->eadatasize;
1378
1379         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1380                         lmmsize == 0) {
1381                 GOTO(out, rc = -ENODATA);
1382         }
1383
1384         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1385         LASSERT(lmm != NULL);
1386
1387         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1388             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1389                 GOTO(out, rc = -EPROTO);
1390         }
1391
1392         /*
1393          * This is coming from the MDS, so is probably in
1394          * little endian.  We convert it to host endian before
1395          * passing it to userspace.
1396          */
1397         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1398                 /* if function called for directory - we should
1399                  * avoid swab not existent lsm objects */
1400                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1401                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1402                         if (S_ISREG(body->mode))
1403                                 lustre_swab_lov_user_md_objects(
1404                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1405                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1406                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1407                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1408                         if (S_ISREG(body->mode))
1409                                 lustre_swab_lov_user_md_objects(
1410                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1411                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1412                 }
1413         }
1414
1415 out:
1416         *lmmp = lmm;
1417         *lmm_size = lmmsize;
1418         *request = req;
1419         return rc;
1420 }
1421
1422 static int ll_lov_setea(struct inode *inode, struct file *file,
1423                             unsigned long arg)
1424 {
1425         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1426         struct lov_user_md  *lump;
1427         int lum_size = sizeof(struct lov_user_md) +
1428                        sizeof(struct lov_user_ost_data);
1429         int rc;
1430         ENTRY;
1431
1432         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1433                 RETURN(-EPERM);
1434
1435         OBD_ALLOC_LARGE(lump, lum_size);
1436         if (lump == NULL) {
1437                 RETURN(-ENOMEM);
1438         }
1439         if (cfs_copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1440                 OBD_FREE_LARGE(lump, lum_size);
1441                 RETURN(-EFAULT);
1442         }
1443
1444         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1445
1446         OBD_FREE_LARGE(lump, lum_size);
1447         RETURN(rc);
1448 }
1449
1450 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1451                             unsigned long arg)
1452 {
1453         struct lov_user_md_v3 lumv3;
1454         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1455         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
1456         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
1457         int lum_size;
1458         int rc;
1459         int flags = FMODE_WRITE;
1460         ENTRY;
1461
1462         /* first try with v1 which is smaller than v3 */
1463         lum_size = sizeof(struct lov_user_md_v1);
1464         if (cfs_copy_from_user(lumv1, lumv1p, lum_size))
1465                 RETURN(-EFAULT);
1466
1467         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1468                 lum_size = sizeof(struct lov_user_md_v3);
1469                 if (cfs_copy_from_user(&lumv3, lumv3p, lum_size))
1470                         RETURN(-EFAULT);
1471         }
1472
1473         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1474         if (rc == 0) {
1475                 struct lov_stripe_md *lsm;
1476                 put_user(0, &lumv1p->lmm_stripe_count);
1477                 lsm = ccc_inode_lsm_get(inode);
1478                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1479                                    0, lsm, (void *)arg);
1480                 ccc_inode_lsm_put(inode, lsm);
1481         }
1482         RETURN(rc);
1483 }
1484
1485 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1486 {
1487         struct lov_stripe_md *lsm;
1488         int rc = -ENODATA;
1489         ENTRY;
1490
1491         lsm = ccc_inode_lsm_get(inode);
1492         if (lsm != NULL)
1493                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1494                                    lsm, (void *)arg);
1495         ccc_inode_lsm_put(inode, lsm);
1496         RETURN(rc);
1497 }
1498
1499 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1500 {
1501         struct ll_inode_info   *lli = ll_i2info(inode);
1502         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1503         struct ccc_grouplock    grouplock;
1504         int                     rc;
1505         ENTRY;
1506
1507         if (ll_file_nolock(file))
1508                 RETURN(-EOPNOTSUPP);
1509
1510         cfs_spin_lock(&lli->lli_lock);
1511         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1512                 CWARN("group lock already existed with gid %lu\n",
1513                        fd->fd_grouplock.cg_gid);
1514                 cfs_spin_unlock(&lli->lli_lock);
1515                 RETURN(-EINVAL);
1516         }
1517         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1518         cfs_spin_unlock(&lli->lli_lock);
1519
1520         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1521                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1522         if (rc)
1523                 RETURN(rc);
1524
1525         cfs_spin_lock(&lli->lli_lock);
1526         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1527                 cfs_spin_unlock(&lli->lli_lock);
1528                 CERROR("another thread just won the race\n");
1529                 cl_put_grouplock(&grouplock);
1530                 RETURN(-EINVAL);
1531         }
1532
1533         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1534         fd->fd_grouplock = grouplock;
1535         cfs_spin_unlock(&lli->lli_lock);
1536
1537         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1538         RETURN(0);
1539 }
1540
1541 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1542 {
1543         struct ll_inode_info   *lli = ll_i2info(inode);
1544         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1545         struct ccc_grouplock    grouplock;
1546         ENTRY;
1547
1548         cfs_spin_lock(&lli->lli_lock);
1549         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1550                 cfs_spin_unlock(&lli->lli_lock);
1551                 CWARN("no group lock held\n");
1552                 RETURN(-EINVAL);
1553         }
1554         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1555
1556         if (fd->fd_grouplock.cg_gid != arg) {
1557                 CWARN("group lock %lu doesn't match current id %lu\n",
1558                        arg, fd->fd_grouplock.cg_gid);
1559                 cfs_spin_unlock(&lli->lli_lock);
1560                 RETURN(-EINVAL);
1561         }
1562
1563         grouplock = fd->fd_grouplock;
1564         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1565         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1566         cfs_spin_unlock(&lli->lli_lock);
1567
1568         cl_put_grouplock(&grouplock);
1569         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1570         RETURN(0);
1571 }
1572
1573 /**
1574  * Close inode open handle
1575  *
1576  * \param dentry [in]     dentry which contains the inode
1577  * \param it     [in,out] intent which contains open info and result
1578  *
1579  * \retval 0     success
1580  * \retval <0    failure
1581  */
1582 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1583 {
1584         struct inode *inode = dentry->d_inode;
1585         struct obd_client_handle *och;
1586         int rc;
1587         ENTRY;
1588
1589         LASSERT(inode);
1590
1591         /* Root ? Do nothing. */
1592         if (dentry->d_inode->i_sb->s_root == dentry)
1593                 RETURN(0);
1594
1595         /* No open handle to close? Move away */
1596         if (!it_disposition(it, DISP_OPEN_OPEN))
1597                 RETURN(0);
1598
1599         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1600
1601         OBD_ALLOC(och, sizeof(*och));
1602         if (!och)
1603                 GOTO(out, rc = -ENOMEM);
1604
1605         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1606                     ll_i2info(inode), it, och);
1607
1608         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1609                                        inode, och);
1610  out:
1611         /* this one is in place of ll_file_open */
1612         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1613                 ptlrpc_req_finished(it->d.lustre.it_data);
1614                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1615         }
1616         RETURN(rc);
1617 }
1618
1619 /**
1620  * Get size for inode for which FIEMAP mapping is requested.
1621  * Make the FIEMAP get_info call and returns the result.
1622  */
1623 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1624               int num_bytes)
1625 {
1626         struct obd_export *exp = ll_i2dtexp(inode);
1627         struct lov_stripe_md *lsm = NULL;
1628         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1629         int vallen = num_bytes;
1630         int rc;
1631         ENTRY;
1632
1633         /* Checks for fiemap flags */
1634         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1635                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1636                 return -EBADR;
1637         }
1638
1639         /* Check for FIEMAP_FLAG_SYNC */
1640         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1641                 rc = filemap_fdatawrite(inode->i_mapping);
1642                 if (rc)
1643                         return rc;
1644         }
1645
1646         lsm = ccc_inode_lsm_get(inode);
1647         if (lsm == NULL)
1648                 return -ENOENT;
1649
1650         /* If the stripe_count > 1 and the application does not understand
1651          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1652          */
1653         if (lsm->lsm_stripe_count > 1 &&
1654             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1655                 GOTO(out, rc = -EOPNOTSUPP);
1656
1657         fm_key.oa.o_id = lsm->lsm_object_id;
1658         fm_key.oa.o_seq = lsm->lsm_object_seq;
1659         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1660
1661         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1662         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1663         /* If filesize is 0, then there would be no objects for mapping */
1664         if (fm_key.oa.o_size == 0) {
1665                 fiemap->fm_mapped_extents = 0;
1666                 GOTO(out, rc = 0);
1667         }
1668
1669         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1670
1671         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1672                           fiemap, lsm);
1673         if (rc)
1674                 CERROR("obd_get_info failed: rc = %d\n", rc);
1675
1676 out:
1677         ccc_inode_lsm_put(inode, lsm);
1678         RETURN(rc);
1679 }
1680
1681 int ll_fid2path(struct obd_export *exp, void *arg)
1682 {
1683         struct getinfo_fid2path *gfout, *gfin;
1684         int outsize, rc;
1685         ENTRY;
1686
1687         /* Need to get the buflen */
1688         OBD_ALLOC_PTR(gfin);
1689         if (gfin == NULL)
1690                 RETURN(-ENOMEM);
1691         if (cfs_copy_from_user(gfin, arg, sizeof(*gfin))) {
1692                 OBD_FREE_PTR(gfin);
1693                 RETURN(-EFAULT);
1694         }
1695
1696         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1697         OBD_ALLOC(gfout, outsize);
1698         if (gfout == NULL) {
1699                 OBD_FREE_PTR(gfin);
1700                 RETURN(-ENOMEM);
1701         }
1702         memcpy(gfout, gfin, sizeof(*gfout));
1703         OBD_FREE_PTR(gfin);
1704
1705         /* Call mdc_iocontrol */
1706         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1707         if (rc)
1708                 GOTO(gf_free, rc);
1709         if (cfs_copy_to_user(arg, gfout, outsize))
1710                 rc = -EFAULT;
1711
1712 gf_free:
1713         OBD_FREE(gfout, outsize);
1714         RETURN(rc);
1715 }
1716
1717 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1718 {
1719         struct ll_user_fiemap *fiemap_s;
1720         size_t num_bytes, ret_bytes;
1721         unsigned int extent_count;
1722         int rc = 0;
1723
1724         /* Get the extent count so we can calculate the size of
1725          * required fiemap buffer */
1726         if (get_user(extent_count,
1727             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1728                 RETURN(-EFAULT);
1729         num_bytes = sizeof(*fiemap_s) + (extent_count *
1730                                          sizeof(struct ll_fiemap_extent));
1731
1732         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1733         if (fiemap_s == NULL)
1734                 RETURN(-ENOMEM);
1735
1736         /* get the fiemap value */
1737         if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
1738                            sizeof(*fiemap_s)))
1739                 GOTO(error, rc = -EFAULT);
1740
1741         /* If fm_extent_count is non-zero, read the first extent since
1742          * it is used to calculate end_offset and device from previous
1743          * fiemap call. */
1744         if (extent_count) {
1745                 if (copy_from_user(&fiemap_s->fm_extents[0],
1746                     (char __user *)arg + sizeof(*fiemap_s),
1747                     sizeof(struct ll_fiemap_extent)))
1748                         GOTO(error, rc = -EFAULT);
1749         }
1750
1751         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1752         if (rc)
1753                 GOTO(error, rc);
1754
1755         ret_bytes = sizeof(struct ll_user_fiemap);
1756
1757         if (extent_count != 0)
1758                 ret_bytes += (fiemap_s->fm_mapped_extents *
1759                                  sizeof(struct ll_fiemap_extent));
1760
1761         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1762                 rc = -EFAULT;
1763
1764 error:
1765         OBD_FREE_LARGE(fiemap_s, num_bytes);
1766         RETURN(rc);
1767 }
1768
1769 /*
1770  * Read the data_version for inode.
1771  *
1772  * This value is computed using stripe object version on OST.
1773  * Version is computed using server side locking.
1774  *
1775  * @param extent_lock  Take extent lock. Not needed if a process is already
1776  *                     holding the OST object group locks.
1777  */
1778 static int ll_data_version(struct inode *inode, __u64 *data_version,
1779                            int extent_lock)
1780 {
1781         struct lov_stripe_md *lsm = NULL;
1782         struct ll_sb_info    *sbi = ll_i2sbi(inode);
1783         struct obdo          *obdo = NULL;
1784         int                   rc;
1785         ENTRY;
1786
1787         /* If no stripe, we consider version is 0. */
1788         lsm = ccc_inode_lsm_get(inode);
1789         if (lsm == NULL) {
1790                 *data_version = 0;
1791                 CDEBUG(D_INODE, "No object for inode\n");
1792                 RETURN(0);
1793         }
1794
1795         OBD_ALLOC_PTR(obdo);
1796         if (obdo == NULL) {
1797                 ccc_inode_lsm_put(inode, lsm);
1798                 RETURN(-ENOMEM);
1799         }
1800
1801         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1802         if (!rc) {
1803                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1804                         rc = -EOPNOTSUPP;
1805                 else
1806                         *data_version = obdo->o_data_version;
1807         }
1808
1809         OBD_FREE_PTR(obdo);
1810         ccc_inode_lsm_put(inode, lsm);
1811
1812         RETURN(rc);
1813 }
1814
1815 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1816 {
1817         struct inode *inode = file->f_dentry->d_inode;
1818         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1819         int flags;
1820
1821         ENTRY;
1822
1823         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1824                inode->i_generation, inode, cmd);
1825         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1826
1827         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1828         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1829                 RETURN(-ENOTTY);
1830
1831         switch(cmd) {
1832         case LL_IOC_GETFLAGS:
1833                 /* Get the current value of the file flags */
1834                 return put_user(fd->fd_flags, (int *)arg);
1835         case LL_IOC_SETFLAGS:
1836         case LL_IOC_CLRFLAGS:
1837                 /* Set or clear specific file flags */
1838                 /* XXX This probably needs checks to ensure the flags are
1839                  *     not abused, and to handle any flag side effects.
1840                  */
1841                 if (get_user(flags, (int *) arg))
1842                         RETURN(-EFAULT);
1843
1844                 if (cmd == LL_IOC_SETFLAGS) {
1845                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1846                             !(file->f_flags & O_DIRECT)) {
1847                                 CERROR("%s: unable to disable locking on "
1848                                        "non-O_DIRECT file\n", current->comm);
1849                                 RETURN(-EINVAL);
1850                         }
1851
1852                         fd->fd_flags |= flags;
1853                 } else {
1854                         fd->fd_flags &= ~flags;
1855                 }
1856                 RETURN(0);
1857         case LL_IOC_LOV_SETSTRIPE:
1858                 RETURN(ll_lov_setstripe(inode, file, arg));
1859         case LL_IOC_LOV_SETEA:
1860                 RETURN(ll_lov_setea(inode, file, arg));
1861         case LL_IOC_LOV_GETSTRIPE:
1862                 RETURN(ll_lov_getstripe(inode, arg));
1863         case LL_IOC_RECREATE_OBJ:
1864                 RETURN(ll_lov_recreate_obj(inode, arg));
1865         case LL_IOC_RECREATE_FID:
1866                 RETURN(ll_lov_recreate_fid(inode, arg));
1867         case FSFILT_IOC_FIEMAP:
1868                 RETURN(ll_ioctl_fiemap(inode, arg));
1869         case FSFILT_IOC_GETFLAGS:
1870         case FSFILT_IOC_SETFLAGS:
1871                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1872         case FSFILT_IOC_GETVERSION_OLD:
1873         case FSFILT_IOC_GETVERSION:
1874                 RETURN(put_user(inode->i_generation, (int *)arg));
1875         case LL_IOC_GROUP_LOCK:
1876                 RETURN(ll_get_grouplock(inode, file, arg));
1877         case LL_IOC_GROUP_UNLOCK:
1878                 RETURN(ll_put_grouplock(inode, file, arg));
1879         case IOC_OBD_STATFS:
1880                 RETURN(ll_obd_statfs(inode, (void *)arg));
1881
1882         /* We need to special case any other ioctls we want to handle,
1883          * to send them to the MDS/OST as appropriate and to properly
1884          * network encode the arg field.
1885         case FSFILT_IOC_SETVERSION_OLD:
1886         case FSFILT_IOC_SETVERSION:
1887         */
1888         case LL_IOC_FLUSHCTX:
1889                 RETURN(ll_flush_ctx(inode));
1890         case LL_IOC_PATH2FID: {
1891                 if (cfs_copy_to_user((void *)arg, ll_inode2fid(inode),
1892                                      sizeof(struct lu_fid)))
1893                         RETURN(-EFAULT);
1894
1895                 RETURN(0);
1896         }
1897         case OBD_IOC_FID2PATH:
1898                 RETURN(ll_fid2path(ll_i2mdexp(inode), (void *)arg));
1899         case LL_IOC_DATA_VERSION: {
1900                 struct ioc_data_version idv;
1901                 int rc;
1902
1903                 if (cfs_copy_from_user(&idv, (char *)arg, sizeof(idv)))
1904                         RETURN(-EFAULT);
1905
1906                 rc = ll_data_version(inode, &idv.idv_version,
1907                                      !(idv.idv_flags & LL_DV_NOFLUSH));
1908
1909                 if (rc == 0 &&
1910                     cfs_copy_to_user((char *) arg, &idv, sizeof(idv)))
1911                         RETURN(-EFAULT);
1912
1913                 RETURN(rc);
1914         }
1915
1916         case LL_IOC_GET_MDTIDX: {
1917                 int mdtidx;
1918
1919                 mdtidx = ll_get_mdt_idx(inode);
1920                 if (mdtidx < 0)
1921                         RETURN(mdtidx);
1922
1923                 if (put_user((int)mdtidx, (int*)arg))
1924                         RETURN(-EFAULT);
1925
1926                 RETURN(0);
1927         }
1928         case OBD_IOC_GETDTNAME:
1929         case OBD_IOC_GETMDNAME:
1930                 RETURN(ll_get_obd_name(inode, cmd, arg));
1931         default: {
1932                 int err;
1933
1934                 if (LLIOC_STOP ==
1935                     ll_iocontrol_call(inode, file, cmd, arg, &err))
1936                         RETURN(err);
1937
1938                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1939                                      (void *)arg));
1940         }
1941         }
1942 }
1943
1944 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1945 {
1946         struct inode *inode = file->f_dentry->d_inode;
1947         loff_t retval;
1948         ENTRY;
1949         retval = offset + ((origin == 2) ? i_size_read(inode) :
1950                            (origin == 1) ? file->f_pos : 0);
1951         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%s)\n",
1952                inode->i_ino, inode->i_generation, inode, retval, retval,
1953                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
1954         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
1955
1956         if (origin == 2) { /* SEEK_END */
1957                 int rc;
1958
1959                 rc = ll_glimpse_size(inode);
1960                 if (rc != 0)
1961                         RETURN(rc);
1962
1963                 offset += i_size_read(inode);
1964         } else if (origin == 1) { /* SEEK_CUR */
1965                 offset += file->f_pos;
1966         }
1967
1968         retval = -EINVAL;
1969         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1970                 if (offset != file->f_pos) {
1971                         file->f_pos = offset;
1972                 }
1973                 retval = offset;
1974         }
1975
1976         RETURN(retval);
1977 }
1978
1979 int ll_flush(struct file *file, fl_owner_t id)
1980 {
1981         struct inode *inode = file->f_dentry->d_inode;
1982         struct ll_inode_info *lli = ll_i2info(inode);
1983         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1984         int rc, err;
1985
1986         LASSERT(!S_ISDIR(inode->i_mode));
1987
1988         /* catch async errors that were recorded back when async writeback
1989          * failed for pages in this mapping. */
1990         rc = lli->lli_async_rc;
1991         lli->lli_async_rc = 0;
1992         err = lov_read_and_clear_async_rc(lli->lli_clob);
1993         if (rc == 0)
1994                 rc = err;
1995
1996         /* The application has been told write failure already.
1997          * Do not report failure again. */
1998         if (fd->fd_write_failed)
1999                 return 0;
2000         return rc ? -EIO : 0;
2001 }
2002
2003 /**
2004  * Called to make sure a portion of file has been written out.
2005  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2006  *
2007  * Return how many pages have been written.
2008  */
2009 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2010                        enum cl_fsync_mode mode)
2011 {
2012         struct cl_env_nest nest;
2013         struct lu_env *env;
2014         struct cl_io *io;
2015         struct obd_capa *capa = NULL;
2016         struct cl_fsync_io *fio;
2017         int result;
2018         ENTRY;
2019
2020         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2021             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2022                 RETURN(-EINVAL);
2023
2024         env = cl_env_nested_get(&nest);
2025         if (IS_ERR(env))
2026                 RETURN(PTR_ERR(env));
2027
2028         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2029
2030         io = ccc_env_thread_io(env);
2031         io->ci_obj = cl_i2info(inode)->lli_clob;
2032         io->ci_ignore_layout = 1;
2033
2034         /* initialize parameters for sync */
2035         fio = &io->u.ci_fsync;
2036         fio->fi_capa = capa;
2037         fio->fi_start = start;
2038         fio->fi_end = end;
2039         fio->fi_fid = ll_inode2fid(inode);
2040         fio->fi_mode = mode;
2041         fio->fi_nr_written = 0;
2042
2043         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2044                 result = cl_io_loop(env, io);
2045         else
2046                 result = io->ci_result;
2047         if (result == 0)
2048                 result = fio->fi_nr_written;
2049         cl_io_fini(env, io);
2050         cl_env_nested_put(&nest, env);
2051
2052         capa_put(capa);
2053
2054         RETURN(result);
2055 }
2056
2057 #ifdef HAVE_FILE_FSYNC_4ARGS
2058 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2059 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2060 int ll_fsync(struct file *file, int data)
2061 #else
2062 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2063 #endif
2064 {
2065         struct inode *inode = file->f_dentry->d_inode;
2066         struct ll_inode_info *lli = ll_i2info(inode);
2067         struct ptlrpc_request *req;
2068         struct obd_capa *oc;
2069         struct lov_stripe_md *lsm;
2070         int rc, err;
2071         ENTRY;
2072         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2073                inode->i_generation, inode);
2074         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2075
2076         /* fsync's caller has already called _fdata{sync,write}, we want
2077          * that IO to finish before calling the osc and mdc sync methods */
2078         rc = filemap_fdatawait(inode->i_mapping);
2079
2080         /* catch async errors that were recorded back when async writeback
2081          * failed for pages in this mapping. */
2082         if (!S_ISDIR(inode->i_mode)) {
2083                 err = lli->lli_async_rc;
2084                 lli->lli_async_rc = 0;
2085                 if (rc == 0)
2086                         rc = err;
2087                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2088                 if (rc == 0)
2089                         rc = err;
2090         }
2091
2092         oc = ll_mdscapa_get(inode);
2093         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2094                       &req);
2095         capa_put(oc);
2096         if (!rc)
2097                 rc = err;
2098         if (!err)
2099                 ptlrpc_req_finished(req);
2100
2101         lsm = ccc_inode_lsm_get(inode);
2102         if (data && lsm) {
2103                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2104
2105                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2106                                 CL_FSYNC_ALL);
2107                 if (rc == 0 && err < 0)
2108                         rc = err;
2109                 if (rc < 0)
2110                         fd->fd_write_failed = true;
2111                 else
2112                         fd->fd_write_failed = false;
2113         }
2114         ccc_inode_lsm_put(inode, lsm);
2115
2116         RETURN(rc);
2117 }
2118
2119 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2120 {
2121         struct inode *inode = file->f_dentry->d_inode;
2122         struct ll_sb_info *sbi = ll_i2sbi(inode);
2123         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2124                                            .ei_cb_cp =ldlm_flock_completion_ast,
2125                                            .ei_cbdata = file_lock };
2126         struct md_op_data *op_data;
2127         struct lustre_handle lockh = {0};
2128         ldlm_policy_data_t flock = {{0}};
2129         int flags = 0;
2130         int rc;
2131         ENTRY;
2132
2133         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2134                inode->i_ino, file_lock);
2135
2136         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2137
2138         if (file_lock->fl_flags & FL_FLOCK) {
2139                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2140                 /* flocks are whole-file locks */
2141                 flock.l_flock.end = OFFSET_MAX;
2142                 /* For flocks owner is determined by the local file desctiptor*/
2143                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2144         } else if (file_lock->fl_flags & FL_POSIX) {
2145                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2146                 flock.l_flock.start = file_lock->fl_start;
2147                 flock.l_flock.end = file_lock->fl_end;
2148         } else {
2149                 RETURN(-EINVAL);
2150         }
2151         flock.l_flock.pid = file_lock->fl_pid;
2152
2153         /* Somewhat ugly workaround for svc lockd.
2154          * lockd installs custom fl_lmops->fl_compare_owner that checks
2155          * for the fl_owner to be the same (which it always is on local node
2156          * I guess between lockd processes) and then compares pid.
2157          * As such we assign pid to the owner field to make it all work,
2158          * conflict with normal locks is unlikely since pid space and
2159          * pointer space for current->files are not intersecting */
2160         if (file_lock->fl_lmops && file_lock->fl_lmops->fl_compare_owner)
2161                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2162
2163         switch (file_lock->fl_type) {
2164         case F_RDLCK:
2165                 einfo.ei_mode = LCK_PR;
2166                 break;
2167         case F_UNLCK:
2168                 /* An unlock request may or may not have any relation to
2169                  * existing locks so we may not be able to pass a lock handle
2170                  * via a normal ldlm_lock_cancel() request. The request may even
2171                  * unlock a byte range in the middle of an existing lock. In
2172                  * order to process an unlock request we need all of the same
2173                  * information that is given with a normal read or write record
2174                  * lock request. To avoid creating another ldlm unlock (cancel)
2175                  * message we'll treat a LCK_NL flock request as an unlock. */
2176                 einfo.ei_mode = LCK_NL;
2177                 break;
2178         case F_WRLCK:
2179                 einfo.ei_mode = LCK_PW;
2180                 break;
2181         default:
2182                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2183                         file_lock->fl_type);
2184                 RETURN (-ENOTSUPP);
2185         }
2186
2187         switch (cmd) {
2188         case F_SETLKW:
2189 #ifdef F_SETLKW64
2190         case F_SETLKW64:
2191 #endif
2192                 flags = 0;
2193                 break;
2194         case F_SETLK:
2195 #ifdef F_SETLK64
2196         case F_SETLK64:
2197 #endif
2198                 flags = LDLM_FL_BLOCK_NOWAIT;
2199                 break;
2200         case F_GETLK:
2201 #ifdef F_GETLK64
2202         case F_GETLK64:
2203 #endif
2204                 flags = LDLM_FL_TEST_LOCK;
2205                 /* Save the old mode so that if the mode in the lock changes we
2206                  * can decrement the appropriate reader or writer refcount. */
2207                 file_lock->fl_type = einfo.ei_mode;
2208                 break;
2209         default:
2210                 CERROR("unknown fcntl lock command: %d\n", cmd);
2211                 RETURN (-EINVAL);
2212         }
2213
2214         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2215                                      LUSTRE_OPC_ANY, NULL);
2216         if (IS_ERR(op_data))
2217                 RETURN(PTR_ERR(op_data));
2218
2219         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2220                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2221                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2222
2223         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2224                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2225
2226         ll_finish_md_op_data(op_data);
2227
2228         if ((file_lock->fl_flags & FL_FLOCK) &&
2229             (rc == 0 || file_lock->fl_type == F_UNLCK))
2230                 flock_lock_file_wait(file, file_lock);
2231         if ((file_lock->fl_flags & FL_POSIX) &&
2232             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2233             !(flags & LDLM_FL_TEST_LOCK))
2234                 posix_lock_file_wait(file, file_lock);
2235
2236         RETURN(rc);
2237 }
2238
2239 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2240 {
2241         ENTRY;
2242
2243         RETURN(-ENOSYS);
2244 }
2245
2246 /**
2247  * test if some locks matching bits and l_req_mode are acquired
2248  * - bits can be in different locks
2249  * - if found clear the common lock bits in *bits
2250  * - the bits not found, are kept in *bits
2251  * \param inode [IN]
2252  * \param bits [IN] searched lock bits [IN]
2253  * \param l_req_mode [IN] searched lock mode
2254  * \retval boolean, true iff all bits are found
2255  */
2256 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2257 {
2258         struct lustre_handle lockh;
2259         ldlm_policy_data_t policy;
2260         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2261                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2262         struct lu_fid *fid;
2263         int flags;
2264         int i;
2265         ENTRY;
2266
2267         if (!inode)
2268                RETURN(0);
2269
2270         fid = &ll_i2info(inode)->lli_fid;
2271         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2272                ldlm_lockname[mode]);
2273
2274         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2275         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2276                 policy.l_inodebits.bits = *bits & (1 << i);
2277                 if (policy.l_inodebits.bits == 0)
2278                         continue;
2279
2280                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2281                                   &policy, mode, &lockh)) {
2282                         struct ldlm_lock *lock;
2283
2284                         lock = ldlm_handle2lock(&lockh);
2285                         if (lock) {
2286                                 *bits &=
2287                                       ~(lock->l_policy_data.l_inodebits.bits);
2288                                 LDLM_LOCK_PUT(lock);
2289                         } else {
2290                                 *bits &= ~policy.l_inodebits.bits;
2291                         }
2292                 }
2293         }
2294         RETURN(*bits == 0);
2295 }
2296
2297 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2298                             struct lustre_handle *lockh)
2299 {
2300         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2301         struct lu_fid *fid;
2302         ldlm_mode_t rc;
2303         int flags;
2304         ENTRY;
2305
2306         fid = &ll_i2info(inode)->lli_fid;
2307         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2308
2309         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2310         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2311                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2312         RETURN(rc);
2313 }
2314
2315 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2316         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2317                               * and return success */
2318                 inode->i_nlink = 0;
2319                 /* This path cannot be hit for regular files unless in
2320                  * case of obscure races, so no need to to validate
2321                  * size. */
2322                 if (!S_ISREG(inode->i_mode) &&
2323                     !S_ISDIR(inode->i_mode))
2324                         return 0;
2325         }
2326
2327         if (rc) {
2328                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2329                 return -abs(rc);
2330
2331         }
2332
2333         return 0;
2334 }
2335
2336 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2337                              __u64 ibits)
2338 {
2339         struct inode *inode = dentry->d_inode;
2340         struct ptlrpc_request *req = NULL;
2341         struct obd_export *exp;
2342         int rc = 0;
2343         ENTRY;
2344
2345         if (!inode) {
2346                 CERROR("REPORT THIS LINE TO PETER\n");
2347                 RETURN(0);
2348         }
2349
2350         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2351                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2352
2353         exp = ll_i2mdexp(inode);
2354
2355         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2356          *      But under CMD case, it caused some lock issues, should be fixed
2357          *      with new CMD ibits lock. See bug 12718 */
2358         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2359                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2360                 struct md_op_data *op_data;
2361
2362                 if (ibits == MDS_INODELOCK_LOOKUP)
2363                         oit.it_op = IT_LOOKUP;
2364
2365                 /* Call getattr by fid, so do not provide name at all. */
2366                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2367                                              dentry->d_inode, NULL, 0, 0,
2368                                              LUSTRE_OPC_ANY, NULL);
2369                 if (IS_ERR(op_data))
2370                         RETURN(PTR_ERR(op_data));
2371
2372                 oit.it_create_mode |= M_CHECK_STALE;
2373                 rc = md_intent_lock(exp, op_data, NULL, 0,
2374                                     /* we are not interested in name
2375                                        based lookup */
2376                                     &oit, 0, &req,
2377                                     ll_md_blocking_ast, 0);
2378                 ll_finish_md_op_data(op_data);
2379                 oit.it_create_mode &= ~M_CHECK_STALE;
2380                 if (rc < 0) {
2381                         rc = ll_inode_revalidate_fini(inode, rc);
2382                         GOTO (out, rc);
2383                 }
2384
2385                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2386                 if (rc != 0) {
2387                         ll_intent_release(&oit);
2388                         GOTO(out, rc);
2389                 }
2390
2391                 /* Unlinked? Unhash dentry, so it is not picked up later by
2392                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2393                    here to preserve get_cwd functionality on 2.6.
2394                    Bug 10503 */
2395                 if (!dentry->d_inode->i_nlink)
2396                         d_lustre_invalidate(dentry);
2397
2398                 ll_lookup_finish_locks(&oit, dentry);
2399         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2400                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2401                 obd_valid valid = OBD_MD_FLGETATTR;
2402                 struct md_op_data *op_data;
2403                 int ealen = 0;
2404
2405                 if (S_ISREG(inode->i_mode)) {
2406                         rc = ll_get_max_mdsize(sbi, &ealen);
2407                         if (rc)
2408                                 RETURN(rc);
2409                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2410                 }
2411
2412                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2413                                              0, ealen, LUSTRE_OPC_ANY,
2414                                              NULL);
2415                 if (IS_ERR(op_data))
2416                         RETURN(PTR_ERR(op_data));
2417
2418                 op_data->op_valid = valid;
2419                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2420                  * capa for this inode. Because we only keep capas of dirs
2421                  * fresh. */
2422                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2423                 ll_finish_md_op_data(op_data);
2424                 if (rc) {
2425                         rc = ll_inode_revalidate_fini(inode, rc);
2426                         RETURN(rc);
2427                 }
2428
2429                 rc = ll_prep_inode(&inode, req, NULL);
2430         }
2431 out:
2432         ptlrpc_req_finished(req);
2433         return rc;
2434 }
2435
2436 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2437                            __u64 ibits)
2438 {
2439         struct inode *inode = dentry->d_inode;
2440         int rc;
2441         ENTRY;
2442
2443         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2444
2445         /* if object not yet allocated, don't validate size */
2446         if (rc == 0 && !ll_i2info(dentry->d_inode)->lli_has_smd) {
2447                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2448                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2449                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2450                 RETURN(0);
2451         }
2452
2453         /* ll_glimpse_size will prefer locally cached writes if they extend
2454          * the file */
2455
2456         if (rc == 0)
2457                 rc = ll_glimpse_size(inode);
2458
2459         RETURN(rc);
2460 }
2461
2462 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2463                   struct lookup_intent *it, struct kstat *stat)
2464 {
2465         struct inode *inode = de->d_inode;
2466         struct ll_sb_info *sbi = ll_i2sbi(inode);
2467         struct ll_inode_info *lli = ll_i2info(inode);
2468         int res = 0;
2469
2470         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2471                                              MDS_INODELOCK_LOOKUP);
2472         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2473
2474         if (res)
2475                 return res;
2476
2477         stat->dev = inode->i_sb->s_dev;
2478         if (ll_need_32bit_api(sbi))
2479                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2480         else
2481                 stat->ino = inode->i_ino;
2482         stat->mode = inode->i_mode;
2483         stat->nlink = inode->i_nlink;
2484         stat->uid = inode->i_uid;
2485         stat->gid = inode->i_gid;
2486         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2487         stat->atime = inode->i_atime;
2488         stat->mtime = inode->i_mtime;
2489         stat->ctime = inode->i_ctime;
2490         stat->blksize = 1 << inode->i_blkbits;
2491
2492         stat->size = i_size_read(inode);
2493         stat->blocks = inode->i_blocks;
2494
2495         return 0;
2496 }
2497 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2498 {
2499         struct lookup_intent it = { .it_op = IT_GETATTR };
2500
2501         return ll_getattr_it(mnt, de, &it, stat);
2502 }
2503
2504 #ifdef HAVE_LINUX_FIEMAP_H
2505 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2506                 __u64 start, __u64 len)
2507 {
2508         int rc;
2509         size_t num_bytes;
2510         struct ll_user_fiemap *fiemap;
2511         unsigned int extent_count = fieinfo->fi_extents_max;
2512
2513         num_bytes = sizeof(*fiemap) + (extent_count *
2514                                        sizeof(struct ll_fiemap_extent));
2515         OBD_ALLOC_LARGE(fiemap, num_bytes);
2516
2517         if (fiemap == NULL)
2518                 RETURN(-ENOMEM);
2519
2520         fiemap->fm_flags = fieinfo->fi_flags;
2521         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2522         fiemap->fm_start = start;
2523         fiemap->fm_length = len;
2524         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2525                sizeof(struct ll_fiemap_extent));
2526
2527         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2528
2529         fieinfo->fi_flags = fiemap->fm_flags;
2530         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2531         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2532                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2533
2534         OBD_FREE_LARGE(fiemap, num_bytes);
2535         return rc;
2536 }
2537 #endif
2538
2539 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2540 static int
2541 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2542 lustre_check_acl(struct inode *inode, int mask, unsigned int flags)
2543 # else
2544 lustre_check_acl(struct inode *inode, int mask)
2545 # endif
2546 {
2547 # ifdef CONFIG_FS_POSIX_ACL
2548         struct ll_inode_info *lli = ll_i2info(inode);
2549         struct posix_acl *acl;
2550         int rc;
2551         ENTRY;
2552
2553 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2554         if (flags & IPERM_FLAG_RCU)
2555                 return -ECHILD;
2556 #  endif
2557         cfs_spin_lock(&lli->lli_lock);
2558         acl = posix_acl_dup(lli->lli_posix_acl);
2559         cfs_spin_unlock(&lli->lli_lock);
2560
2561         if (!acl)
2562                 RETURN(-EAGAIN);
2563
2564         rc = posix_acl_permission(inode, acl, mask);
2565         posix_acl_release(acl);
2566
2567         RETURN(rc);
2568 # else /* !CONFIG_FS_POSIX_ACL */
2569         return -EAGAIN;
2570 # endif /* CONFIG_FS_POSIX_ACL */
2571 }
2572 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2573
2574 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2575 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2576 #else
2577 # ifdef HAVE_INODE_PERMISION_2ARGS
2578 int ll_inode_permission(struct inode *inode, int mask)
2579 # else
2580 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2581 # endif
2582 #endif
2583 {
2584         int rc = 0;
2585         ENTRY;
2586
2587 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2588         if (flags & IPERM_FLAG_RCU)
2589                 return -ECHILD;
2590 #endif
2591
2592        /* as root inode are NOT getting validated in lookup operation,
2593         * need to do it before permission check. */
2594
2595         if (inode == inode->i_sb->s_root->d_inode) {
2596                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2597
2598                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2599                                               MDS_INODELOCK_LOOKUP);
2600                 if (rc)
2601                         RETURN(rc);
2602         }
2603
2604         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2605                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2606
2607         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2608                 return lustre_check_remote_perm(inode, mask);
2609
2610         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2611         rc = ll_generic_permission(inode, mask, flags, lustre_check_acl);
2612
2613         RETURN(rc);
2614 }
2615
2616 #ifdef HAVE_FILE_READV
2617 #define READ_METHOD readv
2618 #define READ_FUNCTION ll_file_readv
2619 #define WRITE_METHOD writev
2620 #define WRITE_FUNCTION ll_file_writev
2621 #else
2622 #define READ_METHOD aio_read
2623 #define READ_FUNCTION ll_file_aio_read
2624 #define WRITE_METHOD aio_write
2625 #define WRITE_FUNCTION ll_file_aio_write
2626 #endif
2627
2628 /* -o localflock - only provides locally consistent flock locks */
2629 struct file_operations ll_file_operations = {
2630         .read           = ll_file_read,
2631         .READ_METHOD    = READ_FUNCTION,
2632         .write          = ll_file_write,
2633         .WRITE_METHOD   = WRITE_FUNCTION,
2634         .unlocked_ioctl = ll_file_ioctl,
2635         .open           = ll_file_open,
2636         .release        = ll_file_release,
2637         .mmap           = ll_file_mmap,
2638         .llseek         = ll_file_seek,
2639 #ifdef HAVE_KERNEL_SENDFILE
2640         .sendfile       = ll_file_sendfile,
2641 #endif
2642 #ifdef HAVE_KERNEL_SPLICE_READ
2643         .splice_read    = ll_file_splice_read,
2644 #endif
2645         .fsync          = ll_fsync,
2646         .flush          = ll_flush
2647 };
2648
2649 struct file_operations ll_file_operations_flock = {
2650         .read           = ll_file_read,
2651         .READ_METHOD    = READ_FUNCTION,
2652         .write          = ll_file_write,
2653         .WRITE_METHOD   = WRITE_FUNCTION,
2654         .unlocked_ioctl = ll_file_ioctl,
2655         .open           = ll_file_open,
2656         .release        = ll_file_release,
2657         .mmap           = ll_file_mmap,
2658         .llseek         = ll_file_seek,
2659 #ifdef HAVE_KERNEL_SENDFILE
2660         .sendfile       = ll_file_sendfile,
2661 #endif
2662 #ifdef HAVE_KERNEL_SPLICE_READ
2663         .splice_read    = ll_file_splice_read,
2664 #endif
2665         .fsync          = ll_fsync,
2666         .flush          = ll_flush,
2667         .flock          = ll_file_flock,
2668         .lock           = ll_file_flock
2669 };
2670
2671 /* These are for -o noflock - to return ENOSYS on flock calls */
2672 struct file_operations ll_file_operations_noflock = {
2673         .read           = ll_file_read,
2674         .READ_METHOD    = READ_FUNCTION,
2675         .write          = ll_file_write,
2676         .WRITE_METHOD   = WRITE_FUNCTION,
2677         .unlocked_ioctl = ll_file_ioctl,
2678         .open           = ll_file_open,
2679         .release        = ll_file_release,
2680         .mmap           = ll_file_mmap,
2681         .llseek         = ll_file_seek,
2682 #ifdef HAVE_KERNEL_SENDFILE
2683         .sendfile       = ll_file_sendfile,
2684 #endif
2685 #ifdef HAVE_KERNEL_SPLICE_READ
2686         .splice_read    = ll_file_splice_read,
2687 #endif
2688         .fsync          = ll_fsync,
2689         .flush          = ll_flush,
2690         .flock          = ll_file_noflock,
2691         .lock           = ll_file_noflock
2692 };
2693
2694 struct inode_operations ll_file_inode_operations = {
2695         .setattr        = ll_setattr,
2696         .truncate       = ll_truncate,
2697         .getattr        = ll_getattr,
2698         .permission     = ll_inode_permission,
2699         .setxattr       = ll_setxattr,
2700         .getxattr       = ll_getxattr,
2701         .listxattr      = ll_listxattr,
2702         .removexattr    = ll_removexattr,
2703 #ifdef  HAVE_LINUX_FIEMAP_H
2704         .fiemap         = ll_fiemap,
2705 #endif
2706 };
2707
2708 /* dynamic ioctl number support routins */
2709 static struct llioc_ctl_data {
2710         cfs_rw_semaphore_t      ioc_sem;
2711         cfs_list_t              ioc_head;
2712 } llioc = {
2713         __RWSEM_INITIALIZER(llioc.ioc_sem),
2714         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2715 };
2716
2717
2718 struct llioc_data {
2719         cfs_list_t              iocd_list;
2720         unsigned int            iocd_size;
2721         llioc_callback_t        iocd_cb;
2722         unsigned int            iocd_count;
2723         unsigned int            iocd_cmd[0];
2724 };
2725
2726 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2727 {
2728         unsigned int size;
2729         struct llioc_data *in_data = NULL;
2730         ENTRY;
2731
2732         if (cb == NULL || cmd == NULL ||
2733             count > LLIOC_MAX_CMD || count < 0)
2734                 RETURN(NULL);
2735
2736         size = sizeof(*in_data) + count * sizeof(unsigned int);
2737         OBD_ALLOC(in_data, size);
2738         if (in_data == NULL)
2739                 RETURN(NULL);
2740
2741         memset(in_data, 0, sizeof(*in_data));
2742         in_data->iocd_size = size;
2743         in_data->iocd_cb = cb;
2744         in_data->iocd_count = count;
2745         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2746
2747         cfs_down_write(&llioc.ioc_sem);
2748         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2749         cfs_up_write(&llioc.ioc_sem);
2750
2751         RETURN(in_data);
2752 }
2753
2754 void ll_iocontrol_unregister(void *magic)
2755 {
2756         struct llioc_data *tmp;
2757
2758         if (magic == NULL)
2759                 return;
2760
2761         cfs_down_write(&llioc.ioc_sem);
2762         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2763                 if (tmp == magic) {
2764                         unsigned int size = tmp->iocd_size;
2765
2766                         cfs_list_del(&tmp->iocd_list);
2767                         cfs_up_write(&llioc.ioc_sem);
2768
2769                         OBD_FREE(tmp, size);
2770                         return;
2771                 }
2772         }
2773         cfs_up_write(&llioc.ioc_sem);
2774
2775         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2776 }
2777
2778 EXPORT_SYMBOL(ll_iocontrol_register);
2779 EXPORT_SYMBOL(ll_iocontrol_unregister);
2780
2781 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2782                         unsigned int cmd, unsigned long arg, int *rcp)
2783 {
2784         enum llioc_iter ret = LLIOC_CONT;
2785         struct llioc_data *data;
2786         int rc = -EINVAL, i;
2787
2788         cfs_down_read(&llioc.ioc_sem);
2789         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2790                 for (i = 0; i < data->iocd_count; i++) {
2791                         if (cmd != data->iocd_cmd[i])
2792                                 continue;
2793
2794                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2795                         break;
2796                 }
2797
2798                 if (ret == LLIOC_STOP)
2799                         break;
2800         }
2801         cfs_up_read(&llioc.ioc_sem);
2802
2803         if (rcp)
2804                 *rcp = rc;
2805         return ret;
2806 }
2807
2808 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2809 {
2810         struct ll_inode_info *lli = ll_i2info(inode);
2811         struct cl_env_nest nest;
2812         struct lu_env *env;
2813         int result;
2814         ENTRY;
2815
2816         if (lli->lli_clob == NULL)
2817                 RETURN(0);
2818
2819         env = cl_env_nested_get(&nest);
2820         if (IS_ERR(env))
2821                 RETURN(PTR_ERR(env));
2822
2823         result = cl_conf_set(env, lli->lli_clob, conf);
2824         cl_env_nested_put(&nest, env);
2825         RETURN(result);
2826 }
2827
2828 /**
2829  * This function checks if there exists a LAYOUT lock on the client side,
2830  * or enqueues it if it doesn't have one in cache.
2831  *
2832  * This function will not hold layout lock so it may be revoked any time after
2833  * this function returns. Any operations depend on layout should be redone
2834  * in that case.
2835  *
2836  * This function should be called before lov_io_init() to get an uptodate
2837  * layout version, the caller should save the version number and after IO
2838  * is finished, this function should be called again to verify that layout
2839  * is not changed during IO time.
2840  */
2841 int ll_layout_refresh(struct inode *inode, __u32 *gen)
2842 {
2843         struct ll_inode_info  *lli = ll_i2info(inode);
2844         struct ll_sb_info     *sbi = ll_i2sbi(inode);
2845         struct md_op_data     *op_data = NULL;
2846         struct ptlrpc_request *req = NULL;
2847         struct lookup_intent   it = { .it_op = IT_LAYOUT };
2848         struct lustre_handle   lockh;
2849         ldlm_mode_t            mode;
2850         struct cl_object_conf  conf = {  .coc_inode = inode,
2851                                          .coc_validate_only = true };
2852         int rc;
2853         ENTRY;
2854
2855         *gen = 0;
2856         if (!(ll_i2sbi(inode)->ll_flags & LL_SBI_LAYOUT_LOCK))
2857                 RETURN(0);
2858
2859         /* sanity checks */
2860         LASSERT(fid_is_sane(ll_inode2fid(inode)));
2861         LASSERT(S_ISREG(inode->i_mode));
2862
2863         /* mostly layout lock is caching on the local side, so try to match
2864          * it before grabbing layout lock mutex. */
2865         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh);
2866         if (mode != 0) { /* hit cached lock */
2867                 struct lov_stripe_md *lsm;
2868
2869                 lsm = ccc_inode_lsm_get(inode);
2870                 if (lsm != NULL)
2871                         *gen = lsm->lsm_layout_gen;
2872                 ccc_inode_lsm_put(inode, lsm);
2873                 ldlm_lock_decref(&lockh, mode);
2874
2875                 RETURN(0);
2876         }
2877
2878         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
2879                                      0, 0, LUSTRE_OPC_ANY, NULL);
2880         if (IS_ERR(op_data))
2881                 RETURN(PTR_ERR(op_data));
2882
2883         /* take layout lock mutex to enqueue layout lock exclusively. */
2884         cfs_mutex_lock(&lli->lli_layout_mutex);
2885
2886         /* make sure the old conf goes away */
2887         ll_layout_conf(inode, &conf);
2888
2889         /* enqueue layout lock */
2890         rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0,
2891                         &req, ll_md_blocking_ast, 0);
2892         if (rc == 0) {
2893                 /* we get a new lock, so update the lock data */
2894                 lockh.cookie = it.d.lustre.it_lock_handle;
2895                 md_set_lock_data(sbi->ll_md_exp, &lockh.cookie, inode, NULL);
2896
2897                 /* req == NULL is when lock was found in client cache, without
2898                  * any request to server (but lsm can be canceled just after a
2899                  * release) */
2900                 if (req != NULL) {
2901                         struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
2902                         struct lustre_md md = { NULL };
2903                         void *lmm;
2904                         int lmmsize;
2905
2906                         /* for IT_LAYOUT lock, lmm is returned in lock's lvb
2907                          * data via completion callback */
2908                         LASSERT(lock != NULL);
2909                         lmm = lock->l_lvb_data;
2910                         lmmsize = lock->l_lvb_len;
2911                         if (lmm != NULL)
2912                                 rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
2913                                                 lmm, lmmsize);
2914                         if (rc == 0) {
2915                                 if (md.lsm != NULL)
2916                                         *gen = md.lsm->lsm_layout_gen;
2917
2918                                 memset(&conf, 0, sizeof conf);
2919                                 conf.coc_inode = inode;
2920                                 conf.u.coc_md = &md;
2921                                 ll_layout_conf(inode, &conf);
2922                                 /* is this racy? */
2923                                 lli->lli_has_smd = md.lsm != NULL;
2924                         }
2925                         if (md.lsm != NULL)
2926                                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
2927
2928                         LDLM_LOCK_PUT(lock);
2929                         ptlrpc_req_finished(req);
2930                 } else { /* hit caching lock */
2931                         struct lov_stripe_md *lsm;
2932
2933                         lsm = ccc_inode_lsm_get(inode);
2934                         if (lsm != NULL)
2935                                 *gen = lsm->lsm_layout_gen;
2936                         ccc_inode_lsm_put(inode, lsm);
2937                 }
2938                 ll_intent_drop_lock(&it);
2939         }
2940         cfs_mutex_unlock(&lli->lli_layout_mutex);
2941         ll_finish_md_op_data(op_data);
2942
2943         RETURN(rc);
2944 }