Whamcloud - gitweb
LU-709 build: cleanup LC_FLUSH_OWNER_ID/INVALIDATE_MAPPING
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         return fd;
59 }
60
61 static void ll_file_data_put(struct ll_file_data *fd)
62 {
63         if (fd != NULL)
64                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
65 }
66
67 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
68                           struct lustre_handle *fh)
69 {
70         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
71         op_data->op_attr.ia_mode = inode->i_mode;
72         op_data->op_attr.ia_atime = inode->i_atime;
73         op_data->op_attr.ia_mtime = inode->i_mtime;
74         op_data->op_attr.ia_ctime = inode->i_ctime;
75         op_data->op_attr.ia_size = i_size_read(inode);
76         op_data->op_attr_blocks = inode->i_blocks;
77         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
78                                         ll_inode_to_ext_flags(inode->i_flags);
79         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
80         if (fh)
81                 op_data->op_handle = *fh;
82         op_data->op_capa1 = ll_mdscapa_get(inode);
83 }
84
85 /**
86  * Closes the IO epoch and packs all the attributes into @op_data for
87  * the CLOSE rpc.
88  */
89 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
90                              struct obd_client_handle *och)
91 {
92         ENTRY;
93
94         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
95                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
96
97         if (!(och->och_flags & FMODE_WRITE))
98                 goto out;
99
100         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
101                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
102         else
103                 ll_ioepoch_close(inode, op_data, &och, 0);
104
105 out:
106         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
107         ll_prep_md_op_data(op_data, inode, NULL, NULL,
108                            0, 0, LUSTRE_OPC_ANY, NULL);
109         EXIT;
110 }
111
112 static int ll_close_inode_openhandle(struct obd_export *md_exp,
113                                      struct inode *inode,
114                                      struct obd_client_handle *och)
115 {
116         struct obd_export *exp = ll_i2mdexp(inode);
117         struct md_op_data *op_data;
118         struct ptlrpc_request *req = NULL;
119         struct obd_device *obd = class_exp2obd(exp);
120         int epoch_close = 1;
121         int rc;
122         ENTRY;
123
124         if (obd == NULL) {
125                 /*
126                  * XXX: in case of LMV, is this correct to access
127                  * ->exp_handle?
128                  */
129                 CERROR("Invalid MDC connection handle "LPX64"\n",
130                        ll_i2mdexp(inode)->exp_handle.h_cookie);
131                 GOTO(out, rc = 0);
132         }
133
134         OBD_ALLOC_PTR(op_data);
135         if (op_data == NULL)
136                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
137
138         ll_prepare_close(inode, op_data, och);
139         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
140         rc = md_close(md_exp, op_data, och->och_mod, &req);
141         if (rc == -EAGAIN) {
142                 /* This close must have the epoch closed. */
143                 LASSERT(epoch_close);
144                 /* MDS has instructed us to obtain Size-on-MDS attribute from
145                  * OSTs and send setattr to back to MDS. */
146                 rc = ll_som_update(inode, op_data);
147                 if (rc) {
148                         CERROR("inode %lu mdc Size-on-MDS update failed: "
149                                "rc = %d\n", inode->i_ino, rc);
150                         rc = 0;
151                 }
152         } else if (rc) {
153                 CERROR("inode %lu mdc close failed: rc = %d\n",
154                        inode->i_ino, rc);
155         }
156         ll_finish_md_op_data(op_data);
157
158         if (rc == 0) {
159                 rc = ll_objects_destroy(req, inode);
160                 if (rc)
161                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
162                                inode->i_ino, rc);
163         }
164
165         EXIT;
166 out:
167
168         if (exp_connect_som(exp) && !epoch_close &&
169             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
170                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
171         } else {
172                 md_clear_open_replay_data(md_exp, och);
173                 /* Free @och if it is not waiting for DONE_WRITING. */
174                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
175                 OBD_FREE_PTR(och);
176         }
177         if (req) /* This is close request */
178                 ptlrpc_req_finished(req);
179         return rc;
180 }
181
182 int ll_md_real_close(struct inode *inode, int flags)
183 {
184         struct ll_inode_info *lli = ll_i2info(inode);
185         struct obd_client_handle **och_p;
186         struct obd_client_handle *och;
187         __u64 *och_usecount;
188         int rc = 0;
189         ENTRY;
190
191         if (flags & FMODE_WRITE) {
192                 och_p = &lli->lli_mds_write_och;
193                 och_usecount = &lli->lli_open_fd_write_count;
194         } else if (flags & FMODE_EXEC) {
195                 och_p = &lli->lli_mds_exec_och;
196                 och_usecount = &lli->lli_open_fd_exec_count;
197         } else {
198                 LASSERT(flags & FMODE_READ);
199                 och_p = &lli->lli_mds_read_och;
200                 och_usecount = &lli->lli_open_fd_read_count;
201         }
202
203         cfs_mutex_lock(&lli->lli_och_mutex);
204         if (*och_usecount) { /* There are still users of this handle, so
205                                 skip freeing it. */
206                 cfs_mutex_unlock(&lli->lli_och_mutex);
207                 RETURN(0);
208         }
209         och=*och_p;
210         *och_p = NULL;
211         cfs_mutex_unlock(&lli->lli_och_mutex);
212
213         if (och) { /* There might be a race and somebody have freed this och
214                       already */
215                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
216                                                inode, och);
217         }
218
219         RETURN(rc);
220 }
221
222 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
223                 struct file *file)
224 {
225         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
226         struct ll_inode_info *lli = ll_i2info(inode);
227         int rc = 0;
228         ENTRY;
229
230         /* clear group lock, if present */
231         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
232                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
233
234         /* Let's see if we have good enough OPEN lock on the file and if
235            we can skip talking to MDS */
236         if (file->f_dentry->d_inode) { /* Can this ever be false? */
237                 int lockmode;
238                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
239                 struct lustre_handle lockh;
240                 struct inode *inode = file->f_dentry->d_inode;
241                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
242
243                 cfs_mutex_lock(&lli->lli_och_mutex);
244                 if (fd->fd_omode & FMODE_WRITE) {
245                         lockmode = LCK_CW;
246                         LASSERT(lli->lli_open_fd_write_count);
247                         lli->lli_open_fd_write_count--;
248                 } else if (fd->fd_omode & FMODE_EXEC) {
249                         lockmode = LCK_PR;
250                         LASSERT(lli->lli_open_fd_exec_count);
251                         lli->lli_open_fd_exec_count--;
252                 } else {
253                         lockmode = LCK_CR;
254                         LASSERT(lli->lli_open_fd_read_count);
255                         lli->lli_open_fd_read_count--;
256                 }
257                 cfs_mutex_unlock(&lli->lli_och_mutex);
258
259                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
260                                    LDLM_IBITS, &policy, lockmode,
261                                    &lockh)) {
262                         rc = ll_md_real_close(file->f_dentry->d_inode,
263                                               fd->fd_omode);
264                 }
265         } else {
266                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
267                        file, file->f_dentry, file->f_dentry->d_name.name);
268         }
269
270         LUSTRE_FPRIVATE(file) = NULL;
271         ll_file_data_put(fd);
272         ll_capa_close(inode);
273
274         RETURN(rc);
275 }
276
277 /* While this returns an error code, fput() the caller does not, so we need
278  * to make every effort to clean up all of our state here.  Also, applications
279  * rarely check close errors and even if an error is returned they will not
280  * re-try the close call.
281  */
282 int ll_file_release(struct inode *inode, struct file *file)
283 {
284         struct ll_file_data *fd;
285         struct ll_sb_info *sbi = ll_i2sbi(inode);
286         struct ll_inode_info *lli = ll_i2info(inode);
287         int rc;
288         ENTRY;
289
290         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
291                inode->i_generation, inode);
292
293 #ifdef CONFIG_FS_POSIX_ACL
294         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
295             inode == inode->i_sb->s_root->d_inode) {
296                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
297
298                 LASSERT(fd != NULL);
299                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
300                         fd->fd_flags &= ~LL_FILE_RMTACL;
301                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
302                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
303                 }
304         }
305 #endif
306
307         if (inode->i_sb->s_root != file->f_dentry)
308                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
309         fd = LUSTRE_FPRIVATE(file);
310         LASSERT(fd != NULL);
311
312         /* The last ref on @file, maybe not the the owner pid of statahead.
313          * Different processes can open the same dir, "ll_opendir_key" means:
314          * it is me that should stop the statahead thread. */
315         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
316             lli->lli_opendir_pid != 0)
317                 ll_stop_statahead(inode, lli->lli_opendir_key);
318
319         if (inode->i_sb->s_root == file->f_dentry) {
320                 LUSTRE_FPRIVATE(file) = NULL;
321                 ll_file_data_put(fd);
322                 RETURN(0);
323         }
324
325         if (!S_ISDIR(inode->i_mode)) {
326                 lov_read_and_clear_async_rc(lli->lli_clob);
327                 lli->lli_async_rc = 0;
328         }
329
330         rc = ll_md_close(sbi->ll_md_exp, inode, file);
331
332         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
333                 libcfs_debug_dumplog();
334
335         RETURN(rc);
336 }
337
338 static int ll_intent_file_open(struct file *file, void *lmm,
339                                int lmmsize, struct lookup_intent *itp)
340 {
341         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
342         struct dentry *parent = file->f_dentry->d_parent;
343         const char *name = file->f_dentry->d_name.name;
344         const int len = file->f_dentry->d_name.len;
345         struct md_op_data *op_data;
346         struct ptlrpc_request *req;
347         __u32 opc = LUSTRE_OPC_ANY;
348         int rc;
349         ENTRY;
350
351         if (!parent)
352                 RETURN(-ENOENT);
353
354         /* Usually we come here only for NFSD, and we want open lock.
355            But we can also get here with pre 2.6.15 patchless kernels, and in
356            that case that lock is also ok */
357         /* We can also get here if there was cached open handle in revalidate_it
358          * but it disappeared while we were getting from there to ll_file_open.
359          * But this means this file was closed and immediatelly opened which
360          * makes a good candidate for using OPEN lock */
361         /* If lmmsize & lmm are not 0, we are just setting stripe info
362          * parameters. No need for the open lock */
363         if (lmm == NULL && lmmsize == 0) {
364                 itp->it_flags |= MDS_OPEN_LOCK;
365                 if (itp->it_flags & FMODE_WRITE)
366                         opc = LUSTRE_OPC_CREATE;
367         }
368
369         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
370                                       file->f_dentry->d_inode, name, len,
371                                       O_RDWR, opc, NULL);
372         if (IS_ERR(op_data))
373                 RETURN(PTR_ERR(op_data));
374
375         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
376                             0 /*unused */, &req, ll_md_blocking_ast, 0);
377         ll_finish_md_op_data(op_data);
378         if (rc == -ESTALE) {
379                 /* reason for keep own exit path - don`t flood log
380                 * with messages with -ESTALE errors.
381                 */
382                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
383                      it_open_error(DISP_OPEN_OPEN, itp))
384                         GOTO(out, rc);
385                 ll_release_openhandle(file->f_dentry, itp);
386                 GOTO(out, rc);
387         }
388
389         if (it_disposition(itp, DISP_LOOKUP_NEG))
390                 GOTO(out, rc = -ENOENT);
391
392         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
393                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
394                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
395                 GOTO(out, rc);
396         }
397
398         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
399         if (!rc && itp->d.lustre.it_lock_mode)
400                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
401                                  itp, NULL);
402
403 out:
404         ptlrpc_req_finished(itp->d.lustre.it_data);
405         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
406         ll_intent_drop_lock(itp);
407
408         RETURN(rc);
409 }
410
411 /**
412  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
413  * not believe attributes if a few ioepoch holders exist. Attributes for
414  * previous ioepoch if new one is opened are also skipped by MDS.
415  */
416 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
417 {
418         if (ioepoch && lli->lli_ioepoch != ioepoch) {
419                 lli->lli_ioepoch = ioepoch;
420                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
421                        ioepoch, PFID(&lli->lli_fid));
422         }
423 }
424
425 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
426                        struct lookup_intent *it, struct obd_client_handle *och)
427 {
428         struct ptlrpc_request *req = it->d.lustre.it_data;
429         struct mdt_body *body;
430
431         LASSERT(och);
432
433         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
434         LASSERT(body != NULL);                      /* reply already checked out */
435
436         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
437         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
438         och->och_fid = lli->lli_fid;
439         och->och_flags = it->it_flags;
440         ll_ioepoch_open(lli, body->ioepoch);
441
442         return md_set_open_replay_data(md_exp, och, req);
443 }
444
445 int ll_local_open(struct file *file, struct lookup_intent *it,
446                   struct ll_file_data *fd, struct obd_client_handle *och)
447 {
448         struct inode *inode = file->f_dentry->d_inode;
449         struct ll_inode_info *lli = ll_i2info(inode);
450         ENTRY;
451
452         LASSERT(!LUSTRE_FPRIVATE(file));
453
454         LASSERT(fd != NULL);
455
456         if (och) {
457                 struct ptlrpc_request *req = it->d.lustre.it_data;
458                 struct mdt_body *body;
459                 int rc;
460
461                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
462                 if (rc)
463                         RETURN(rc);
464
465                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
466                 if ((it->it_flags & FMODE_WRITE) &&
467                     (body->valid & OBD_MD_FLSIZE))
468                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
469                                lli->lli_ioepoch, PFID(&lli->lli_fid));
470         }
471
472         LUSTRE_FPRIVATE(file) = fd;
473         ll_readahead_init(inode, &fd->fd_ras);
474         fd->fd_omode = it->it_flags;
475         RETURN(0);
476 }
477
478 /* Open a file, and (for the very first open) create objects on the OSTs at
479  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
480  * creation or open until ll_lov_setstripe() ioctl is called.
481  *
482  * If we already have the stripe MD locally then we don't request it in
483  * md_open(), by passing a lmm_size = 0.
484  *
485  * It is up to the application to ensure no other processes open this file
486  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
487  * used.  We might be able to avoid races of that sort by getting lli_open_sem
488  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
489  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
490  */
491 int ll_file_open(struct inode *inode, struct file *file)
492 {
493         struct ll_inode_info *lli = ll_i2info(inode);
494         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
495                                           .it_flags = file->f_flags };
496         struct obd_client_handle **och_p = NULL;
497         __u64 *och_usecount = NULL;
498         struct ll_file_data *fd;
499         int rc = 0, opendir_set = 0;
500         ENTRY;
501
502         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
503                inode->i_generation, inode, file->f_flags);
504
505         it = file->private_data; /* XXX: compat macro */
506         file->private_data = NULL; /* prevent ll_local_open assertion */
507
508         fd = ll_file_data_get();
509         if (fd == NULL)
510                 GOTO(out_och_free, rc = -ENOMEM);
511
512         fd->fd_file = file;
513         if (S_ISDIR(inode->i_mode)) {
514                 cfs_spin_lock(&lli->lli_sa_lock);
515                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
516                     lli->lli_opendir_pid == 0) {
517                         lli->lli_opendir_key = fd;
518                         lli->lli_opendir_pid = cfs_curproc_pid();
519                         opendir_set = 1;
520                 }
521                 cfs_spin_unlock(&lli->lli_sa_lock);
522         }
523
524         if (inode->i_sb->s_root == file->f_dentry) {
525                 LUSTRE_FPRIVATE(file) = fd;
526                 RETURN(0);
527         }
528
529         if (!it || !it->d.lustre.it_disposition) {
530                 /* Convert f_flags into access mode. We cannot use file->f_mode,
531                  * because everything but O_ACCMODE mask was stripped from
532                  * there */
533                 if ((oit.it_flags + 1) & O_ACCMODE)
534                         oit.it_flags++;
535                 if (file->f_flags & O_TRUNC)
536                         oit.it_flags |= FMODE_WRITE;
537
538                 /* kernel only call f_op->open in dentry_open.  filp_open calls
539                  * dentry_open after call to open_namei that checks permissions.
540                  * Only nfsd_open call dentry_open directly without checking
541                  * permissions and because of that this code below is safe. */
542                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
543                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
544
545                 /* We do not want O_EXCL here, presumably we opened the file
546                  * already? XXX - NFS implications? */
547                 oit.it_flags &= ~O_EXCL;
548
549                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
550                  * created if necessary, then "IT_CREAT" should be set to keep
551                  * consistent with it */
552                 if (oit.it_flags & O_CREAT)
553                         oit.it_op |= IT_CREAT;
554
555                 it = &oit;
556         }
557
558 restart:
559         /* Let's see if we have file open on MDS already. */
560         if (it->it_flags & FMODE_WRITE) {
561                 och_p = &lli->lli_mds_write_och;
562                 och_usecount = &lli->lli_open_fd_write_count;
563         } else if (it->it_flags & FMODE_EXEC) {
564                 och_p = &lli->lli_mds_exec_och;
565                 och_usecount = &lli->lli_open_fd_exec_count;
566          } else {
567                 och_p = &lli->lli_mds_read_och;
568                 och_usecount = &lli->lli_open_fd_read_count;
569         }
570
571         cfs_mutex_lock(&lli->lli_och_mutex);
572         if (*och_p) { /* Open handle is present */
573                 if (it_disposition(it, DISP_OPEN_OPEN)) {
574                         /* Well, there's extra open request that we do not need,
575                            let's close it somehow. This will decref request. */
576                         rc = it_open_error(DISP_OPEN_OPEN, it);
577                         if (rc) {
578                                 cfs_mutex_unlock(&lli->lli_och_mutex);
579                                 GOTO(out_openerr, rc);
580                         }
581
582                         ll_release_openhandle(file->f_dentry, it);
583                 }
584                 (*och_usecount)++;
585
586                 rc = ll_local_open(file, it, fd, NULL);
587                 if (rc) {
588                         (*och_usecount)--;
589                         cfs_mutex_unlock(&lli->lli_och_mutex);
590                         GOTO(out_openerr, rc);
591                 }
592         } else {
593                 LASSERT(*och_usecount == 0);
594                 if (!it->d.lustre.it_disposition) {
595                         /* We cannot just request lock handle now, new ELC code
596                            means that one of other OPEN locks for this file
597                            could be cancelled, and since blocking ast handler
598                            would attempt to grab och_mutex as well, that would
599                            result in a deadlock */
600                         cfs_mutex_unlock(&lli->lli_och_mutex);
601                         it->it_create_mode |= M_CHECK_STALE;
602                         rc = ll_intent_file_open(file, NULL, 0, it);
603                         it->it_create_mode &= ~M_CHECK_STALE;
604                         if (rc)
605                                 GOTO(out_openerr, rc);
606
607                         goto restart;
608                 }
609                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
610                 if (!*och_p)
611                         GOTO(out_och_free, rc = -ENOMEM);
612
613                 (*och_usecount)++;
614
615                 /* md_intent_lock() didn't get a request ref if there was an
616                  * open error, so don't do cleanup on the request here
617                  * (bug 3430) */
618                 /* XXX (green): Should not we bail out on any error here, not
619                  * just open error? */
620                 rc = it_open_error(DISP_OPEN_OPEN, it);
621                 if (rc)
622                         GOTO(out_och_free, rc);
623
624                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
625
626                 rc = ll_local_open(file, it, fd, *och_p);
627                 if (rc)
628                         GOTO(out_och_free, rc);
629         }
630         cfs_mutex_unlock(&lli->lli_och_mutex);
631         fd = NULL;
632
633         /* Must do this outside lli_och_mutex lock to prevent deadlock where
634            different kind of OPEN lock for this same inode gets cancelled
635            by ldlm_cancel_lru */
636         if (!S_ISREG(inode->i_mode))
637                 GOTO(out_och_free, rc);
638
639         ll_capa_open(inode);
640
641         if (!lli->lli_has_smd) {
642                 if (file->f_flags & O_LOV_DELAY_CREATE ||
643                     !(file->f_mode & FMODE_WRITE)) {
644                         CDEBUG(D_INODE, "object creation was delayed\n");
645                         GOTO(out_och_free, rc);
646                 }
647         }
648         file->f_flags &= ~O_LOV_DELAY_CREATE;
649         GOTO(out_och_free, rc);
650
651 out_och_free:
652         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
653                 ptlrpc_req_finished(it->d.lustre.it_data);
654                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
655         }
656
657         if (rc) {
658                 if (och_p && *och_p) {
659                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
660                         *och_p = NULL; /* OBD_FREE writes some magic there */
661                         (*och_usecount)--;
662                 }
663                 cfs_mutex_unlock(&lli->lli_och_mutex);
664
665 out_openerr:
666                 if (opendir_set != 0)
667                         ll_stop_statahead(inode, lli->lli_opendir_key);
668                 if (fd != NULL)
669                         ll_file_data_put(fd);
670         } else {
671                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
672         }
673
674         return rc;
675 }
676
677 /* Fills the obdo with the attributes for the lsm */
678 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
679                           struct obd_capa *capa, struct obdo *obdo,
680                           __u64 ioepoch, int sync)
681 {
682         struct ptlrpc_request_set *set;
683         struct obd_info            oinfo = { { { 0 } } };
684         int                        rc;
685
686         ENTRY;
687
688         LASSERT(lsm != NULL);
689
690         oinfo.oi_md = lsm;
691         oinfo.oi_oa = obdo;
692         oinfo.oi_oa->o_id = lsm->lsm_object_id;
693         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
694         oinfo.oi_oa->o_mode = S_IFREG;
695         oinfo.oi_oa->o_ioepoch = ioepoch;
696         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
697                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
698                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
699                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
700                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
701                                OBD_MD_FLDATAVERSION;
702         oinfo.oi_capa = capa;
703         if (sync) {
704                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
705                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
706         }
707
708         set = ptlrpc_prep_set();
709         if (set == NULL) {
710                 CERROR("can't allocate ptlrpc set\n");
711                 rc = -ENOMEM;
712         } else {
713                 rc = obd_getattr_async(exp, &oinfo, set);
714                 if (rc == 0)
715                         rc = ptlrpc_set_wait(set);
716                 ptlrpc_set_destroy(set);
717         }
718         if (rc == 0)
719                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
720                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
721                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
722                                          OBD_MD_FLDATAVERSION);
723         RETURN(rc);
724 }
725
726 /**
727   * Performs the getattr on the inode and updates its fields.
728   * If @sync != 0, perform the getattr under the server-side lock.
729   */
730 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
731                      __u64 ioepoch, int sync)
732 {
733         struct obd_capa      *capa = ll_mdscapa_get(inode);
734         struct lov_stripe_md *lsm;
735         int rc;
736         ENTRY;
737
738         lsm = ccc_inode_lsm_get(inode);
739         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
740                             capa, obdo, ioepoch, sync);
741         capa_put(capa);
742         if (rc == 0) {
743                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
744                 CDEBUG(D_INODE,
745                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
746                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
747                        (unsigned long long)inode->i_blocks,
748                        (unsigned long)ll_inode_blksize(inode));
749         }
750         ccc_inode_lsm_put(inode, lsm);
751         RETURN(rc);
752 }
753
754 int ll_merge_lvb(struct inode *inode)
755 {
756         struct ll_inode_info *lli = ll_i2info(inode);
757         struct ll_sb_info *sbi = ll_i2sbi(inode);
758         struct lov_stripe_md *lsm;
759         struct ost_lvb lvb;
760         int rc = 0;
761
762         ENTRY;
763
764         lsm = ccc_inode_lsm_get(inode);
765         ll_inode_size_lock(inode);
766         inode_init_lvb(inode, &lvb);
767
768         /* merge timestamps the most resently obtained from mds with
769            timestamps obtained from osts */
770         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
771         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
772         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
773         if (lsm != NULL) {
774                 rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
775                 cl_isize_write_nolock(inode, lvb.lvb_size);
776
777                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
778                                 PFID(&lli->lli_fid), lvb.lvb_size);
779                 inode->i_blocks = lvb.lvb_blocks;
780
781                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
782                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
783                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
784         }
785         ll_inode_size_unlock(inode);
786         ccc_inode_lsm_put(inode, lsm);
787
788         RETURN(rc);
789 }
790
791 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
792                      lstat_t *st)
793 {
794         struct obdo obdo = { 0 };
795         int rc;
796
797         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
798         if (rc == 0) {
799                 st->st_size   = obdo.o_size;
800                 st->st_blocks = obdo.o_blocks;
801                 st->st_mtime  = obdo.o_mtime;
802                 st->st_atime  = obdo.o_atime;
803                 st->st_ctime  = obdo.o_ctime;
804         }
805         return rc;
806 }
807
808 void ll_io_init(struct cl_io *io, const struct file *file, int write)
809 {
810         struct inode *inode = file->f_dentry->d_inode;
811
812         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
813         if (write) {
814                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
815                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
816         }
817         io->ci_obj     = ll_i2info(inode)->lli_clob;
818         io->ci_lockreq = CILR_MAYBE;
819         if (ll_file_nolock(file)) {
820                 io->ci_lockreq = CILR_NEVER;
821                 io->ci_no_srvlock = 1;
822         } else if (file->f_flags & O_APPEND) {
823                 io->ci_lockreq = CILR_MANDATORY;
824         }
825 }
826
827 static ssize_t ll_file_io_generic(const struct lu_env *env,
828                 struct vvp_io_args *args, struct file *file,
829                 enum cl_io_type iot, loff_t *ppos, size_t count)
830 {
831         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
832         struct cl_io         *io;
833         ssize_t               result;
834         ENTRY;
835
836         io = ccc_env_thread_io(env);
837         ll_io_init(io, file, iot == CIT_WRITE);
838
839         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
840                 struct vvp_io *vio = vvp_env_io(env);
841                 struct ccc_io *cio = ccc_env_io(env);
842                 int write_mutex_locked = 0;
843
844                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
845                 vio->cui_io_subtype = args->via_io_subtype;
846
847                 switch (vio->cui_io_subtype) {
848                 case IO_NORMAL:
849                         cio->cui_iov = args->u.normal.via_iov;
850                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
851                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
852 #ifndef HAVE_FILE_WRITEV
853                         cio->cui_iocb = args->u.normal.via_iocb;
854 #endif
855                         if ((iot == CIT_WRITE) &&
856                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
857                                 if (cfs_mutex_lock_interruptible(&lli->
858                                                                lli_write_mutex))
859                                         GOTO(out, result = -ERESTARTSYS);
860                                 write_mutex_locked = 1;
861                         } else if (iot == CIT_READ) {
862                                 cfs_down_read(&lli->lli_trunc_sem);
863                         }
864                         break;
865                 case IO_SENDFILE:
866                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
867                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
868                         break;
869                 case IO_SPLICE:
870                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
871                         vio->u.splice.cui_flags = args->u.splice.via_flags;
872                         break;
873                 default:
874                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
875                         LBUG();
876                 }
877                 result = cl_io_loop(env, io);
878                 if (write_mutex_locked)
879                         cfs_mutex_unlock(&lli->lli_write_mutex);
880                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
881                         cfs_up_read(&lli->lli_trunc_sem);
882         } else {
883                 /* cl_io_rw_init() handled IO */
884                 result = io->ci_result;
885         }
886
887         if (io->ci_nob > 0) {
888                 result = io->ci_nob;
889                 *ppos = io->u.ci_wr.wr.crw_pos;
890         }
891         GOTO(out, result);
892 out:
893         cl_io_fini(env, io);
894
895         if (iot == CIT_READ) {
896                 if (result >= 0)
897                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
898                                            LPROC_LL_READ_BYTES, result);
899         } else if (iot == CIT_WRITE) {
900                 if (result >= 0) {
901                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
902                                            LPROC_LL_WRITE_BYTES, result);
903                         lli->lli_write_rc = 0;
904                 } else {
905                         lli->lli_write_rc = result;
906                 }
907         }
908
909         return result;
910 }
911
912
913 /*
914  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
915  */
916 static int ll_file_get_iov_count(const struct iovec *iov,
917                                  unsigned long *nr_segs, size_t *count)
918 {
919         size_t cnt = 0;
920         unsigned long seg;
921
922         for (seg = 0; seg < *nr_segs; seg++) {
923                 const struct iovec *iv = &iov[seg];
924
925                 /*
926                  * If any segment has a negative length, or the cumulative
927                  * length ever wraps negative then return -EINVAL.
928                  */
929                 cnt += iv->iov_len;
930                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
931                         return -EINVAL;
932                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
933                         continue;
934                 if (seg == 0)
935                         return -EFAULT;
936                 *nr_segs = seg;
937                 cnt -= iv->iov_len;   /* This segment is no good */
938                 break;
939         }
940         *count = cnt;
941         return 0;
942 }
943
944 #ifdef HAVE_FILE_READV
945 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
946                               unsigned long nr_segs, loff_t *ppos)
947 {
948         struct lu_env      *env;
949         struct vvp_io_args *args;
950         size_t              count;
951         ssize_t             result;
952         int                 refcheck;
953         ENTRY;
954
955         result = ll_file_get_iov_count(iov, &nr_segs, &count);
956         if (result)
957                 RETURN(result);
958
959         env = cl_env_get(&refcheck);
960         if (IS_ERR(env))
961                 RETURN(PTR_ERR(env));
962
963         args = vvp_env_args(env, IO_NORMAL);
964         args->u.normal.via_iov = (struct iovec *)iov;
965         args->u.normal.via_nrsegs = nr_segs;
966
967         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
968         cl_env_put(env, &refcheck);
969         RETURN(result);
970 }
971
972 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
973                             loff_t *ppos)
974 {
975         struct lu_env *env;
976         struct iovec  *local_iov;
977         ssize_t        result;
978         int            refcheck;
979         ENTRY;
980
981         env = cl_env_get(&refcheck);
982         if (IS_ERR(env))
983                 RETURN(PTR_ERR(env));
984
985         local_iov = &vvp_env_info(env)->vti_local_iov;
986         local_iov->iov_base = (void __user *)buf;
987         local_iov->iov_len = count;
988         result = ll_file_readv(file, local_iov, 1, ppos);
989         cl_env_put(env, &refcheck);
990         RETURN(result);
991 }
992
993 #else
994 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
995                                 unsigned long nr_segs, loff_t pos)
996 {
997         struct lu_env      *env;
998         struct vvp_io_args *args;
999         size_t              count;
1000         ssize_t             result;
1001         int                 refcheck;
1002         ENTRY;
1003
1004         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1005         if (result)
1006                 RETURN(result);
1007
1008         env = cl_env_get(&refcheck);
1009         if (IS_ERR(env))
1010                 RETURN(PTR_ERR(env));
1011
1012         args = vvp_env_args(env, IO_NORMAL);
1013         args->u.normal.via_iov = (struct iovec *)iov;
1014         args->u.normal.via_nrsegs = nr_segs;
1015         args->u.normal.via_iocb = iocb;
1016
1017         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1018                                     &iocb->ki_pos, count);
1019         cl_env_put(env, &refcheck);
1020         RETURN(result);
1021 }
1022
1023 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1024                             loff_t *ppos)
1025 {
1026         struct lu_env *env;
1027         struct iovec  *local_iov;
1028         struct kiocb  *kiocb;
1029         ssize_t        result;
1030         int            refcheck;
1031         ENTRY;
1032
1033         env = cl_env_get(&refcheck);
1034         if (IS_ERR(env))
1035                 RETURN(PTR_ERR(env));
1036
1037         local_iov = &vvp_env_info(env)->vti_local_iov;
1038         kiocb = &vvp_env_info(env)->vti_kiocb;
1039         local_iov->iov_base = (void __user *)buf;
1040         local_iov->iov_len = count;
1041         init_sync_kiocb(kiocb, file);
1042         kiocb->ki_pos = *ppos;
1043         kiocb->ki_left = count;
1044
1045         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1046         *ppos = kiocb->ki_pos;
1047
1048         cl_env_put(env, &refcheck);
1049         RETURN(result);
1050 }
1051 #endif
1052
1053 /*
1054  * Write to a file (through the page cache).
1055  */
1056 #ifdef HAVE_FILE_WRITEV
1057 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1058                               unsigned long nr_segs, loff_t *ppos)
1059 {
1060         struct lu_env      *env;
1061         struct vvp_io_args *args;
1062         size_t              count;
1063         ssize_t             result;
1064         int                 refcheck;
1065         ENTRY;
1066
1067         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1068         if (result)
1069                 RETURN(result);
1070
1071         env = cl_env_get(&refcheck);
1072         if (IS_ERR(env))
1073                 RETURN(PTR_ERR(env));
1074
1075         args = vvp_env_args(env, IO_NORMAL);
1076         args->u.normal.via_iov = (struct iovec *)iov;
1077         args->u.normal.via_nrsegs = nr_segs;
1078
1079         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1080         cl_env_put(env, &refcheck);
1081         RETURN(result);
1082 }
1083
1084 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1085                              loff_t *ppos)
1086 {
1087         struct lu_env    *env;
1088         struct iovec     *local_iov;
1089         ssize_t           result;
1090         int               refcheck;
1091         ENTRY;
1092
1093         env = cl_env_get(&refcheck);
1094         if (IS_ERR(env))
1095                 RETURN(PTR_ERR(env));
1096
1097         local_iov = &vvp_env_info(env)->vti_local_iov;
1098         local_iov->iov_base = (void __user *)buf;
1099         local_iov->iov_len = count;
1100
1101         result = ll_file_writev(file, local_iov, 1, ppos);
1102         cl_env_put(env, &refcheck);
1103         RETURN(result);
1104 }
1105
1106 #else /* AIO stuff */
1107 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1108                                  unsigned long nr_segs, loff_t pos)
1109 {
1110         struct lu_env      *env;
1111         struct vvp_io_args *args;
1112         size_t              count;
1113         ssize_t             result;
1114         int                 refcheck;
1115         ENTRY;
1116
1117         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1118         if (result)
1119                 RETURN(result);
1120
1121         env = cl_env_get(&refcheck);
1122         if (IS_ERR(env))
1123                 RETURN(PTR_ERR(env));
1124
1125         args = vvp_env_args(env, IO_NORMAL);
1126         args->u.normal.via_iov = (struct iovec *)iov;
1127         args->u.normal.via_nrsegs = nr_segs;
1128         args->u.normal.via_iocb = iocb;
1129
1130         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1131                                   &iocb->ki_pos, count);
1132         cl_env_put(env, &refcheck);
1133         RETURN(result);
1134 }
1135
1136 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1137                              loff_t *ppos)
1138 {
1139         struct lu_env *env;
1140         struct iovec  *local_iov;
1141         struct kiocb  *kiocb;
1142         ssize_t        result;
1143         int            refcheck;
1144         ENTRY;
1145
1146         env = cl_env_get(&refcheck);
1147         if (IS_ERR(env))
1148                 RETURN(PTR_ERR(env));
1149
1150         local_iov = &vvp_env_info(env)->vti_local_iov;
1151         kiocb = &vvp_env_info(env)->vti_kiocb;
1152         local_iov->iov_base = (void __user *)buf;
1153         local_iov->iov_len = count;
1154         init_sync_kiocb(kiocb, file);
1155         kiocb->ki_pos = *ppos;
1156         kiocb->ki_left = count;
1157
1158         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1159         *ppos = kiocb->ki_pos;
1160
1161         cl_env_put(env, &refcheck);
1162         RETURN(result);
1163 }
1164 #endif
1165
1166
1167 #ifdef HAVE_KERNEL_SENDFILE
1168 /*
1169  * Send file content (through pagecache) somewhere with helper
1170  */
1171 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1172                                 read_actor_t actor, void *target)
1173 {
1174         struct lu_env      *env;
1175         struct vvp_io_args *args;
1176         ssize_t             result;
1177         int                 refcheck;
1178         ENTRY;
1179
1180         env = cl_env_get(&refcheck);
1181         if (IS_ERR(env))
1182                 RETURN(PTR_ERR(env));
1183
1184         args = vvp_env_args(env, IO_SENDFILE);
1185         args->u.sendfile.via_target = target;
1186         args->u.sendfile.via_actor = actor;
1187
1188         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1189         cl_env_put(env, &refcheck);
1190         RETURN(result);
1191 }
1192 #endif
1193
1194 #ifdef HAVE_KERNEL_SPLICE_READ
1195 /*
1196  * Send file content (through pagecache) somewhere with helper
1197  */
1198 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1199                                    struct pipe_inode_info *pipe, size_t count,
1200                                    unsigned int flags)
1201 {
1202         struct lu_env      *env;
1203         struct vvp_io_args *args;
1204         ssize_t             result;
1205         int                 refcheck;
1206         ENTRY;
1207
1208         env = cl_env_get(&refcheck);
1209         if (IS_ERR(env))
1210                 RETURN(PTR_ERR(env));
1211
1212         args = vvp_env_args(env, IO_SPLICE);
1213         args->u.splice.via_pipe = pipe;
1214         args->u.splice.via_flags = flags;
1215
1216         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1217         cl_env_put(env, &refcheck);
1218         RETURN(result);
1219 }
1220 #endif
1221
1222 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1223                            obd_count ost_idx)
1224 {
1225         struct obd_export *exp = ll_i2dtexp(inode);
1226         struct obd_trans_info oti = { 0 };
1227         struct obdo *oa = NULL;
1228         int lsm_size;
1229         int rc = 0;
1230         struct lov_stripe_md *lsm = NULL, *lsm2;
1231         ENTRY;
1232
1233         OBDO_ALLOC(oa);
1234         if (oa == NULL)
1235                 RETURN(-ENOMEM);
1236
1237         lsm = ccc_inode_lsm_get(inode);
1238         if (lsm == NULL)
1239                 GOTO(out, rc = -ENOENT);
1240
1241         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1242                    (lsm->lsm_stripe_count));
1243
1244         OBD_ALLOC_LARGE(lsm2, lsm_size);
1245         if (lsm2 == NULL)
1246                 GOTO(out, rc = -ENOMEM);
1247
1248         oa->o_id = id;
1249         oa->o_seq = seq;
1250         oa->o_nlink = ost_idx;
1251         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1252         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1253         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1254                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1255         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1256         memcpy(lsm2, lsm, lsm_size);
1257         ll_inode_size_lock(inode);
1258         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1259         ll_inode_size_unlock(inode);
1260
1261         OBD_FREE_LARGE(lsm2, lsm_size);
1262         GOTO(out, rc);
1263 out:
1264         ccc_inode_lsm_put(inode, lsm);
1265         OBDO_FREE(oa);
1266         return rc;
1267 }
1268
1269 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1270 {
1271         struct ll_recreate_obj ucreat;
1272         ENTRY;
1273
1274         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1275                 RETURN(-EPERM);
1276
1277         if (cfs_copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1278                                sizeof(struct ll_recreate_obj)))
1279                 RETURN(-EFAULT);
1280
1281         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1282                                ucreat.lrc_ost_idx));
1283 }
1284
1285 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1286 {
1287         struct lu_fid fid;
1288         obd_id id;
1289         obd_count ost_idx;
1290         ENTRY;
1291
1292         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1293                 RETURN(-EPERM);
1294
1295         if (cfs_copy_from_user(&fid, (struct lu_fid *)arg,
1296                                sizeof(struct lu_fid)))
1297                 RETURN(-EFAULT);
1298
1299         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1300         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1301         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1302 }
1303
1304 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1305                              int flags, struct lov_user_md *lum, int lum_size)
1306 {
1307         struct lov_stripe_md *lsm = NULL;
1308         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1309         int rc = 0;
1310         ENTRY;
1311
1312         lsm = ccc_inode_lsm_get(inode);
1313         if (lsm != NULL) {
1314                 ccc_inode_lsm_put(inode, lsm);
1315                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1316                        inode->i_ino);
1317                 RETURN(-EEXIST);
1318         }
1319
1320         ll_inode_size_lock(inode);
1321         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1322         if (rc)
1323                 GOTO(out, rc);
1324         rc = oit.d.lustre.it_status;
1325         if (rc < 0)
1326                 GOTO(out_req_free, rc);
1327
1328         ll_release_openhandle(file->f_dentry, &oit);
1329
1330  out:
1331         ll_inode_size_unlock(inode);
1332         ll_intent_release(&oit);
1333         ccc_inode_lsm_put(inode, lsm);
1334         RETURN(rc);
1335 out_req_free:
1336         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1337         goto out;
1338 }
1339
1340 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1341                              struct lov_mds_md **lmmp, int *lmm_size,
1342                              struct ptlrpc_request **request)
1343 {
1344         struct ll_sb_info *sbi = ll_i2sbi(inode);
1345         struct mdt_body  *body;
1346         struct lov_mds_md *lmm = NULL;
1347         struct ptlrpc_request *req = NULL;
1348         struct md_op_data *op_data;
1349         int rc, lmmsize;
1350
1351         rc = ll_get_max_mdsize(sbi, &lmmsize);
1352         if (rc)
1353                 RETURN(rc);
1354
1355         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1356                                      strlen(filename), lmmsize,
1357                                      LUSTRE_OPC_ANY, NULL);
1358         if (IS_ERR(op_data))
1359                 RETURN(PTR_ERR(op_data));
1360
1361         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1362         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1363         ll_finish_md_op_data(op_data);
1364         if (rc < 0) {
1365                 CDEBUG(D_INFO, "md_getattr_name failed "
1366                        "on %s: rc %d\n", filename, rc);
1367                 GOTO(out, rc);
1368         }
1369
1370         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1371         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1372
1373         lmmsize = body->eadatasize;
1374
1375         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1376                         lmmsize == 0) {
1377                 GOTO(out, rc = -ENODATA);
1378         }
1379
1380         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1381         LASSERT(lmm != NULL);
1382
1383         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1384             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1385                 GOTO(out, rc = -EPROTO);
1386         }
1387
1388         /*
1389          * This is coming from the MDS, so is probably in
1390          * little endian.  We convert it to host endian before
1391          * passing it to userspace.
1392          */
1393         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1394                 /* if function called for directory - we should
1395                  * avoid swab not existent lsm objects */
1396                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1397                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1398                         if (S_ISREG(body->mode))
1399                                 lustre_swab_lov_user_md_objects(
1400                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1401                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1402                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1403                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1404                         if (S_ISREG(body->mode))
1405                                 lustre_swab_lov_user_md_objects(
1406                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1407                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1408                 }
1409         }
1410
1411 out:
1412         *lmmp = lmm;
1413         *lmm_size = lmmsize;
1414         *request = req;
1415         return rc;
1416 }
1417
1418 static int ll_lov_setea(struct inode *inode, struct file *file,
1419                             unsigned long arg)
1420 {
1421         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1422         struct lov_user_md  *lump;
1423         int lum_size = sizeof(struct lov_user_md) +
1424                        sizeof(struct lov_user_ost_data);
1425         int rc;
1426         ENTRY;
1427
1428         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1429                 RETURN(-EPERM);
1430
1431         OBD_ALLOC_LARGE(lump, lum_size);
1432         if (lump == NULL) {
1433                 RETURN(-ENOMEM);
1434         }
1435         if (cfs_copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1436                 OBD_FREE_LARGE(lump, lum_size);
1437                 RETURN(-EFAULT);
1438         }
1439
1440         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1441
1442         OBD_FREE_LARGE(lump, lum_size);
1443         RETURN(rc);
1444 }
1445
1446 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1447                             unsigned long arg)
1448 {
1449         struct lov_user_md_v3 lumv3;
1450         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1451         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
1452         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
1453         int lum_size;
1454         int rc;
1455         int flags = FMODE_WRITE;
1456         ENTRY;
1457
1458         /* first try with v1 which is smaller than v3 */
1459         lum_size = sizeof(struct lov_user_md_v1);
1460         if (cfs_copy_from_user(lumv1, lumv1p, lum_size))
1461                 RETURN(-EFAULT);
1462
1463         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1464                 lum_size = sizeof(struct lov_user_md_v3);
1465                 if (cfs_copy_from_user(&lumv3, lumv3p, lum_size))
1466                         RETURN(-EFAULT);
1467         }
1468
1469         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1470         if (rc == 0) {
1471                 struct lov_stripe_md *lsm;
1472                 put_user(0, &lumv1p->lmm_stripe_count);
1473                 lsm = ccc_inode_lsm_get(inode);
1474                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1475                                    0, lsm, (void *)arg);
1476                 ccc_inode_lsm_put(inode, lsm);
1477         }
1478         RETURN(rc);
1479 }
1480
1481 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1482 {
1483         struct lov_stripe_md *lsm;
1484         int rc = -ENODATA;
1485         ENTRY;
1486
1487         lsm = ccc_inode_lsm_get(inode);
1488         if (lsm != NULL)
1489                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1490                                    lsm, (void *)arg);
1491         ccc_inode_lsm_put(inode, lsm);
1492         RETURN(rc);
1493 }
1494
1495 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1496 {
1497         struct ll_inode_info   *lli = ll_i2info(inode);
1498         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1499         struct ccc_grouplock    grouplock;
1500         int                     rc;
1501         ENTRY;
1502
1503         if (ll_file_nolock(file))
1504                 RETURN(-EOPNOTSUPP);
1505
1506         cfs_spin_lock(&lli->lli_lock);
1507         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1508                 CWARN("group lock already existed with gid %lu\n",
1509                        fd->fd_grouplock.cg_gid);
1510                 cfs_spin_unlock(&lli->lli_lock);
1511                 RETURN(-EINVAL);
1512         }
1513         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1514         cfs_spin_unlock(&lli->lli_lock);
1515
1516         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1517                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1518         if (rc)
1519                 RETURN(rc);
1520
1521         cfs_spin_lock(&lli->lli_lock);
1522         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1523                 cfs_spin_unlock(&lli->lli_lock);
1524                 CERROR("another thread just won the race\n");
1525                 cl_put_grouplock(&grouplock);
1526                 RETURN(-EINVAL);
1527         }
1528
1529         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1530         fd->fd_grouplock = grouplock;
1531         cfs_spin_unlock(&lli->lli_lock);
1532
1533         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1534         RETURN(0);
1535 }
1536
1537 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1538 {
1539         struct ll_inode_info   *lli = ll_i2info(inode);
1540         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1541         struct ccc_grouplock    grouplock;
1542         ENTRY;
1543
1544         cfs_spin_lock(&lli->lli_lock);
1545         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1546                 cfs_spin_unlock(&lli->lli_lock);
1547                 CWARN("no group lock held\n");
1548                 RETURN(-EINVAL);
1549         }
1550         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1551
1552         if (fd->fd_grouplock.cg_gid != arg) {
1553                 CWARN("group lock %lu doesn't match current id %lu\n",
1554                        arg, fd->fd_grouplock.cg_gid);
1555                 cfs_spin_unlock(&lli->lli_lock);
1556                 RETURN(-EINVAL);
1557         }
1558
1559         grouplock = fd->fd_grouplock;
1560         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1561         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1562         cfs_spin_unlock(&lli->lli_lock);
1563
1564         cl_put_grouplock(&grouplock);
1565         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1566         RETURN(0);
1567 }
1568
1569 /**
1570  * Close inode open handle
1571  *
1572  * \param dentry [in]     dentry which contains the inode
1573  * \param it     [in,out] intent which contains open info and result
1574  *
1575  * \retval 0     success
1576  * \retval <0    failure
1577  */
1578 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1579 {
1580         struct inode *inode = dentry->d_inode;
1581         struct obd_client_handle *och;
1582         int rc;
1583         ENTRY;
1584
1585         LASSERT(inode);
1586
1587         /* Root ? Do nothing. */
1588         if (dentry->d_inode->i_sb->s_root == dentry)
1589                 RETURN(0);
1590
1591         /* No open handle to close? Move away */
1592         if (!it_disposition(it, DISP_OPEN_OPEN))
1593                 RETURN(0);
1594
1595         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1596
1597         OBD_ALLOC(och, sizeof(*och));
1598         if (!och)
1599                 GOTO(out, rc = -ENOMEM);
1600
1601         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1602                     ll_i2info(inode), it, och);
1603
1604         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1605                                        inode, och);
1606  out:
1607         /* this one is in place of ll_file_open */
1608         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1609                 ptlrpc_req_finished(it->d.lustre.it_data);
1610                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1611         }
1612         RETURN(rc);
1613 }
1614
1615 /**
1616  * Get size for inode for which FIEMAP mapping is requested.
1617  * Make the FIEMAP get_info call and returns the result.
1618  */
1619 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1620               int num_bytes)
1621 {
1622         struct obd_export *exp = ll_i2dtexp(inode);
1623         struct lov_stripe_md *lsm = NULL;
1624         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1625         int vallen = num_bytes;
1626         int rc;
1627         ENTRY;
1628
1629         /* Checks for fiemap flags */
1630         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1631                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1632                 return -EBADR;
1633         }
1634
1635         /* Check for FIEMAP_FLAG_SYNC */
1636         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1637                 rc = filemap_fdatawrite(inode->i_mapping);
1638                 if (rc)
1639                         return rc;
1640         }
1641
1642         lsm = ccc_inode_lsm_get(inode);
1643         if (lsm == NULL)
1644                 return -ENOENT;
1645
1646         /* If the stripe_count > 1 and the application does not understand
1647          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1648          */
1649         if (lsm->lsm_stripe_count > 1 &&
1650             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1651                 GOTO(out, rc = -EOPNOTSUPP);
1652
1653         fm_key.oa.o_id = lsm->lsm_object_id;
1654         fm_key.oa.o_seq = lsm->lsm_object_seq;
1655         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1656
1657         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1658         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1659         /* If filesize is 0, then there would be no objects for mapping */
1660         if (fm_key.oa.o_size == 0) {
1661                 fiemap->fm_mapped_extents = 0;
1662                 GOTO(out, rc = 0);
1663         }
1664
1665         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1666
1667         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1668                           fiemap, lsm);
1669         if (rc)
1670                 CERROR("obd_get_info failed: rc = %d\n", rc);
1671
1672 out:
1673         ccc_inode_lsm_put(inode, lsm);
1674         RETURN(rc);
1675 }
1676
1677 int ll_fid2path(struct obd_export *exp, void *arg)
1678 {
1679         struct getinfo_fid2path *gfout, *gfin;
1680         int outsize, rc;
1681         ENTRY;
1682
1683         /* Need to get the buflen */
1684         OBD_ALLOC_PTR(gfin);
1685         if (gfin == NULL)
1686                 RETURN(-ENOMEM);
1687         if (cfs_copy_from_user(gfin, arg, sizeof(*gfin))) {
1688                 OBD_FREE_PTR(gfin);
1689                 RETURN(-EFAULT);
1690         }
1691
1692         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1693         OBD_ALLOC(gfout, outsize);
1694         if (gfout == NULL) {
1695                 OBD_FREE_PTR(gfin);
1696                 RETURN(-ENOMEM);
1697         }
1698         memcpy(gfout, gfin, sizeof(*gfout));
1699         OBD_FREE_PTR(gfin);
1700
1701         /* Call mdc_iocontrol */
1702         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1703         if (rc)
1704                 GOTO(gf_free, rc);
1705         if (cfs_copy_to_user(arg, gfout, outsize))
1706                 rc = -EFAULT;
1707
1708 gf_free:
1709         OBD_FREE(gfout, outsize);
1710         RETURN(rc);
1711 }
1712
1713 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1714 {
1715         struct ll_user_fiemap *fiemap_s;
1716         size_t num_bytes, ret_bytes;
1717         unsigned int extent_count;
1718         int rc = 0;
1719
1720         /* Get the extent count so we can calculate the size of
1721          * required fiemap buffer */
1722         if (get_user(extent_count,
1723             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1724                 RETURN(-EFAULT);
1725         num_bytes = sizeof(*fiemap_s) + (extent_count *
1726                                          sizeof(struct ll_fiemap_extent));
1727
1728         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1729         if (fiemap_s == NULL)
1730                 RETURN(-ENOMEM);
1731
1732         /* get the fiemap value */
1733         if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
1734                            sizeof(*fiemap_s)))
1735                 GOTO(error, rc = -EFAULT);
1736
1737         /* If fm_extent_count is non-zero, read the first extent since
1738          * it is used to calculate end_offset and device from previous
1739          * fiemap call. */
1740         if (extent_count) {
1741                 if (copy_from_user(&fiemap_s->fm_extents[0],
1742                     (char __user *)arg + sizeof(*fiemap_s),
1743                     sizeof(struct ll_fiemap_extent)))
1744                         GOTO(error, rc = -EFAULT);
1745         }
1746
1747         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1748         if (rc)
1749                 GOTO(error, rc);
1750
1751         ret_bytes = sizeof(struct ll_user_fiemap);
1752
1753         if (extent_count != 0)
1754                 ret_bytes += (fiemap_s->fm_mapped_extents *
1755                                  sizeof(struct ll_fiemap_extent));
1756
1757         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1758                 rc = -EFAULT;
1759
1760 error:
1761         OBD_FREE_LARGE(fiemap_s, num_bytes);
1762         RETURN(rc);
1763 }
1764
1765 /*
1766  * Read the data_version for inode.
1767  *
1768  * This value is computed using stripe object version on OST.
1769  * Version is computed using server side locking.
1770  *
1771  * @param extent_lock  Take extent lock. Not needed if a process is already
1772  *                     holding the OST object group locks.
1773  */
1774 static int ll_data_version(struct inode *inode, __u64 *data_version,
1775                            int extent_lock)
1776 {
1777         struct lov_stripe_md *lsm = NULL;
1778         struct ll_sb_info    *sbi = ll_i2sbi(inode);
1779         struct obdo          *obdo = NULL;
1780         int                   rc;
1781         ENTRY;
1782
1783         /* If no stripe, we consider version is 0. */
1784         lsm = ccc_inode_lsm_get(inode);
1785         if (lsm == NULL) {
1786                 *data_version = 0;
1787                 CDEBUG(D_INODE, "No object for inode\n");
1788                 RETURN(0);
1789         }
1790
1791         OBD_ALLOC_PTR(obdo);
1792         if (obdo == NULL) {
1793                 ccc_inode_lsm_put(inode, lsm);
1794                 RETURN(-ENOMEM);
1795         }
1796
1797         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1798         if (!rc) {
1799                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1800                         rc = -EOPNOTSUPP;
1801                 else
1802                         *data_version = obdo->o_data_version;
1803         }
1804
1805         OBD_FREE_PTR(obdo);
1806         ccc_inode_lsm_put(inode, lsm);
1807
1808         RETURN(rc);
1809 }
1810
1811 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1812 {
1813         struct inode *inode = file->f_dentry->d_inode;
1814         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1815         int flags;
1816
1817         ENTRY;
1818
1819         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1820                inode->i_generation, inode, cmd);
1821         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1822
1823         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1824         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1825                 RETURN(-ENOTTY);
1826
1827         switch(cmd) {
1828         case LL_IOC_GETFLAGS:
1829                 /* Get the current value of the file flags */
1830                 return put_user(fd->fd_flags, (int *)arg);
1831         case LL_IOC_SETFLAGS:
1832         case LL_IOC_CLRFLAGS:
1833                 /* Set or clear specific file flags */
1834                 /* XXX This probably needs checks to ensure the flags are
1835                  *     not abused, and to handle any flag side effects.
1836                  */
1837                 if (get_user(flags, (int *) arg))
1838                         RETURN(-EFAULT);
1839
1840                 if (cmd == LL_IOC_SETFLAGS) {
1841                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1842                             !(file->f_flags & O_DIRECT)) {
1843                                 CERROR("%s: unable to disable locking on "
1844                                        "non-O_DIRECT file\n", current->comm);
1845                                 RETURN(-EINVAL);
1846                         }
1847
1848                         fd->fd_flags |= flags;
1849                 } else {
1850                         fd->fd_flags &= ~flags;
1851                 }
1852                 RETURN(0);
1853         case LL_IOC_LOV_SETSTRIPE:
1854                 RETURN(ll_lov_setstripe(inode, file, arg));
1855         case LL_IOC_LOV_SETEA:
1856                 RETURN(ll_lov_setea(inode, file, arg));
1857         case LL_IOC_LOV_GETSTRIPE:
1858                 RETURN(ll_lov_getstripe(inode, arg));
1859         case LL_IOC_RECREATE_OBJ:
1860                 RETURN(ll_lov_recreate_obj(inode, arg));
1861         case LL_IOC_RECREATE_FID:
1862                 RETURN(ll_lov_recreate_fid(inode, arg));
1863         case FSFILT_IOC_FIEMAP:
1864                 RETURN(ll_ioctl_fiemap(inode, arg));
1865         case FSFILT_IOC_GETFLAGS:
1866         case FSFILT_IOC_SETFLAGS:
1867                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1868         case FSFILT_IOC_GETVERSION_OLD:
1869         case FSFILT_IOC_GETVERSION:
1870                 RETURN(put_user(inode->i_generation, (int *)arg));
1871         case LL_IOC_GROUP_LOCK:
1872                 RETURN(ll_get_grouplock(inode, file, arg));
1873         case LL_IOC_GROUP_UNLOCK:
1874                 RETURN(ll_put_grouplock(inode, file, arg));
1875         case IOC_OBD_STATFS:
1876                 RETURN(ll_obd_statfs(inode, (void *)arg));
1877
1878         /* We need to special case any other ioctls we want to handle,
1879          * to send them to the MDS/OST as appropriate and to properly
1880          * network encode the arg field.
1881         case FSFILT_IOC_SETVERSION_OLD:
1882         case FSFILT_IOC_SETVERSION:
1883         */
1884         case LL_IOC_FLUSHCTX:
1885                 RETURN(ll_flush_ctx(inode));
1886         case LL_IOC_PATH2FID: {
1887                 if (cfs_copy_to_user((void *)arg, ll_inode2fid(inode),
1888                                      sizeof(struct lu_fid)))
1889                         RETURN(-EFAULT);
1890
1891                 RETURN(0);
1892         }
1893         case OBD_IOC_FID2PATH:
1894                 RETURN(ll_fid2path(ll_i2mdexp(inode), (void *)arg));
1895         case LL_IOC_DATA_VERSION: {
1896                 struct ioc_data_version idv;
1897                 int rc;
1898
1899                 if (cfs_copy_from_user(&idv, (char *)arg, sizeof(idv)))
1900                         RETURN(-EFAULT);
1901
1902                 rc = ll_data_version(inode, &idv.idv_version,
1903                                      !(idv.idv_flags & LL_DV_NOFLUSH));
1904
1905                 if (rc == 0 &&
1906                     cfs_copy_to_user((char *) arg, &idv, sizeof(idv)))
1907                         RETURN(-EFAULT);
1908
1909                 RETURN(rc);
1910         }
1911
1912         case LL_IOC_GET_MDTIDX: {
1913                 int mdtidx;
1914
1915                 mdtidx = ll_get_mdt_idx(inode);
1916                 if (mdtidx < 0)
1917                         RETURN(mdtidx);
1918
1919                 if (put_user((int)mdtidx, (int*)arg))
1920                         RETURN(-EFAULT);
1921
1922                 RETURN(0);
1923         }
1924         case OBD_IOC_GETDTNAME:
1925         case OBD_IOC_GETMDNAME:
1926                 RETURN(ll_get_obd_name(inode, cmd, arg));
1927         default: {
1928                 int err;
1929
1930                 if (LLIOC_STOP ==
1931                     ll_iocontrol_call(inode, file, cmd, arg, &err))
1932                         RETURN(err);
1933
1934                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1935                                      (void *)arg));
1936         }
1937         }
1938 }
1939
1940 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1941 {
1942         struct inode *inode = file->f_dentry->d_inode;
1943         loff_t retval;
1944         ENTRY;
1945         retval = offset + ((origin == 2) ? i_size_read(inode) :
1946                            (origin == 1) ? file->f_pos : 0);
1947         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%s)\n",
1948                inode->i_ino, inode->i_generation, inode, retval, retval,
1949                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
1950         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
1951
1952         if (origin == 2) { /* SEEK_END */
1953                 int rc;
1954
1955                 rc = ll_glimpse_size(inode);
1956                 if (rc != 0)
1957                         RETURN(rc);
1958
1959                 offset += i_size_read(inode);
1960         } else if (origin == 1) { /* SEEK_CUR */
1961                 offset += file->f_pos;
1962         }
1963
1964         retval = -EINVAL;
1965         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1966                 if (offset != file->f_pos) {
1967                         file->f_pos = offset;
1968                 }
1969                 retval = offset;
1970         }
1971
1972         RETURN(retval);
1973 }
1974
1975 int ll_flush(struct file *file, fl_owner_t id)
1976 {
1977         struct inode *inode = file->f_dentry->d_inode;
1978         struct ll_inode_info *lli = ll_i2info(inode);
1979         int rc, err;
1980
1981         LASSERT(!S_ISDIR(inode->i_mode));
1982
1983         /* the application should know write failure already. */
1984         if (lli->lli_write_rc)
1985                 return 0;
1986
1987         /* catch async errors that were recorded back when async writeback
1988          * failed for pages in this mapping. */
1989         rc = lli->lli_async_rc;
1990         lli->lli_async_rc = 0;
1991         err = lov_read_and_clear_async_rc(lli->lli_clob);
1992         if (rc == 0)
1993                 rc = err;
1994
1995         return rc ? -EIO : 0;
1996 }
1997
1998 /**
1999  * Called to make sure a portion of file has been written out.
2000  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2001  *
2002  * Return how many pages have been written.
2003  */
2004 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2005                        enum cl_fsync_mode mode)
2006 {
2007         struct cl_env_nest nest;
2008         struct lu_env *env;
2009         struct cl_io *io;
2010         struct obd_capa *capa = NULL;
2011         struct cl_fsync_io *fio;
2012         int result;
2013         ENTRY;
2014
2015         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2016             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2017                 RETURN(-EINVAL);
2018
2019         env = cl_env_nested_get(&nest);
2020         if (IS_ERR(env))
2021                 RETURN(PTR_ERR(env));
2022
2023         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2024
2025         io = ccc_env_thread_io(env);
2026         io->ci_obj = cl_i2info(inode)->lli_clob;
2027         io->ci_ignore_layout = 1;
2028
2029         /* initialize parameters for sync */
2030         fio = &io->u.ci_fsync;
2031         fio->fi_capa = capa;
2032         fio->fi_start = start;
2033         fio->fi_end = end;
2034         fio->fi_fid = ll_inode2fid(inode);
2035         fio->fi_mode = mode;
2036         fio->fi_nr_written = 0;
2037
2038         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2039                 result = cl_io_loop(env, io);
2040         else
2041                 result = io->ci_result;
2042         if (result == 0)
2043                 result = fio->fi_nr_written;
2044         cl_io_fini(env, io);
2045         cl_env_nested_put(&nest, env);
2046
2047         capa_put(capa);
2048
2049         RETURN(result);
2050 }
2051
2052 #ifdef HAVE_FILE_FSYNC_4ARGS
2053 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2054 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2055 int ll_fsync(struct file *file, int data)
2056 #else
2057 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2058 #endif
2059 {
2060         struct inode *inode = file->f_dentry->d_inode;
2061         struct ll_inode_info *lli = ll_i2info(inode);
2062         struct ptlrpc_request *req;
2063         struct obd_capa *oc;
2064         struct lov_stripe_md *lsm;
2065         int rc, err;
2066         ENTRY;
2067         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2068                inode->i_generation, inode);
2069         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2070
2071         /* fsync's caller has already called _fdata{sync,write}, we want
2072          * that IO to finish before calling the osc and mdc sync methods */
2073         rc = filemap_fdatawait(inode->i_mapping);
2074
2075         /* catch async errors that were recorded back when async writeback
2076          * failed for pages in this mapping. */
2077         if (!S_ISDIR(inode->i_mode)) {
2078                 err = lli->lli_async_rc;
2079                 lli->lli_async_rc = 0;
2080                 if (rc == 0)
2081                         rc = err;
2082                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2083                 if (rc == 0)
2084                         rc = err;
2085         }
2086
2087         oc = ll_mdscapa_get(inode);
2088         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2089                       &req);
2090         capa_put(oc);
2091         if (!rc)
2092                 rc = err;
2093         if (!err)
2094                 ptlrpc_req_finished(req);
2095
2096         lsm = ccc_inode_lsm_get(inode);
2097         if (data && lsm) {
2098                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2099                                 CL_FSYNC_ALL);
2100                 if (rc == 0 && err < 0)
2101                         rc = err;
2102                 lli->lli_write_rc = rc < 0 ? rc : 0;
2103         }
2104         ccc_inode_lsm_put(inode, lsm);
2105
2106         RETURN(rc);
2107 }
2108
2109 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2110 {
2111         struct inode *inode = file->f_dentry->d_inode;
2112         struct ll_sb_info *sbi = ll_i2sbi(inode);
2113         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2114                                            .ei_cb_cp =ldlm_flock_completion_ast,
2115                                            .ei_cbdata = file_lock };
2116         struct md_op_data *op_data;
2117         struct lustre_handle lockh = {0};
2118         ldlm_policy_data_t flock = {{0}};
2119         int flags = 0;
2120         int rc;
2121         ENTRY;
2122
2123         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2124                inode->i_ino, file_lock);
2125
2126         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2127
2128         if (file_lock->fl_flags & FL_FLOCK) {
2129                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2130                 /* flocks are whole-file locks */
2131                 flock.l_flock.end = OFFSET_MAX;
2132                 /* For flocks owner is determined by the local file desctiptor*/
2133                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2134         } else if (file_lock->fl_flags & FL_POSIX) {
2135                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2136                 flock.l_flock.start = file_lock->fl_start;
2137                 flock.l_flock.end = file_lock->fl_end;
2138         } else {
2139                 RETURN(-EINVAL);
2140         }
2141         flock.l_flock.pid = file_lock->fl_pid;
2142
2143         /* Somewhat ugly workaround for svc lockd.
2144          * lockd installs custom fl_lmops->fl_compare_owner that checks
2145          * for the fl_owner to be the same (which it always is on local node
2146          * I guess between lockd processes) and then compares pid.
2147          * As such we assign pid to the owner field to make it all work,
2148          * conflict with normal locks is unlikely since pid space and
2149          * pointer space for current->files are not intersecting */
2150         if (file_lock->fl_lmops && file_lock->fl_lmops->fl_compare_owner)
2151                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2152
2153         switch (file_lock->fl_type) {
2154         case F_RDLCK:
2155                 einfo.ei_mode = LCK_PR;
2156                 break;
2157         case F_UNLCK:
2158                 /* An unlock request may or may not have any relation to
2159                  * existing locks so we may not be able to pass a lock handle
2160                  * via a normal ldlm_lock_cancel() request. The request may even
2161                  * unlock a byte range in the middle of an existing lock. In
2162                  * order to process an unlock request we need all of the same
2163                  * information that is given with a normal read or write record
2164                  * lock request. To avoid creating another ldlm unlock (cancel)
2165                  * message we'll treat a LCK_NL flock request as an unlock. */
2166                 einfo.ei_mode = LCK_NL;
2167                 break;
2168         case F_WRLCK:
2169                 einfo.ei_mode = LCK_PW;
2170                 break;
2171         default:
2172                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2173                         file_lock->fl_type);
2174                 RETURN (-ENOTSUPP);
2175         }
2176
2177         switch (cmd) {
2178         case F_SETLKW:
2179 #ifdef F_SETLKW64
2180         case F_SETLKW64:
2181 #endif
2182                 flags = 0;
2183                 break;
2184         case F_SETLK:
2185 #ifdef F_SETLK64
2186         case F_SETLK64:
2187 #endif
2188                 flags = LDLM_FL_BLOCK_NOWAIT;
2189                 break;
2190         case F_GETLK:
2191 #ifdef F_GETLK64
2192         case F_GETLK64:
2193 #endif
2194                 flags = LDLM_FL_TEST_LOCK;
2195                 /* Save the old mode so that if the mode in the lock changes we
2196                  * can decrement the appropriate reader or writer refcount. */
2197                 file_lock->fl_type = einfo.ei_mode;
2198                 break;
2199         default:
2200                 CERROR("unknown fcntl lock command: %d\n", cmd);
2201                 RETURN (-EINVAL);
2202         }
2203
2204         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2205                                      LUSTRE_OPC_ANY, NULL);
2206         if (IS_ERR(op_data))
2207                 RETURN(PTR_ERR(op_data));
2208
2209         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2210                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2211                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2212
2213         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2214                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2215
2216         ll_finish_md_op_data(op_data);
2217
2218         if ((file_lock->fl_flags & FL_FLOCK) &&
2219             (rc == 0 || file_lock->fl_type == F_UNLCK))
2220                 flock_lock_file_wait(file, file_lock);
2221         if ((file_lock->fl_flags & FL_POSIX) &&
2222             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2223             !(flags & LDLM_FL_TEST_LOCK))
2224                 posix_lock_file_wait(file, file_lock);
2225
2226         RETURN(rc);
2227 }
2228
2229 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2230 {
2231         ENTRY;
2232
2233         RETURN(-ENOSYS);
2234 }
2235
2236 /**
2237  * test if some locks matching bits and l_req_mode are acquired
2238  * - bits can be in different locks
2239  * - if found clear the common lock bits in *bits
2240  * - the bits not found, are kept in *bits
2241  * \param inode [IN]
2242  * \param bits [IN] searched lock bits [IN]
2243  * \param l_req_mode [IN] searched lock mode
2244  * \retval boolean, true iff all bits are found
2245  */
2246 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2247 {
2248         struct lustre_handle lockh;
2249         ldlm_policy_data_t policy;
2250         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2251                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2252         struct lu_fid *fid;
2253         int flags;
2254         int i;
2255         ENTRY;
2256
2257         if (!inode)
2258                RETURN(0);
2259
2260         fid = &ll_i2info(inode)->lli_fid;
2261         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2262                ldlm_lockname[mode]);
2263
2264         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2265         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2266                 policy.l_inodebits.bits = *bits & (1 << i);
2267                 if (policy.l_inodebits.bits == 0)
2268                         continue;
2269
2270                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2271                                   &policy, mode, &lockh)) {
2272                         struct ldlm_lock *lock;
2273
2274                         lock = ldlm_handle2lock(&lockh);
2275                         if (lock) {
2276                                 *bits &=
2277                                       ~(lock->l_policy_data.l_inodebits.bits);
2278                                 LDLM_LOCK_PUT(lock);
2279                         } else {
2280                                 *bits &= ~policy.l_inodebits.bits;
2281                         }
2282                 }
2283         }
2284         RETURN(*bits == 0);
2285 }
2286
2287 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2288                             struct lustre_handle *lockh)
2289 {
2290         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2291         struct lu_fid *fid;
2292         ldlm_mode_t rc;
2293         int flags;
2294         ENTRY;
2295
2296         fid = &ll_i2info(inode)->lli_fid;
2297         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2298
2299         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2300         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2301                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2302         RETURN(rc);
2303 }
2304
2305 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2306         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2307                               * and return success */
2308                 inode->i_nlink = 0;
2309                 /* This path cannot be hit for regular files unless in
2310                  * case of obscure races, so no need to to validate
2311                  * size. */
2312                 if (!S_ISREG(inode->i_mode) &&
2313                     !S_ISDIR(inode->i_mode))
2314                         return 0;
2315         }
2316
2317         if (rc) {
2318                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2319                 return -abs(rc);
2320
2321         }
2322
2323         return 0;
2324 }
2325
2326 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2327                              __u64 ibits)
2328 {
2329         struct inode *inode = dentry->d_inode;
2330         struct ptlrpc_request *req = NULL;
2331         struct obd_export *exp;
2332         int rc = 0;
2333         ENTRY;
2334
2335         if (!inode) {
2336                 CERROR("REPORT THIS LINE TO PETER\n");
2337                 RETURN(0);
2338         }
2339
2340         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2341                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2342
2343         exp = ll_i2mdexp(inode);
2344
2345         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2346          *      But under CMD case, it caused some lock issues, should be fixed
2347          *      with new CMD ibits lock. See bug 12718 */
2348         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2349                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2350                 struct md_op_data *op_data;
2351
2352                 if (ibits == MDS_INODELOCK_LOOKUP)
2353                         oit.it_op = IT_LOOKUP;
2354
2355                 /* Call getattr by fid, so do not provide name at all. */
2356                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2357                                              dentry->d_inode, NULL, 0, 0,
2358                                              LUSTRE_OPC_ANY, NULL);
2359                 if (IS_ERR(op_data))
2360                         RETURN(PTR_ERR(op_data));
2361
2362                 oit.it_create_mode |= M_CHECK_STALE;
2363                 rc = md_intent_lock(exp, op_data, NULL, 0,
2364                                     /* we are not interested in name
2365                                        based lookup */
2366                                     &oit, 0, &req,
2367                                     ll_md_blocking_ast, 0);
2368                 ll_finish_md_op_data(op_data);
2369                 oit.it_create_mode &= ~M_CHECK_STALE;
2370                 if (rc < 0) {
2371                         rc = ll_inode_revalidate_fini(inode, rc);
2372                         GOTO (out, rc);
2373                 }
2374
2375                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2376                 if (rc != 0) {
2377                         ll_intent_release(&oit);
2378                         GOTO(out, rc);
2379                 }
2380
2381                 /* Unlinked? Unhash dentry, so it is not picked up later by
2382                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2383                    here to preserve get_cwd functionality on 2.6.
2384                    Bug 10503 */
2385                 if (!dentry->d_inode->i_nlink)
2386                         d_lustre_invalidate(dentry);
2387
2388                 ll_lookup_finish_locks(&oit, dentry);
2389         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2390                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2391                 obd_valid valid = OBD_MD_FLGETATTR;
2392                 struct md_op_data *op_data;
2393                 int ealen = 0;
2394
2395                 if (S_ISREG(inode->i_mode)) {
2396                         rc = ll_get_max_mdsize(sbi, &ealen);
2397                         if (rc)
2398                                 RETURN(rc);
2399                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2400                 }
2401
2402                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2403                                              0, ealen, LUSTRE_OPC_ANY,
2404                                              NULL);
2405                 if (IS_ERR(op_data))
2406                         RETURN(PTR_ERR(op_data));
2407
2408                 op_data->op_valid = valid;
2409                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2410                  * capa for this inode. Because we only keep capas of dirs
2411                  * fresh. */
2412                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2413                 ll_finish_md_op_data(op_data);
2414                 if (rc) {
2415                         rc = ll_inode_revalidate_fini(inode, rc);
2416                         RETURN(rc);
2417                 }
2418
2419                 rc = ll_prep_inode(&inode, req, NULL);
2420         }
2421 out:
2422         ptlrpc_req_finished(req);
2423         return rc;
2424 }
2425
2426 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2427                            __u64 ibits)
2428 {
2429         struct inode *inode = dentry->d_inode;
2430         int rc;
2431         ENTRY;
2432
2433         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2434
2435         /* if object not yet allocated, don't validate size */
2436         if (rc == 0 && !ll_i2info(dentry->d_inode)->lli_has_smd) {
2437                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2438                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2439                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2440                 RETURN(0);
2441         }
2442
2443         /* ll_glimpse_size will prefer locally cached writes if they extend
2444          * the file */
2445
2446         if (rc == 0)
2447                 rc = ll_glimpse_size(inode);
2448
2449         RETURN(rc);
2450 }
2451
2452 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2453                   struct lookup_intent *it, struct kstat *stat)
2454 {
2455         struct inode *inode = de->d_inode;
2456         struct ll_sb_info *sbi = ll_i2sbi(inode);
2457         struct ll_inode_info *lli = ll_i2info(inode);
2458         int res = 0;
2459
2460         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2461                                              MDS_INODELOCK_LOOKUP);
2462         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2463
2464         if (res)
2465                 return res;
2466
2467         stat->dev = inode->i_sb->s_dev;
2468         if (ll_need_32bit_api(sbi))
2469                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2470         else
2471                 stat->ino = inode->i_ino;
2472         stat->mode = inode->i_mode;
2473         stat->nlink = inode->i_nlink;
2474         stat->uid = inode->i_uid;
2475         stat->gid = inode->i_gid;
2476         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2477         stat->atime = inode->i_atime;
2478         stat->mtime = inode->i_mtime;
2479         stat->ctime = inode->i_ctime;
2480 #ifdef HAVE_INODE_BLKSIZE
2481         stat->blksize = inode->i_blksize;
2482 #else
2483         stat->blksize = 1 << inode->i_blkbits;
2484 #endif
2485
2486         stat->size = i_size_read(inode);
2487         stat->blocks = inode->i_blocks;
2488
2489         return 0;
2490 }
2491 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2492 {
2493         struct lookup_intent it = { .it_op = IT_GETATTR };
2494
2495         return ll_getattr_it(mnt, de, &it, stat);
2496 }
2497
2498 #ifdef HAVE_LINUX_FIEMAP_H
2499 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2500                 __u64 start, __u64 len)
2501 {
2502         int rc;
2503         size_t num_bytes;
2504         struct ll_user_fiemap *fiemap;
2505         unsigned int extent_count = fieinfo->fi_extents_max;
2506
2507         num_bytes = sizeof(*fiemap) + (extent_count *
2508                                        sizeof(struct ll_fiemap_extent));
2509         OBD_ALLOC_LARGE(fiemap, num_bytes);
2510
2511         if (fiemap == NULL)
2512                 RETURN(-ENOMEM);
2513
2514         fiemap->fm_flags = fieinfo->fi_flags;
2515         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2516         fiemap->fm_start = start;
2517         fiemap->fm_length = len;
2518         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2519                sizeof(struct ll_fiemap_extent));
2520
2521         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2522
2523         fieinfo->fi_flags = fiemap->fm_flags;
2524         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2525         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2526                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2527
2528         OBD_FREE_LARGE(fiemap, num_bytes);
2529         return rc;
2530 }
2531 #endif
2532
2533
2534 static int
2535 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2536 lustre_check_acl(struct inode *inode, int mask, unsigned int flags)
2537 #else
2538 lustre_check_acl(struct inode *inode, int mask)
2539 #endif
2540 {
2541 #ifdef CONFIG_FS_POSIX_ACL
2542         struct ll_inode_info *lli = ll_i2info(inode);
2543         struct posix_acl *acl;
2544         int rc;
2545         ENTRY;
2546
2547 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2548         if (flags & IPERM_FLAG_RCU)
2549                 return -ECHILD;
2550 #endif
2551         cfs_spin_lock(&lli->lli_lock);
2552         acl = posix_acl_dup(lli->lli_posix_acl);
2553         cfs_spin_unlock(&lli->lli_lock);
2554
2555         if (!acl)
2556                 RETURN(-EAGAIN);
2557
2558         rc = posix_acl_permission(inode, acl, mask);
2559         posix_acl_release(acl);
2560
2561         RETURN(rc);
2562 #else
2563         return -EAGAIN;
2564 #endif
2565 }
2566
2567 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2568 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2569 #else
2570 # ifdef HAVE_INODE_PERMISION_2ARGS
2571 int ll_inode_permission(struct inode *inode, int mask)
2572 # else
2573 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2574 # endif
2575 #endif
2576 {
2577         int rc = 0;
2578         ENTRY;
2579
2580 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2581         if (flags & IPERM_FLAG_RCU)
2582                 return -ECHILD;
2583 #endif
2584
2585        /* as root inode are NOT getting validated in lookup operation,
2586         * need to do it before permission check. */
2587
2588         if (inode == inode->i_sb->s_root->d_inode) {
2589                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2590
2591                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2592                                               MDS_INODELOCK_LOOKUP);
2593                 if (rc)
2594                         RETURN(rc);
2595         }
2596
2597         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2598                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2599
2600         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2601                 return lustre_check_remote_perm(inode, mask);
2602
2603         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2604         rc = ll_generic_permission(inode, mask, flags, lustre_check_acl);
2605
2606         RETURN(rc);
2607 }
2608
2609 #ifdef HAVE_FILE_READV
2610 #define READ_METHOD readv
2611 #define READ_FUNCTION ll_file_readv
2612 #define WRITE_METHOD writev
2613 #define WRITE_FUNCTION ll_file_writev
2614 #else
2615 #define READ_METHOD aio_read
2616 #define READ_FUNCTION ll_file_aio_read
2617 #define WRITE_METHOD aio_write
2618 #define WRITE_FUNCTION ll_file_aio_write
2619 #endif
2620
2621 /* -o localflock - only provides locally consistent flock locks */
2622 struct file_operations ll_file_operations = {
2623         .read           = ll_file_read,
2624         .READ_METHOD    = READ_FUNCTION,
2625         .write          = ll_file_write,
2626         .WRITE_METHOD   = WRITE_FUNCTION,
2627         .unlocked_ioctl = ll_file_ioctl,
2628         .open           = ll_file_open,
2629         .release        = ll_file_release,
2630         .mmap           = ll_file_mmap,
2631         .llseek         = ll_file_seek,
2632 #ifdef HAVE_KERNEL_SENDFILE
2633         .sendfile       = ll_file_sendfile,
2634 #endif
2635 #ifdef HAVE_KERNEL_SPLICE_READ
2636         .splice_read    = ll_file_splice_read,
2637 #endif
2638         .fsync          = ll_fsync,
2639         .flush          = ll_flush
2640 };
2641
2642 struct file_operations ll_file_operations_flock = {
2643         .read           = ll_file_read,
2644         .READ_METHOD    = READ_FUNCTION,
2645         .write          = ll_file_write,
2646         .WRITE_METHOD   = WRITE_FUNCTION,
2647         .unlocked_ioctl = ll_file_ioctl,
2648         .open           = ll_file_open,
2649         .release        = ll_file_release,
2650         .mmap           = ll_file_mmap,
2651         .llseek         = ll_file_seek,
2652 #ifdef HAVE_KERNEL_SENDFILE
2653         .sendfile       = ll_file_sendfile,
2654 #endif
2655 #ifdef HAVE_KERNEL_SPLICE_READ
2656         .splice_read    = ll_file_splice_read,
2657 #endif
2658         .fsync          = ll_fsync,
2659         .flush          = ll_flush,
2660         .flock          = ll_file_flock,
2661         .lock           = ll_file_flock
2662 };
2663
2664 /* These are for -o noflock - to return ENOSYS on flock calls */
2665 struct file_operations ll_file_operations_noflock = {
2666         .read           = ll_file_read,
2667         .READ_METHOD    = READ_FUNCTION,
2668         .write          = ll_file_write,
2669         .WRITE_METHOD   = WRITE_FUNCTION,
2670         .unlocked_ioctl = ll_file_ioctl,
2671         .open           = ll_file_open,
2672         .release        = ll_file_release,
2673         .mmap           = ll_file_mmap,
2674         .llseek         = ll_file_seek,
2675 #ifdef HAVE_KERNEL_SENDFILE
2676         .sendfile       = ll_file_sendfile,
2677 #endif
2678 #ifdef HAVE_KERNEL_SPLICE_READ
2679         .splice_read    = ll_file_splice_read,
2680 #endif
2681         .fsync          = ll_fsync,
2682         .flush          = ll_flush,
2683         .flock          = ll_file_noflock,
2684         .lock           = ll_file_noflock
2685 };
2686
2687 struct inode_operations ll_file_inode_operations = {
2688         .setattr        = ll_setattr,
2689         .truncate       = ll_truncate,
2690         .getattr        = ll_getattr,
2691         .permission     = ll_inode_permission,
2692         .setxattr       = ll_setxattr,
2693         .getxattr       = ll_getxattr,
2694         .listxattr      = ll_listxattr,
2695         .removexattr    = ll_removexattr,
2696 #ifdef  HAVE_LINUX_FIEMAP_H
2697         .fiemap         = ll_fiemap,
2698 #endif
2699 };
2700
2701 /* dynamic ioctl number support routins */
2702 static struct llioc_ctl_data {
2703         cfs_rw_semaphore_t      ioc_sem;
2704         cfs_list_t              ioc_head;
2705 } llioc = {
2706         __RWSEM_INITIALIZER(llioc.ioc_sem),
2707         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2708 };
2709
2710
2711 struct llioc_data {
2712         cfs_list_t              iocd_list;
2713         unsigned int            iocd_size;
2714         llioc_callback_t        iocd_cb;
2715         unsigned int            iocd_count;
2716         unsigned int            iocd_cmd[0];
2717 };
2718
2719 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2720 {
2721         unsigned int size;
2722         struct llioc_data *in_data = NULL;
2723         ENTRY;
2724
2725         if (cb == NULL || cmd == NULL ||
2726             count > LLIOC_MAX_CMD || count < 0)
2727                 RETURN(NULL);
2728
2729         size = sizeof(*in_data) + count * sizeof(unsigned int);
2730         OBD_ALLOC(in_data, size);
2731         if (in_data == NULL)
2732                 RETURN(NULL);
2733
2734         memset(in_data, 0, sizeof(*in_data));
2735         in_data->iocd_size = size;
2736         in_data->iocd_cb = cb;
2737         in_data->iocd_count = count;
2738         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2739
2740         cfs_down_write(&llioc.ioc_sem);
2741         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2742         cfs_up_write(&llioc.ioc_sem);
2743
2744         RETURN(in_data);
2745 }
2746
2747 void ll_iocontrol_unregister(void *magic)
2748 {
2749         struct llioc_data *tmp;
2750
2751         if (magic == NULL)
2752                 return;
2753
2754         cfs_down_write(&llioc.ioc_sem);
2755         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2756                 if (tmp == magic) {
2757                         unsigned int size = tmp->iocd_size;
2758
2759                         cfs_list_del(&tmp->iocd_list);
2760                         cfs_up_write(&llioc.ioc_sem);
2761
2762                         OBD_FREE(tmp, size);
2763                         return;
2764                 }
2765         }
2766         cfs_up_write(&llioc.ioc_sem);
2767
2768         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2769 }
2770
2771 EXPORT_SYMBOL(ll_iocontrol_register);
2772 EXPORT_SYMBOL(ll_iocontrol_unregister);
2773
2774 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2775                         unsigned int cmd, unsigned long arg, int *rcp)
2776 {
2777         enum llioc_iter ret = LLIOC_CONT;
2778         struct llioc_data *data;
2779         int rc = -EINVAL, i;
2780
2781         cfs_down_read(&llioc.ioc_sem);
2782         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2783                 for (i = 0; i < data->iocd_count; i++) {
2784                         if (cmd != data->iocd_cmd[i])
2785                                 continue;
2786
2787                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2788                         break;
2789                 }
2790
2791                 if (ret == LLIOC_STOP)
2792                         break;
2793         }
2794         cfs_up_read(&llioc.ioc_sem);
2795
2796         if (rcp)
2797                 *rcp = rc;
2798         return ret;
2799 }
2800
2801 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2802 {
2803         struct ll_inode_info *lli = ll_i2info(inode);
2804         struct cl_env_nest nest;
2805         struct lu_env *env;
2806         int result;
2807         ENTRY;
2808
2809         if (lli->lli_clob == NULL)
2810                 RETURN(0);
2811
2812         env = cl_env_nested_get(&nest);
2813         if (IS_ERR(env))
2814                 RETURN(PTR_ERR(env));
2815
2816         result = cl_conf_set(env, lli->lli_clob, conf);
2817         cl_env_nested_put(&nest, env);
2818         RETURN(result);
2819 }
2820
2821 /**
2822  * This function checks if there exists a LAYOUT lock on the client side,
2823  * or enqueues it if it doesn't have one in cache.
2824  *
2825  * This function will not hold layout lock so it may be revoked any time after
2826  * this function returns. Any operations depend on layout should be redone
2827  * in that case.
2828  *
2829  * This function should be called before lov_io_init() to get an uptodate
2830  * layout version, the caller should save the version number and after IO
2831  * is finished, this function should be called again to verify that layout
2832  * is not changed during IO time.
2833  */
2834 int ll_layout_refresh(struct inode *inode, __u32 *gen)
2835 {
2836         struct ll_inode_info  *lli = ll_i2info(inode);
2837         struct ll_sb_info     *sbi = ll_i2sbi(inode);
2838         struct md_op_data     *op_data = NULL;
2839         struct ptlrpc_request *req = NULL;
2840         struct lookup_intent   it = { .it_op = IT_LAYOUT };
2841         struct lustre_handle   lockh;
2842         ldlm_mode_t            mode;
2843         struct cl_object_conf  conf = {  .coc_inode = inode,
2844                                          .coc_validate_only = true };
2845         int rc;
2846         ENTRY;
2847
2848         *gen = 0;
2849         if (!(ll_i2sbi(inode)->ll_flags & LL_SBI_LAYOUT_LOCK))
2850                 RETURN(0);
2851
2852         /* sanity checks */
2853         LASSERT(fid_is_sane(ll_inode2fid(inode)));
2854         LASSERT(S_ISREG(inode->i_mode));
2855
2856         /* mostly layout lock is caching on the local side, so try to match
2857          * it before grabbing layout lock mutex. */
2858         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh);
2859         if (mode != 0) { /* hit cached lock */
2860                 struct lov_stripe_md *lsm;
2861
2862                 lsm = ccc_inode_lsm_get(inode);
2863                 if (lsm != NULL)
2864                         *gen = lsm->lsm_layout_gen;
2865                 ccc_inode_lsm_put(inode, lsm);
2866                 ldlm_lock_decref(&lockh, mode);
2867
2868                 RETURN(0);
2869         }
2870
2871         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
2872                                      0, 0, LUSTRE_OPC_ANY, NULL);
2873         if (IS_ERR(op_data))
2874                 RETURN(PTR_ERR(op_data));
2875
2876         /* take layout lock mutex to enqueue layout lock exclusively. */
2877         cfs_mutex_lock(&lli->lli_layout_mutex);
2878
2879         /* make sure the old conf goes away */
2880         ll_layout_conf(inode, &conf);
2881
2882         /* enqueue layout lock */
2883         rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0,
2884                         &req, ll_md_blocking_ast, 0);
2885         if (rc == 0) {
2886                 /* we get a new lock, so update the lock data */
2887                 lockh.cookie = it.d.lustre.it_lock_handle;
2888                 md_set_lock_data(sbi->ll_md_exp, &lockh.cookie, inode, NULL);
2889
2890                 /* req == NULL is when lock was found in client cache, without
2891                  * any request to server (but lsm can be canceled just after a
2892                  * release) */
2893                 if (req != NULL) {
2894                         struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
2895                         struct lustre_md md = { NULL };
2896                         void *lmm;
2897                         int lmmsize;
2898
2899                         /* for IT_LAYOUT lock, lmm is returned in lock's lvb
2900                          * data via completion callback */
2901                         LASSERT(lock != NULL);
2902                         lmm = lock->l_lvb_data;
2903                         lmmsize = lock->l_lvb_len;
2904                         if (lmm != NULL)
2905                                 rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
2906                                                 lmm, lmmsize);
2907                         if (rc == 0) {
2908                                 if (md.lsm != NULL)
2909                                         *gen = md.lsm->lsm_layout_gen;
2910
2911                                 memset(&conf, 0, sizeof conf);
2912                                 conf.coc_inode = inode;
2913                                 conf.u.coc_md = &md;
2914                                 ll_layout_conf(inode, &conf);
2915                                 /* is this racy? */
2916                                 lli->lli_has_smd = md.lsm != NULL;
2917                         }
2918                         if (md.lsm != NULL)
2919                                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
2920
2921                         LDLM_LOCK_PUT(lock);
2922                         ptlrpc_req_finished(req);
2923                 } else { /* hit caching lock */
2924                         struct lov_stripe_md *lsm;
2925
2926                         lsm = ccc_inode_lsm_get(inode);
2927                         if (lsm != NULL)
2928                                 *gen = lsm->lsm_layout_gen;
2929                         ccc_inode_lsm_put(inode, lsm);
2930                 }
2931                 ll_intent_drop_lock(&it);
2932         }
2933         cfs_mutex_unlock(&lli->lli_layout_mutex);
2934         ll_finish_md_op_data(op_data);
2935
2936         RETURN(rc);
2937 }