Whamcloud - gitweb
f793d52445c5439e0f3e6facee9c317d3ae299d5
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84 }
85
86 /**
87  * Closes the IO epoch and packs all the attributes into @op_data for
88  * the CLOSE rpc.
89  */
90 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
91                              struct obd_client_handle *och)
92 {
93         ENTRY;
94
95         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
96                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
97
98         if (!(och->och_flags & FMODE_WRITE))
99                 goto out;
100
101         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
102                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
103         else
104                 ll_ioepoch_close(inode, op_data, &och, 0);
105
106 out:
107         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
108         ll_prep_md_op_data(op_data, inode, NULL, NULL,
109                            0, 0, LUSTRE_OPC_ANY, NULL);
110         EXIT;
111 }
112
113 static int ll_close_inode_openhandle(struct obd_export *md_exp,
114                                      struct inode *inode,
115                                      struct obd_client_handle *och)
116 {
117         struct obd_export *exp = ll_i2mdexp(inode);
118         struct md_op_data *op_data;
119         struct ptlrpc_request *req = NULL;
120         struct obd_device *obd = class_exp2obd(exp);
121         int epoch_close = 1;
122         int rc;
123         ENTRY;
124
125         if (obd == NULL) {
126                 /*
127                  * XXX: in case of LMV, is this correct to access
128                  * ->exp_handle?
129                  */
130                 CERROR("Invalid MDC connection handle "LPX64"\n",
131                        ll_i2mdexp(inode)->exp_handle.h_cookie);
132                 GOTO(out, rc = 0);
133         }
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc == -EAGAIN) {
143                 /* This close must have the epoch closed. */
144                 LASSERT(epoch_close);
145                 /* MDS has instructed us to obtain Size-on-MDS attribute from
146                  * OSTs and send setattr to back to MDS. */
147                 rc = ll_som_update(inode, op_data);
148                 if (rc) {
149                         CERROR("inode %lu mdc Size-on-MDS update failed: "
150                                "rc = %d\n", inode->i_ino, rc);
151                         rc = 0;
152                 }
153         } else if (rc) {
154                 CERROR("inode %lu mdc close failed: rc = %d\n",
155                        inode->i_ino, rc);
156         }
157         ll_finish_md_op_data(op_data);
158
159         if (rc == 0) {
160                 rc = ll_objects_destroy(req, inode);
161                 if (rc)
162                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
163                                inode->i_ino, rc);
164         }
165
166         EXIT;
167 out:
168
169         if (exp_connect_som(exp) && !epoch_close &&
170             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
171                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
172         } else {
173                 md_clear_open_replay_data(md_exp, och);
174                 /* Free @och if it is not waiting for DONE_WRITING. */
175                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
176                 OBD_FREE_PTR(och);
177         }
178         if (req) /* This is close request */
179                 ptlrpc_req_finished(req);
180         return rc;
181 }
182
183 int ll_md_real_close(struct inode *inode, int flags)
184 {
185         struct ll_inode_info *lli = ll_i2info(inode);
186         struct obd_client_handle **och_p;
187         struct obd_client_handle *och;
188         __u64 *och_usecount;
189         int rc = 0;
190         ENTRY;
191
192         if (flags & FMODE_WRITE) {
193                 och_p = &lli->lli_mds_write_och;
194                 och_usecount = &lli->lli_open_fd_write_count;
195         } else if (flags & FMODE_EXEC) {
196                 och_p = &lli->lli_mds_exec_och;
197                 och_usecount = &lli->lli_open_fd_exec_count;
198         } else {
199                 LASSERT(flags & FMODE_READ);
200                 och_p = &lli->lli_mds_read_och;
201                 och_usecount = &lli->lli_open_fd_read_count;
202         }
203
204         cfs_mutex_lock(&lli->lli_och_mutex);
205         if (*och_usecount) { /* There are still users of this handle, so
206                                 skip freeing it. */
207                 cfs_mutex_unlock(&lli->lli_och_mutex);
208                 RETURN(0);
209         }
210         och=*och_p;
211         *och_p = NULL;
212         cfs_mutex_unlock(&lli->lli_och_mutex);
213
214         if (och) { /* There might be a race and somebody have freed this och
215                       already */
216                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
217                                                inode, och);
218         }
219
220         RETURN(rc);
221 }
222
223 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
224                 struct file *file)
225 {
226         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
227         struct ll_inode_info *lli = ll_i2info(inode);
228         int rc = 0;
229         ENTRY;
230
231         /* clear group lock, if present */
232         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
233                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
234
235         /* Let's see if we have good enough OPEN lock on the file and if
236            we can skip talking to MDS */
237         if (file->f_dentry->d_inode) { /* Can this ever be false? */
238                 int lockmode;
239                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
240                 struct lustre_handle lockh;
241                 struct inode *inode = file->f_dentry->d_inode;
242                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
243
244                 cfs_mutex_lock(&lli->lli_och_mutex);
245                 if (fd->fd_omode & FMODE_WRITE) {
246                         lockmode = LCK_CW;
247                         LASSERT(lli->lli_open_fd_write_count);
248                         lli->lli_open_fd_write_count--;
249                 } else if (fd->fd_omode & FMODE_EXEC) {
250                         lockmode = LCK_PR;
251                         LASSERT(lli->lli_open_fd_exec_count);
252                         lli->lli_open_fd_exec_count--;
253                 } else {
254                         lockmode = LCK_CR;
255                         LASSERT(lli->lli_open_fd_read_count);
256                         lli->lli_open_fd_read_count--;
257                 }
258                 cfs_mutex_unlock(&lli->lli_och_mutex);
259
260                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
261                                    LDLM_IBITS, &policy, lockmode,
262                                    &lockh)) {
263                         rc = ll_md_real_close(file->f_dentry->d_inode,
264                                               fd->fd_omode);
265                 }
266         } else {
267                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
268                        file, file->f_dentry, file->f_dentry->d_name.name);
269         }
270
271         LUSTRE_FPRIVATE(file) = NULL;
272         ll_file_data_put(fd);
273         ll_capa_close(inode);
274
275         RETURN(rc);
276 }
277
278 /* While this returns an error code, fput() the caller does not, so we need
279  * to make every effort to clean up all of our state here.  Also, applications
280  * rarely check close errors and even if an error is returned they will not
281  * re-try the close call.
282  */
283 int ll_file_release(struct inode *inode, struct file *file)
284 {
285         struct ll_file_data *fd;
286         struct ll_sb_info *sbi = ll_i2sbi(inode);
287         struct ll_inode_info *lli = ll_i2info(inode);
288         int rc;
289         ENTRY;
290
291         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
292                inode->i_generation, inode);
293
294 #ifdef CONFIG_FS_POSIX_ACL
295         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
296             inode == inode->i_sb->s_root->d_inode) {
297                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
298
299                 LASSERT(fd != NULL);
300                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
301                         fd->fd_flags &= ~LL_FILE_RMTACL;
302                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
303                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
304                 }
305         }
306 #endif
307
308         if (inode->i_sb->s_root != file->f_dentry)
309                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
310         fd = LUSTRE_FPRIVATE(file);
311         LASSERT(fd != NULL);
312
313         /* The last ref on @file, maybe not the the owner pid of statahead.
314          * Different processes can open the same dir, "ll_opendir_key" means:
315          * it is me that should stop the statahead thread. */
316         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
317             lli->lli_opendir_pid != 0)
318                 ll_stop_statahead(inode, lli->lli_opendir_key);
319
320         if (inode->i_sb->s_root == file->f_dentry) {
321                 LUSTRE_FPRIVATE(file) = NULL;
322                 ll_file_data_put(fd);
323                 RETURN(0);
324         }
325
326         if (!S_ISDIR(inode->i_mode)) {
327                 lov_read_and_clear_async_rc(lli->lli_clob);
328                 lli->lli_async_rc = 0;
329         }
330
331         rc = ll_md_close(sbi->ll_md_exp, inode, file);
332
333         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
334                 libcfs_debug_dumplog();
335
336         RETURN(rc);
337 }
338
339 static int ll_intent_file_open(struct file *file, void *lmm,
340                                int lmmsize, struct lookup_intent *itp)
341 {
342         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
343         struct dentry *parent = file->f_dentry->d_parent;
344         const char *name = file->f_dentry->d_name.name;
345         const int len = file->f_dentry->d_name.len;
346         struct md_op_data *op_data;
347         struct ptlrpc_request *req;
348         __u32 opc = LUSTRE_OPC_ANY;
349         int rc;
350         ENTRY;
351
352         if (!parent)
353                 RETURN(-ENOENT);
354
355         /* Usually we come here only for NFSD, and we want open lock.
356            But we can also get here with pre 2.6.15 patchless kernels, and in
357            that case that lock is also ok */
358         /* We can also get here if there was cached open handle in revalidate_it
359          * but it disappeared while we were getting from there to ll_file_open.
360          * But this means this file was closed and immediatelly opened which
361          * makes a good candidate for using OPEN lock */
362         /* If lmmsize & lmm are not 0, we are just setting stripe info
363          * parameters. No need for the open lock */
364         if (lmm == NULL && lmmsize == 0) {
365                 itp->it_flags |= MDS_OPEN_LOCK;
366                 if (itp->it_flags & FMODE_WRITE)
367                         opc = LUSTRE_OPC_CREATE;
368         }
369
370         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
371                                       file->f_dentry->d_inode, name, len,
372                                       O_RDWR, opc, NULL);
373         if (IS_ERR(op_data))
374                 RETURN(PTR_ERR(op_data));
375
376         itp->it_flags |= MDS_OPEN_BY_FID;
377         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
378                             0 /*unused */, &req, ll_md_blocking_ast, 0);
379         ll_finish_md_op_data(op_data);
380         if (rc == -ESTALE) {
381                 /* reason for keep own exit path - don`t flood log
382                 * with messages with -ESTALE errors.
383                 */
384                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
385                      it_open_error(DISP_OPEN_OPEN, itp))
386                         GOTO(out, rc);
387                 ll_release_openhandle(file->f_dentry, itp);
388                 GOTO(out, rc);
389         }
390
391         if (it_disposition(itp, DISP_LOOKUP_NEG))
392                 GOTO(out, rc = -ENOENT);
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
401         if (!rc && itp->d.lustre.it_lock_mode)
402                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
403                                  itp, NULL);
404
405 out:
406         ptlrpc_req_finished(itp->d.lustre.it_data);
407         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
408         ll_intent_drop_lock(itp);
409
410         RETURN(rc);
411 }
412
413 /**
414  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
415  * not believe attributes if a few ioepoch holders exist. Attributes for
416  * previous ioepoch if new one is opened are also skipped by MDS.
417  */
418 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
419 {
420         if (ioepoch && lli->lli_ioepoch != ioepoch) {
421                 lli->lli_ioepoch = ioepoch;
422                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
423                        ioepoch, PFID(&lli->lli_fid));
424         }
425 }
426
427 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
428                        struct lookup_intent *it, struct obd_client_handle *och)
429 {
430         struct ptlrpc_request *req = it->d.lustre.it_data;
431         struct mdt_body *body;
432
433         LASSERT(och);
434
435         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
436         LASSERT(body != NULL);                      /* reply already checked out */
437
438         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
439         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
440         och->och_fid = lli->lli_fid;
441         och->och_flags = it->it_flags;
442         ll_ioepoch_open(lli, body->ioepoch);
443
444         return md_set_open_replay_data(md_exp, och, req);
445 }
446
447 int ll_local_open(struct file *file, struct lookup_intent *it,
448                   struct ll_file_data *fd, struct obd_client_handle *och)
449 {
450         struct inode *inode = file->f_dentry->d_inode;
451         struct ll_inode_info *lli = ll_i2info(inode);
452         ENTRY;
453
454         LASSERT(!LUSTRE_FPRIVATE(file));
455
456         LASSERT(fd != NULL);
457
458         if (och) {
459                 struct ptlrpc_request *req = it->d.lustre.it_data;
460                 struct mdt_body *body;
461                 int rc;
462
463                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
464                 if (rc)
465                         RETURN(rc);
466
467                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
468                 if ((it->it_flags & FMODE_WRITE) &&
469                     (body->valid & OBD_MD_FLSIZE))
470                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
471                                lli->lli_ioepoch, PFID(&lli->lli_fid));
472         }
473
474         LUSTRE_FPRIVATE(file) = fd;
475         ll_readahead_init(inode, &fd->fd_ras);
476         fd->fd_omode = it->it_flags;
477         RETURN(0);
478 }
479
480 /* Open a file, and (for the very first open) create objects on the OSTs at
481  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
482  * creation or open until ll_lov_setstripe() ioctl is called.
483  *
484  * If we already have the stripe MD locally then we don't request it in
485  * md_open(), by passing a lmm_size = 0.
486  *
487  * It is up to the application to ensure no other processes open this file
488  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
489  * used.  We might be able to avoid races of that sort by getting lli_open_sem
490  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
491  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
492  */
493 int ll_file_open(struct inode *inode, struct file *file)
494 {
495         struct ll_inode_info *lli = ll_i2info(inode);
496         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
497                                           .it_flags = file->f_flags };
498         struct obd_client_handle **och_p = NULL;
499         __u64 *och_usecount = NULL;
500         struct ll_file_data *fd;
501         int rc = 0, opendir_set = 0;
502         ENTRY;
503
504         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
505                inode->i_generation, inode, file->f_flags);
506
507         it = file->private_data; /* XXX: compat macro */
508         file->private_data = NULL; /* prevent ll_local_open assertion */
509
510         fd = ll_file_data_get();
511         if (fd == NULL)
512                 GOTO(out_och_free, rc = -ENOMEM);
513
514         fd->fd_file = file;
515         if (S_ISDIR(inode->i_mode)) {
516                 cfs_spin_lock(&lli->lli_sa_lock);
517                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
518                     lli->lli_opendir_pid == 0) {
519                         lli->lli_opendir_key = fd;
520                         lli->lli_opendir_pid = cfs_curproc_pid();
521                         opendir_set = 1;
522                 }
523                 cfs_spin_unlock(&lli->lli_sa_lock);
524         }
525
526         if (inode->i_sb->s_root == file->f_dentry) {
527                 LUSTRE_FPRIVATE(file) = fd;
528                 RETURN(0);
529         }
530
531         if (!it || !it->d.lustre.it_disposition) {
532                 /* Convert f_flags into access mode. We cannot use file->f_mode,
533                  * because everything but O_ACCMODE mask was stripped from
534                  * there */
535                 if ((oit.it_flags + 1) & O_ACCMODE)
536                         oit.it_flags++;
537                 if (file->f_flags & O_TRUNC)
538                         oit.it_flags |= FMODE_WRITE;
539
540                 /* kernel only call f_op->open in dentry_open.  filp_open calls
541                  * dentry_open after call to open_namei that checks permissions.
542                  * Only nfsd_open call dentry_open directly without checking
543                  * permissions and because of that this code below is safe. */
544                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
545                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
546
547                 /* We do not want O_EXCL here, presumably we opened the file
548                  * already? XXX - NFS implications? */
549                 oit.it_flags &= ~O_EXCL;
550
551                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
552                  * created if necessary, then "IT_CREAT" should be set to keep
553                  * consistent with it */
554                 if (oit.it_flags & O_CREAT)
555                         oit.it_op |= IT_CREAT;
556
557                 it = &oit;
558         }
559
560 restart:
561         /* Let's see if we have file open on MDS already. */
562         if (it->it_flags & FMODE_WRITE) {
563                 och_p = &lli->lli_mds_write_och;
564                 och_usecount = &lli->lli_open_fd_write_count;
565         } else if (it->it_flags & FMODE_EXEC) {
566                 och_p = &lli->lli_mds_exec_och;
567                 och_usecount = &lli->lli_open_fd_exec_count;
568          } else {
569                 och_p = &lli->lli_mds_read_och;
570                 och_usecount = &lli->lli_open_fd_read_count;
571         }
572
573         cfs_mutex_lock(&lli->lli_och_mutex);
574         if (*och_p) { /* Open handle is present */
575                 if (it_disposition(it, DISP_OPEN_OPEN)) {
576                         /* Well, there's extra open request that we do not need,
577                            let's close it somehow. This will decref request. */
578                         rc = it_open_error(DISP_OPEN_OPEN, it);
579                         if (rc) {
580                                 cfs_mutex_unlock(&lli->lli_och_mutex);
581                                 GOTO(out_openerr, rc);
582                         }
583
584                         ll_release_openhandle(file->f_dentry, it);
585                 }
586                 (*och_usecount)++;
587
588                 rc = ll_local_open(file, it, fd, NULL);
589                 if (rc) {
590                         (*och_usecount)--;
591                         cfs_mutex_unlock(&lli->lli_och_mutex);
592                         GOTO(out_openerr, rc);
593                 }
594         } else {
595                 LASSERT(*och_usecount == 0);
596                 if (!it->d.lustre.it_disposition) {
597                         /* We cannot just request lock handle now, new ELC code
598                            means that one of other OPEN locks for this file
599                            could be cancelled, and since blocking ast handler
600                            would attempt to grab och_mutex as well, that would
601                            result in a deadlock */
602                         cfs_mutex_unlock(&lli->lli_och_mutex);
603                         it->it_create_mode |= M_CHECK_STALE;
604                         rc = ll_intent_file_open(file, NULL, 0, it);
605                         it->it_create_mode &= ~M_CHECK_STALE;
606                         if (rc)
607                                 GOTO(out_openerr, rc);
608
609                         goto restart;
610                 }
611                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
612                 if (!*och_p)
613                         GOTO(out_och_free, rc = -ENOMEM);
614
615                 (*och_usecount)++;
616
617                 /* md_intent_lock() didn't get a request ref if there was an
618                  * open error, so don't do cleanup on the request here
619                  * (bug 3430) */
620                 /* XXX (green): Should not we bail out on any error here, not
621                  * just open error? */
622                 rc = it_open_error(DISP_OPEN_OPEN, it);
623                 if (rc)
624                         GOTO(out_och_free, rc);
625
626                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
627
628                 rc = ll_local_open(file, it, fd, *och_p);
629                 if (rc)
630                         GOTO(out_och_free, rc);
631         }
632         cfs_mutex_unlock(&lli->lli_och_mutex);
633         fd = NULL;
634
635         /* Must do this outside lli_och_mutex lock to prevent deadlock where
636            different kind of OPEN lock for this same inode gets cancelled
637            by ldlm_cancel_lru */
638         if (!S_ISREG(inode->i_mode))
639                 GOTO(out_och_free, rc);
640
641         ll_capa_open(inode);
642
643         if (!lli->lli_has_smd) {
644                 if (file->f_flags & O_LOV_DELAY_CREATE ||
645                     !(file->f_mode & FMODE_WRITE)) {
646                         CDEBUG(D_INODE, "object creation was delayed\n");
647                         GOTO(out_och_free, rc);
648                 }
649         }
650         file->f_flags &= ~O_LOV_DELAY_CREATE;
651         GOTO(out_och_free, rc);
652
653 out_och_free:
654         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
655                 ptlrpc_req_finished(it->d.lustre.it_data);
656                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
657         }
658
659         if (rc) {
660                 if (och_p && *och_p) {
661                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
662                         *och_p = NULL; /* OBD_FREE writes some magic there */
663                         (*och_usecount)--;
664                 }
665                 cfs_mutex_unlock(&lli->lli_och_mutex);
666
667 out_openerr:
668                 if (opendir_set != 0)
669                         ll_stop_statahead(inode, lli->lli_opendir_key);
670                 if (fd != NULL)
671                         ll_file_data_put(fd);
672         } else {
673                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
674         }
675
676         return rc;
677 }
678
679 /* Fills the obdo with the attributes for the lsm */
680 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
681                           struct obd_capa *capa, struct obdo *obdo,
682                           __u64 ioepoch, int sync)
683 {
684         struct ptlrpc_request_set *set;
685         struct obd_info            oinfo = { { { 0 } } };
686         int                        rc;
687
688         ENTRY;
689
690         LASSERT(lsm != NULL);
691
692         oinfo.oi_md = lsm;
693         oinfo.oi_oa = obdo;
694         oinfo.oi_oa->o_id = lsm->lsm_object_id;
695         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
696         oinfo.oi_oa->o_mode = S_IFREG;
697         oinfo.oi_oa->o_ioepoch = ioepoch;
698         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
699                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
700                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
701                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
702                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
703                                OBD_MD_FLDATAVERSION;
704         oinfo.oi_capa = capa;
705         if (sync) {
706                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
707                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
708         }
709
710         set = ptlrpc_prep_set();
711         if (set == NULL) {
712                 CERROR("can't allocate ptlrpc set\n");
713                 rc = -ENOMEM;
714         } else {
715                 rc = obd_getattr_async(exp, &oinfo, set);
716                 if (rc == 0)
717                         rc = ptlrpc_set_wait(set);
718                 ptlrpc_set_destroy(set);
719         }
720         if (rc == 0)
721                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
722                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
723                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
724                                          OBD_MD_FLDATAVERSION);
725         RETURN(rc);
726 }
727
728 /**
729   * Performs the getattr on the inode and updates its fields.
730   * If @sync != 0, perform the getattr under the server-side lock.
731   */
732 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
733                      __u64 ioepoch, int sync)
734 {
735         struct obd_capa      *capa = ll_mdscapa_get(inode);
736         struct lov_stripe_md *lsm;
737         int rc;
738         ENTRY;
739
740         lsm = ccc_inode_lsm_get(inode);
741         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
742                             capa, obdo, ioepoch, sync);
743         capa_put(capa);
744         if (rc == 0) {
745                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
746                 CDEBUG(D_INODE,
747                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
748                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
749                        (unsigned long long)inode->i_blocks,
750                        (unsigned long)ll_inode_blksize(inode));
751         }
752         ccc_inode_lsm_put(inode, lsm);
753         RETURN(rc);
754 }
755
756 int ll_merge_lvb(struct inode *inode)
757 {
758         struct ll_inode_info *lli = ll_i2info(inode);
759         struct ll_sb_info *sbi = ll_i2sbi(inode);
760         struct lov_stripe_md *lsm;
761         struct ost_lvb lvb;
762         int rc = 0;
763
764         ENTRY;
765
766         lsm = ccc_inode_lsm_get(inode);
767         ll_inode_size_lock(inode);
768         inode_init_lvb(inode, &lvb);
769
770         /* merge timestamps the most resently obtained from mds with
771            timestamps obtained from osts */
772         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
773         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
774         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
775         if (lsm != NULL) {
776                 rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
777                 cl_isize_write_nolock(inode, lvb.lvb_size);
778
779                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
780                                 PFID(&lli->lli_fid), lvb.lvb_size);
781                 inode->i_blocks = lvb.lvb_blocks;
782
783                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
784                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
785                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
786         }
787         ll_inode_size_unlock(inode);
788         ccc_inode_lsm_put(inode, lsm);
789
790         RETURN(rc);
791 }
792
793 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
794                      lstat_t *st)
795 {
796         struct obdo obdo = { 0 };
797         int rc;
798
799         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
800         if (rc == 0) {
801                 st->st_size   = obdo.o_size;
802                 st->st_blocks = obdo.o_blocks;
803                 st->st_mtime  = obdo.o_mtime;
804                 st->st_atime  = obdo.o_atime;
805                 st->st_ctime  = obdo.o_ctime;
806         }
807         return rc;
808 }
809
810 void ll_io_init(struct cl_io *io, const struct file *file, int write)
811 {
812         struct inode *inode = file->f_dentry->d_inode;
813
814         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
815         if (write) {
816                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
817                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
818         }
819         io->ci_obj     = ll_i2info(inode)->lli_clob;
820         io->ci_lockreq = CILR_MAYBE;
821         if (ll_file_nolock(file)) {
822                 io->ci_lockreq = CILR_NEVER;
823                 io->ci_no_srvlock = 1;
824         } else if (file->f_flags & O_APPEND) {
825                 io->ci_lockreq = CILR_MANDATORY;
826         }
827 }
828
829 static ssize_t
830 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
831                    struct file *file, enum cl_io_type iot,
832                    loff_t *ppos, size_t count)
833 {
834         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
835         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
836         struct cl_io         *io;
837         ssize_t               result;
838         ENTRY;
839
840         io = ccc_env_thread_io(env);
841         ll_io_init(io, file, iot == CIT_WRITE);
842
843         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
844                 struct vvp_io *vio = vvp_env_io(env);
845                 struct ccc_io *cio = ccc_env_io(env);
846                 int write_mutex_locked = 0;
847
848                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
849                 vio->cui_io_subtype = args->via_io_subtype;
850
851                 switch (vio->cui_io_subtype) {
852                 case IO_NORMAL:
853                         cio->cui_iov = args->u.normal.via_iov;
854                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
855                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
856 #ifndef HAVE_FILE_WRITEV
857                         cio->cui_iocb = args->u.normal.via_iocb;
858 #endif
859                         if ((iot == CIT_WRITE) &&
860                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
861                                 if (cfs_mutex_lock_interruptible(&lli->
862                                                                lli_write_mutex))
863                                         GOTO(out, result = -ERESTARTSYS);
864                                 write_mutex_locked = 1;
865                         } else if (iot == CIT_READ) {
866                                 cfs_down_read(&lli->lli_trunc_sem);
867                         }
868                         break;
869                 case IO_SENDFILE:
870                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
871                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
872                         break;
873                 case IO_SPLICE:
874                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
875                         vio->u.splice.cui_flags = args->u.splice.via_flags;
876                         break;
877                 default:
878                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
879                         LBUG();
880                 }
881                 result = cl_io_loop(env, io);
882                 if (write_mutex_locked)
883                         cfs_mutex_unlock(&lli->lli_write_mutex);
884                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
885                         cfs_up_read(&lli->lli_trunc_sem);
886         } else {
887                 /* cl_io_rw_init() handled IO */
888                 result = io->ci_result;
889         }
890
891         if (io->ci_nob > 0) {
892                 result = io->ci_nob;
893                 *ppos = io->u.ci_wr.wr.crw_pos;
894         }
895         GOTO(out, result);
896 out:
897         cl_io_fini(env, io);
898
899         if (iot == CIT_READ) {
900                 if (result >= 0)
901                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
902                                            LPROC_LL_READ_BYTES, result);
903         } else if (iot == CIT_WRITE) {
904                 if (result >= 0) {
905                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
906                                            LPROC_LL_WRITE_BYTES, result);
907                         fd->fd_write_failed = false;
908                 } else {
909                         fd->fd_write_failed = true;
910                 }
911         }
912
913         return result;
914 }
915
916
917 /*
918  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
919  */
920 static int ll_file_get_iov_count(const struct iovec *iov,
921                                  unsigned long *nr_segs, size_t *count)
922 {
923         size_t cnt = 0;
924         unsigned long seg;
925
926         for (seg = 0; seg < *nr_segs; seg++) {
927                 const struct iovec *iv = &iov[seg];
928
929                 /*
930                  * If any segment has a negative length, or the cumulative
931                  * length ever wraps negative then return -EINVAL.
932                  */
933                 cnt += iv->iov_len;
934                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
935                         return -EINVAL;
936                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
937                         continue;
938                 if (seg == 0)
939                         return -EFAULT;
940                 *nr_segs = seg;
941                 cnt -= iv->iov_len;   /* This segment is no good */
942                 break;
943         }
944         *count = cnt;
945         return 0;
946 }
947
948 #ifdef HAVE_FILE_READV
949 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
950                               unsigned long nr_segs, loff_t *ppos)
951 {
952         struct lu_env      *env;
953         struct vvp_io_args *args;
954         size_t              count;
955         ssize_t             result;
956         int                 refcheck;
957         ENTRY;
958
959         result = ll_file_get_iov_count(iov, &nr_segs, &count);
960         if (result)
961                 RETURN(result);
962
963         env = cl_env_get(&refcheck);
964         if (IS_ERR(env))
965                 RETURN(PTR_ERR(env));
966
967         args = vvp_env_args(env, IO_NORMAL);
968         args->u.normal.via_iov = (struct iovec *)iov;
969         args->u.normal.via_nrsegs = nr_segs;
970
971         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
972         cl_env_put(env, &refcheck);
973         RETURN(result);
974 }
975
976 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
977                             loff_t *ppos)
978 {
979         struct lu_env *env;
980         struct iovec  *local_iov;
981         ssize_t        result;
982         int            refcheck;
983         ENTRY;
984
985         env = cl_env_get(&refcheck);
986         if (IS_ERR(env))
987                 RETURN(PTR_ERR(env));
988
989         local_iov = &vvp_env_info(env)->vti_local_iov;
990         local_iov->iov_base = (void __user *)buf;
991         local_iov->iov_len = count;
992         result = ll_file_readv(file, local_iov, 1, ppos);
993         cl_env_put(env, &refcheck);
994         RETURN(result);
995 }
996
997 #else
998 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
999                                 unsigned long nr_segs, loff_t pos)
1000 {
1001         struct lu_env      *env;
1002         struct vvp_io_args *args;
1003         size_t              count;
1004         ssize_t             result;
1005         int                 refcheck;
1006         ENTRY;
1007
1008         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1009         if (result)
1010                 RETURN(result);
1011
1012         env = cl_env_get(&refcheck);
1013         if (IS_ERR(env))
1014                 RETURN(PTR_ERR(env));
1015
1016         args = vvp_env_args(env, IO_NORMAL);
1017         args->u.normal.via_iov = (struct iovec *)iov;
1018         args->u.normal.via_nrsegs = nr_segs;
1019         args->u.normal.via_iocb = iocb;
1020
1021         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1022                                     &iocb->ki_pos, count);
1023         cl_env_put(env, &refcheck);
1024         RETURN(result);
1025 }
1026
1027 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1028                             loff_t *ppos)
1029 {
1030         struct lu_env *env;
1031         struct iovec  *local_iov;
1032         struct kiocb  *kiocb;
1033         ssize_t        result;
1034         int            refcheck;
1035         ENTRY;
1036
1037         env = cl_env_get(&refcheck);
1038         if (IS_ERR(env))
1039                 RETURN(PTR_ERR(env));
1040
1041         local_iov = &vvp_env_info(env)->vti_local_iov;
1042         kiocb = &vvp_env_info(env)->vti_kiocb;
1043         local_iov->iov_base = (void __user *)buf;
1044         local_iov->iov_len = count;
1045         init_sync_kiocb(kiocb, file);
1046         kiocb->ki_pos = *ppos;
1047         kiocb->ki_left = count;
1048
1049         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1050         *ppos = kiocb->ki_pos;
1051
1052         cl_env_put(env, &refcheck);
1053         RETURN(result);
1054 }
1055 #endif
1056
1057 /*
1058  * Write to a file (through the page cache).
1059  */
1060 #ifdef HAVE_FILE_WRITEV
1061 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1062                               unsigned long nr_segs, loff_t *ppos)
1063 {
1064         struct lu_env      *env;
1065         struct vvp_io_args *args;
1066         size_t              count;
1067         ssize_t             result;
1068         int                 refcheck;
1069         ENTRY;
1070
1071         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1072         if (result)
1073                 RETURN(result);
1074
1075         env = cl_env_get(&refcheck);
1076         if (IS_ERR(env))
1077                 RETURN(PTR_ERR(env));
1078
1079         args = vvp_env_args(env, IO_NORMAL);
1080         args->u.normal.via_iov = (struct iovec *)iov;
1081         args->u.normal.via_nrsegs = nr_segs;
1082
1083         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1084         cl_env_put(env, &refcheck);
1085         RETURN(result);
1086 }
1087
1088 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1089                              loff_t *ppos)
1090 {
1091         struct lu_env    *env;
1092         struct iovec     *local_iov;
1093         ssize_t           result;
1094         int               refcheck;
1095         ENTRY;
1096
1097         env = cl_env_get(&refcheck);
1098         if (IS_ERR(env))
1099                 RETURN(PTR_ERR(env));
1100
1101         local_iov = &vvp_env_info(env)->vti_local_iov;
1102         local_iov->iov_base = (void __user *)buf;
1103         local_iov->iov_len = count;
1104
1105         result = ll_file_writev(file, local_iov, 1, ppos);
1106         cl_env_put(env, &refcheck);
1107         RETURN(result);
1108 }
1109
1110 #else /* AIO stuff */
1111 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1112                                  unsigned long nr_segs, loff_t pos)
1113 {
1114         struct lu_env      *env;
1115         struct vvp_io_args *args;
1116         size_t              count;
1117         ssize_t             result;
1118         int                 refcheck;
1119         ENTRY;
1120
1121         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1122         if (result)
1123                 RETURN(result);
1124
1125         env = cl_env_get(&refcheck);
1126         if (IS_ERR(env))
1127                 RETURN(PTR_ERR(env));
1128
1129         args = vvp_env_args(env, IO_NORMAL);
1130         args->u.normal.via_iov = (struct iovec *)iov;
1131         args->u.normal.via_nrsegs = nr_segs;
1132         args->u.normal.via_iocb = iocb;
1133
1134         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1135                                   &iocb->ki_pos, count);
1136         cl_env_put(env, &refcheck);
1137         RETURN(result);
1138 }
1139
1140 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1141                              loff_t *ppos)
1142 {
1143         struct lu_env *env;
1144         struct iovec  *local_iov;
1145         struct kiocb  *kiocb;
1146         ssize_t        result;
1147         int            refcheck;
1148         ENTRY;
1149
1150         env = cl_env_get(&refcheck);
1151         if (IS_ERR(env))
1152                 RETURN(PTR_ERR(env));
1153
1154         local_iov = &vvp_env_info(env)->vti_local_iov;
1155         kiocb = &vvp_env_info(env)->vti_kiocb;
1156         local_iov->iov_base = (void __user *)buf;
1157         local_iov->iov_len = count;
1158         init_sync_kiocb(kiocb, file);
1159         kiocb->ki_pos = *ppos;
1160         kiocb->ki_left = count;
1161
1162         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1163         *ppos = kiocb->ki_pos;
1164
1165         cl_env_put(env, &refcheck);
1166         RETURN(result);
1167 }
1168 #endif
1169
1170
1171 #ifdef HAVE_KERNEL_SENDFILE
1172 /*
1173  * Send file content (through pagecache) somewhere with helper
1174  */
1175 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1176                                 read_actor_t actor, void *target)
1177 {
1178         struct lu_env      *env;
1179         struct vvp_io_args *args;
1180         ssize_t             result;
1181         int                 refcheck;
1182         ENTRY;
1183
1184         env = cl_env_get(&refcheck);
1185         if (IS_ERR(env))
1186                 RETURN(PTR_ERR(env));
1187
1188         args = vvp_env_args(env, IO_SENDFILE);
1189         args->u.sendfile.via_target = target;
1190         args->u.sendfile.via_actor = actor;
1191
1192         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1193         cl_env_put(env, &refcheck);
1194         RETURN(result);
1195 }
1196 #endif
1197
1198 #ifdef HAVE_KERNEL_SPLICE_READ
1199 /*
1200  * Send file content (through pagecache) somewhere with helper
1201  */
1202 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1203                                    struct pipe_inode_info *pipe, size_t count,
1204                                    unsigned int flags)
1205 {
1206         struct lu_env      *env;
1207         struct vvp_io_args *args;
1208         ssize_t             result;
1209         int                 refcheck;
1210         ENTRY;
1211
1212         env = cl_env_get(&refcheck);
1213         if (IS_ERR(env))
1214                 RETURN(PTR_ERR(env));
1215
1216         args = vvp_env_args(env, IO_SPLICE);
1217         args->u.splice.via_pipe = pipe;
1218         args->u.splice.via_flags = flags;
1219
1220         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1221         cl_env_put(env, &refcheck);
1222         RETURN(result);
1223 }
1224 #endif
1225
1226 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1227                            obd_count ost_idx)
1228 {
1229         struct obd_export *exp = ll_i2dtexp(inode);
1230         struct obd_trans_info oti = { 0 };
1231         struct obdo *oa = NULL;
1232         int lsm_size;
1233         int rc = 0;
1234         struct lov_stripe_md *lsm = NULL, *lsm2;
1235         ENTRY;
1236
1237         OBDO_ALLOC(oa);
1238         if (oa == NULL)
1239                 RETURN(-ENOMEM);
1240
1241         lsm = ccc_inode_lsm_get(inode);
1242         if (lsm == NULL)
1243                 GOTO(out, rc = -ENOENT);
1244
1245         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1246                    (lsm->lsm_stripe_count));
1247
1248         OBD_ALLOC_LARGE(lsm2, lsm_size);
1249         if (lsm2 == NULL)
1250                 GOTO(out, rc = -ENOMEM);
1251
1252         oa->o_id = id;
1253         oa->o_seq = seq;
1254         oa->o_nlink = ost_idx;
1255         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1256         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1257         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1258                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1259         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1260         memcpy(lsm2, lsm, lsm_size);
1261         ll_inode_size_lock(inode);
1262         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1263         ll_inode_size_unlock(inode);
1264
1265         OBD_FREE_LARGE(lsm2, lsm_size);
1266         GOTO(out, rc);
1267 out:
1268         ccc_inode_lsm_put(inode, lsm);
1269         OBDO_FREE(oa);
1270         return rc;
1271 }
1272
1273 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1274 {
1275         struct ll_recreate_obj ucreat;
1276         ENTRY;
1277
1278         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1279                 RETURN(-EPERM);
1280
1281         if (cfs_copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1282                                sizeof(struct ll_recreate_obj)))
1283                 RETURN(-EFAULT);
1284
1285         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1286                                ucreat.lrc_ost_idx));
1287 }
1288
1289 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1290 {
1291         struct lu_fid fid;
1292         obd_id id;
1293         obd_count ost_idx;
1294         ENTRY;
1295
1296         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1297                 RETURN(-EPERM);
1298
1299         if (cfs_copy_from_user(&fid, (struct lu_fid *)arg,
1300                                sizeof(struct lu_fid)))
1301                 RETURN(-EFAULT);
1302
1303         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1304         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1305         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1306 }
1307
1308 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1309                              int flags, struct lov_user_md *lum, int lum_size)
1310 {
1311         struct lov_stripe_md *lsm = NULL;
1312         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1313         int rc = 0;
1314         ENTRY;
1315
1316         lsm = ccc_inode_lsm_get(inode);
1317         if (lsm != NULL) {
1318                 ccc_inode_lsm_put(inode, lsm);
1319                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1320                        inode->i_ino);
1321                 RETURN(-EEXIST);
1322         }
1323
1324         ll_inode_size_lock(inode);
1325         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1326         if (rc)
1327                 GOTO(out, rc);
1328         rc = oit.d.lustre.it_status;
1329         if (rc < 0)
1330                 GOTO(out_req_free, rc);
1331
1332         ll_release_openhandle(file->f_dentry, &oit);
1333
1334  out:
1335         ll_inode_size_unlock(inode);
1336         ll_intent_release(&oit);
1337         ccc_inode_lsm_put(inode, lsm);
1338         RETURN(rc);
1339 out_req_free:
1340         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1341         goto out;
1342 }
1343
1344 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1345                              struct lov_mds_md **lmmp, int *lmm_size,
1346                              struct ptlrpc_request **request)
1347 {
1348         struct ll_sb_info *sbi = ll_i2sbi(inode);
1349         struct mdt_body  *body;
1350         struct lov_mds_md *lmm = NULL;
1351         struct ptlrpc_request *req = NULL;
1352         struct md_op_data *op_data;
1353         int rc, lmmsize;
1354
1355         rc = ll_get_max_mdsize(sbi, &lmmsize);
1356         if (rc)
1357                 RETURN(rc);
1358
1359         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1360                                      strlen(filename), lmmsize,
1361                                      LUSTRE_OPC_ANY, NULL);
1362         if (IS_ERR(op_data))
1363                 RETURN(PTR_ERR(op_data));
1364
1365         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1366         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1367         ll_finish_md_op_data(op_data);
1368         if (rc < 0) {
1369                 CDEBUG(D_INFO, "md_getattr_name failed "
1370                        "on %s: rc %d\n", filename, rc);
1371                 GOTO(out, rc);
1372         }
1373
1374         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1375         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1376
1377         lmmsize = body->eadatasize;
1378
1379         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1380                         lmmsize == 0) {
1381                 GOTO(out, rc = -ENODATA);
1382         }
1383
1384         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1385         LASSERT(lmm != NULL);
1386
1387         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1388             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1389                 GOTO(out, rc = -EPROTO);
1390         }
1391
1392         /*
1393          * This is coming from the MDS, so is probably in
1394          * little endian.  We convert it to host endian before
1395          * passing it to userspace.
1396          */
1397         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1398                 /* if function called for directory - we should
1399                  * avoid swab not existent lsm objects */
1400                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1401                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1402                         if (S_ISREG(body->mode))
1403                                 lustre_swab_lov_user_md_objects(
1404                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1405                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1406                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1407                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1408                         if (S_ISREG(body->mode))
1409                                 lustre_swab_lov_user_md_objects(
1410                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1411                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1412                 }
1413         }
1414
1415 out:
1416         *lmmp = lmm;
1417         *lmm_size = lmmsize;
1418         *request = req;
1419         return rc;
1420 }
1421
1422 static int ll_lov_setea(struct inode *inode, struct file *file,
1423                             unsigned long arg)
1424 {
1425         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1426         struct lov_user_md  *lump;
1427         int lum_size = sizeof(struct lov_user_md) +
1428                        sizeof(struct lov_user_ost_data);
1429         int rc;
1430         ENTRY;
1431
1432         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1433                 RETURN(-EPERM);
1434
1435         OBD_ALLOC_LARGE(lump, lum_size);
1436         if (lump == NULL) {
1437                 RETURN(-ENOMEM);
1438         }
1439         if (cfs_copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1440                 OBD_FREE_LARGE(lump, lum_size);
1441                 RETURN(-EFAULT);
1442         }
1443
1444         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1445
1446         OBD_FREE_LARGE(lump, lum_size);
1447         RETURN(rc);
1448 }
1449
1450 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1451                             unsigned long arg)
1452 {
1453         struct lov_user_md_v3 lumv3;
1454         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1455         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
1456         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
1457         int lum_size;
1458         int rc;
1459         int flags = FMODE_WRITE;
1460         ENTRY;
1461
1462         /* first try with v1 which is smaller than v3 */
1463         lum_size = sizeof(struct lov_user_md_v1);
1464         if (cfs_copy_from_user(lumv1, lumv1p, lum_size))
1465                 RETURN(-EFAULT);
1466
1467         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1468                 lum_size = sizeof(struct lov_user_md_v3);
1469                 if (cfs_copy_from_user(&lumv3, lumv3p, lum_size))
1470                         RETURN(-EFAULT);
1471         }
1472
1473         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1474         if (rc == 0) {
1475                 struct lov_stripe_md *lsm;
1476                 put_user(0, &lumv1p->lmm_stripe_count);
1477                 lsm = ccc_inode_lsm_get(inode);
1478                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1479                                    0, lsm, (void *)arg);
1480                 ccc_inode_lsm_put(inode, lsm);
1481         }
1482         RETURN(rc);
1483 }
1484
1485 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1486 {
1487         struct lov_stripe_md *lsm;
1488         int rc = -ENODATA;
1489         ENTRY;
1490
1491         lsm = ccc_inode_lsm_get(inode);
1492         if (lsm != NULL)
1493                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1494                                    lsm, (void *)arg);
1495         ccc_inode_lsm_put(inode, lsm);
1496         RETURN(rc);
1497 }
1498
1499 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1500 {
1501         struct ll_inode_info   *lli = ll_i2info(inode);
1502         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1503         struct ccc_grouplock    grouplock;
1504         int                     rc;
1505         ENTRY;
1506
1507         if (ll_file_nolock(file))
1508                 RETURN(-EOPNOTSUPP);
1509
1510         cfs_spin_lock(&lli->lli_lock);
1511         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1512                 CWARN("group lock already existed with gid %lu\n",
1513                        fd->fd_grouplock.cg_gid);
1514                 cfs_spin_unlock(&lli->lli_lock);
1515                 RETURN(-EINVAL);
1516         }
1517         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1518         cfs_spin_unlock(&lli->lli_lock);
1519
1520         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1521                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1522         if (rc)
1523                 RETURN(rc);
1524
1525         cfs_spin_lock(&lli->lli_lock);
1526         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1527                 cfs_spin_unlock(&lli->lli_lock);
1528                 CERROR("another thread just won the race\n");
1529                 cl_put_grouplock(&grouplock);
1530                 RETURN(-EINVAL);
1531         }
1532
1533         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1534         fd->fd_grouplock = grouplock;
1535         cfs_spin_unlock(&lli->lli_lock);
1536
1537         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1538         RETURN(0);
1539 }
1540
1541 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1542 {
1543         struct ll_inode_info   *lli = ll_i2info(inode);
1544         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1545         struct ccc_grouplock    grouplock;
1546         ENTRY;
1547
1548         cfs_spin_lock(&lli->lli_lock);
1549         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1550                 cfs_spin_unlock(&lli->lli_lock);
1551                 CWARN("no group lock held\n");
1552                 RETURN(-EINVAL);
1553         }
1554         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1555
1556         if (fd->fd_grouplock.cg_gid != arg) {
1557                 CWARN("group lock %lu doesn't match current id %lu\n",
1558                        arg, fd->fd_grouplock.cg_gid);
1559                 cfs_spin_unlock(&lli->lli_lock);
1560                 RETURN(-EINVAL);
1561         }
1562
1563         grouplock = fd->fd_grouplock;
1564         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1565         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1566         cfs_spin_unlock(&lli->lli_lock);
1567
1568         cl_put_grouplock(&grouplock);
1569         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1570         RETURN(0);
1571 }
1572
1573 /**
1574  * Close inode open handle
1575  *
1576  * \param dentry [in]     dentry which contains the inode
1577  * \param it     [in,out] intent which contains open info and result
1578  *
1579  * \retval 0     success
1580  * \retval <0    failure
1581  */
1582 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1583 {
1584         struct inode *inode = dentry->d_inode;
1585         struct obd_client_handle *och;
1586         int rc;
1587         ENTRY;
1588
1589         LASSERT(inode);
1590
1591         /* Root ? Do nothing. */
1592         if (dentry->d_inode->i_sb->s_root == dentry)
1593                 RETURN(0);
1594
1595         /* No open handle to close? Move away */
1596         if (!it_disposition(it, DISP_OPEN_OPEN))
1597                 RETURN(0);
1598
1599         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1600
1601         OBD_ALLOC(och, sizeof(*och));
1602         if (!och)
1603                 GOTO(out, rc = -ENOMEM);
1604
1605         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1606                     ll_i2info(inode), it, och);
1607
1608         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1609                                        inode, och);
1610  out:
1611         /* this one is in place of ll_file_open */
1612         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1613                 ptlrpc_req_finished(it->d.lustre.it_data);
1614                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1615         }
1616         RETURN(rc);
1617 }
1618
1619 /**
1620  * Get size for inode for which FIEMAP mapping is requested.
1621  * Make the FIEMAP get_info call and returns the result.
1622  */
1623 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1624               int num_bytes)
1625 {
1626         struct obd_export *exp = ll_i2dtexp(inode);
1627         struct lov_stripe_md *lsm = NULL;
1628         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1629         int vallen = num_bytes;
1630         int rc;
1631         ENTRY;
1632
1633         /* Checks for fiemap flags */
1634         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1635                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1636                 return -EBADR;
1637         }
1638
1639         /* Check for FIEMAP_FLAG_SYNC */
1640         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1641                 rc = filemap_fdatawrite(inode->i_mapping);
1642                 if (rc)
1643                         return rc;
1644         }
1645
1646         lsm = ccc_inode_lsm_get(inode);
1647         if (lsm == NULL)
1648                 return -ENOENT;
1649
1650         /* If the stripe_count > 1 and the application does not understand
1651          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1652          */
1653         if (lsm->lsm_stripe_count > 1 &&
1654             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1655                 GOTO(out, rc = -EOPNOTSUPP);
1656
1657         fm_key.oa.o_id = lsm->lsm_object_id;
1658         fm_key.oa.o_seq = lsm->lsm_object_seq;
1659         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1660
1661         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1662         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1663         /* If filesize is 0, then there would be no objects for mapping */
1664         if (fm_key.oa.o_size == 0) {
1665                 fiemap->fm_mapped_extents = 0;
1666                 GOTO(out, rc = 0);
1667         }
1668
1669         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1670
1671         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1672                           fiemap, lsm);
1673         if (rc)
1674                 CERROR("obd_get_info failed: rc = %d\n", rc);
1675
1676 out:
1677         ccc_inode_lsm_put(inode, lsm);
1678         RETURN(rc);
1679 }
1680
1681 int ll_fid2path(struct inode *inode, void *arg)
1682 {
1683         struct obd_export *exp = ll_i2mdexp(inode);
1684         struct getinfo_fid2path *gfout, *gfin;
1685         int outsize, rc;
1686         ENTRY;
1687
1688         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1689             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1690                 RETURN(-EPERM);
1691
1692         /* Need to get the buflen */
1693         OBD_ALLOC_PTR(gfin);
1694         if (gfin == NULL)
1695                 RETURN(-ENOMEM);
1696         if (cfs_copy_from_user(gfin, arg, sizeof(*gfin))) {
1697                 OBD_FREE_PTR(gfin);
1698                 RETURN(-EFAULT);
1699         }
1700
1701         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1702         OBD_ALLOC(gfout, outsize);
1703         if (gfout == NULL) {
1704                 OBD_FREE_PTR(gfin);
1705                 RETURN(-ENOMEM);
1706         }
1707         memcpy(gfout, gfin, sizeof(*gfout));
1708         OBD_FREE_PTR(gfin);
1709
1710         /* Call mdc_iocontrol */
1711         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1712         if (rc)
1713                 GOTO(gf_free, rc);
1714         if (cfs_copy_to_user(arg, gfout, outsize))
1715                 rc = -EFAULT;
1716
1717 gf_free:
1718         OBD_FREE(gfout, outsize);
1719         RETURN(rc);
1720 }
1721
1722 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1723 {
1724         struct ll_user_fiemap *fiemap_s;
1725         size_t num_bytes, ret_bytes;
1726         unsigned int extent_count;
1727         int rc = 0;
1728
1729         /* Get the extent count so we can calculate the size of
1730          * required fiemap buffer */
1731         if (get_user(extent_count,
1732             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1733                 RETURN(-EFAULT);
1734         num_bytes = sizeof(*fiemap_s) + (extent_count *
1735                                          sizeof(struct ll_fiemap_extent));
1736
1737         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1738         if (fiemap_s == NULL)
1739                 RETURN(-ENOMEM);
1740
1741         /* get the fiemap value */
1742         if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
1743                            sizeof(*fiemap_s)))
1744                 GOTO(error, rc = -EFAULT);
1745
1746         /* If fm_extent_count is non-zero, read the first extent since
1747          * it is used to calculate end_offset and device from previous
1748          * fiemap call. */
1749         if (extent_count) {
1750                 if (copy_from_user(&fiemap_s->fm_extents[0],
1751                     (char __user *)arg + sizeof(*fiemap_s),
1752                     sizeof(struct ll_fiemap_extent)))
1753                         GOTO(error, rc = -EFAULT);
1754         }
1755
1756         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1757         if (rc)
1758                 GOTO(error, rc);
1759
1760         ret_bytes = sizeof(struct ll_user_fiemap);
1761
1762         if (extent_count != 0)
1763                 ret_bytes += (fiemap_s->fm_mapped_extents *
1764                                  sizeof(struct ll_fiemap_extent));
1765
1766         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1767                 rc = -EFAULT;
1768
1769 error:
1770         OBD_FREE_LARGE(fiemap_s, num_bytes);
1771         RETURN(rc);
1772 }
1773
1774 /*
1775  * Read the data_version for inode.
1776  *
1777  * This value is computed using stripe object version on OST.
1778  * Version is computed using server side locking.
1779  *
1780  * @param extent_lock  Take extent lock. Not needed if a process is already
1781  *                     holding the OST object group locks.
1782  */
1783 static int ll_data_version(struct inode *inode, __u64 *data_version,
1784                            int extent_lock)
1785 {
1786         struct lov_stripe_md *lsm = NULL;
1787         struct ll_sb_info    *sbi = ll_i2sbi(inode);
1788         struct obdo          *obdo = NULL;
1789         int                   rc;
1790         ENTRY;
1791
1792         /* If no stripe, we consider version is 0. */
1793         lsm = ccc_inode_lsm_get(inode);
1794         if (lsm == NULL) {
1795                 *data_version = 0;
1796                 CDEBUG(D_INODE, "No object for inode\n");
1797                 RETURN(0);
1798         }
1799
1800         OBD_ALLOC_PTR(obdo);
1801         if (obdo == NULL) {
1802                 ccc_inode_lsm_put(inode, lsm);
1803                 RETURN(-ENOMEM);
1804         }
1805
1806         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1807         if (!rc) {
1808                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1809                         rc = -EOPNOTSUPP;
1810                 else
1811                         *data_version = obdo->o_data_version;
1812         }
1813
1814         OBD_FREE_PTR(obdo);
1815         ccc_inode_lsm_put(inode, lsm);
1816
1817         RETURN(rc);
1818 }
1819
1820 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1821 {
1822         struct inode *inode = file->f_dentry->d_inode;
1823         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1824         int flags;
1825
1826         ENTRY;
1827
1828         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1829                inode->i_generation, inode, cmd);
1830         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1831
1832         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1833         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1834                 RETURN(-ENOTTY);
1835
1836         switch(cmd) {
1837         case LL_IOC_GETFLAGS:
1838                 /* Get the current value of the file flags */
1839                 return put_user(fd->fd_flags, (int *)arg);
1840         case LL_IOC_SETFLAGS:
1841         case LL_IOC_CLRFLAGS:
1842                 /* Set or clear specific file flags */
1843                 /* XXX This probably needs checks to ensure the flags are
1844                  *     not abused, and to handle any flag side effects.
1845                  */
1846                 if (get_user(flags, (int *) arg))
1847                         RETURN(-EFAULT);
1848
1849                 if (cmd == LL_IOC_SETFLAGS) {
1850                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1851                             !(file->f_flags & O_DIRECT)) {
1852                                 CERROR("%s: unable to disable locking on "
1853                                        "non-O_DIRECT file\n", current->comm);
1854                                 RETURN(-EINVAL);
1855                         }
1856
1857                         fd->fd_flags |= flags;
1858                 } else {
1859                         fd->fd_flags &= ~flags;
1860                 }
1861                 RETURN(0);
1862         case LL_IOC_LOV_SETSTRIPE:
1863                 RETURN(ll_lov_setstripe(inode, file, arg));
1864         case LL_IOC_LOV_SETEA:
1865                 RETURN(ll_lov_setea(inode, file, arg));
1866         case LL_IOC_LOV_GETSTRIPE:
1867                 RETURN(ll_lov_getstripe(inode, arg));
1868         case LL_IOC_RECREATE_OBJ:
1869                 RETURN(ll_lov_recreate_obj(inode, arg));
1870         case LL_IOC_RECREATE_FID:
1871                 RETURN(ll_lov_recreate_fid(inode, arg));
1872         case FSFILT_IOC_FIEMAP:
1873                 RETURN(ll_ioctl_fiemap(inode, arg));
1874         case FSFILT_IOC_GETFLAGS:
1875         case FSFILT_IOC_SETFLAGS:
1876                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1877         case FSFILT_IOC_GETVERSION_OLD:
1878         case FSFILT_IOC_GETVERSION:
1879                 RETURN(put_user(inode->i_generation, (int *)arg));
1880         case LL_IOC_GROUP_LOCK:
1881                 RETURN(ll_get_grouplock(inode, file, arg));
1882         case LL_IOC_GROUP_UNLOCK:
1883                 RETURN(ll_put_grouplock(inode, file, arg));
1884         case IOC_OBD_STATFS:
1885                 RETURN(ll_obd_statfs(inode, (void *)arg));
1886
1887         /* We need to special case any other ioctls we want to handle,
1888          * to send them to the MDS/OST as appropriate and to properly
1889          * network encode the arg field.
1890         case FSFILT_IOC_SETVERSION_OLD:
1891         case FSFILT_IOC_SETVERSION:
1892         */
1893         case LL_IOC_FLUSHCTX:
1894                 RETURN(ll_flush_ctx(inode));
1895         case LL_IOC_PATH2FID: {
1896                 if (cfs_copy_to_user((void *)arg, ll_inode2fid(inode),
1897                                      sizeof(struct lu_fid)))
1898                         RETURN(-EFAULT);
1899
1900                 RETURN(0);
1901         }
1902         case OBD_IOC_FID2PATH:
1903                 RETURN(ll_fid2path(inode, (void *)arg));
1904         case LL_IOC_DATA_VERSION: {
1905                 struct ioc_data_version idv;
1906                 int rc;
1907
1908                 if (cfs_copy_from_user(&idv, (char *)arg, sizeof(idv)))
1909                         RETURN(-EFAULT);
1910
1911                 rc = ll_data_version(inode, &idv.idv_version,
1912                                      !(idv.idv_flags & LL_DV_NOFLUSH));
1913
1914                 if (rc == 0 &&
1915                     cfs_copy_to_user((char *) arg, &idv, sizeof(idv)))
1916                         RETURN(-EFAULT);
1917
1918                 RETURN(rc);
1919         }
1920
1921         case LL_IOC_GET_MDTIDX: {
1922                 int mdtidx;
1923
1924                 mdtidx = ll_get_mdt_idx(inode);
1925                 if (mdtidx < 0)
1926                         RETURN(mdtidx);
1927
1928                 if (put_user((int)mdtidx, (int*)arg))
1929                         RETURN(-EFAULT);
1930
1931                 RETURN(0);
1932         }
1933         case OBD_IOC_GETDTNAME:
1934         case OBD_IOC_GETMDNAME:
1935                 RETURN(ll_get_obd_name(inode, cmd, arg));
1936         default: {
1937                 int err;
1938
1939                 if (LLIOC_STOP ==
1940                     ll_iocontrol_call(inode, file, cmd, arg, &err))
1941                         RETURN(err);
1942
1943                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1944                                      (void *)arg));
1945         }
1946         }
1947 }
1948
1949 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1950 {
1951         struct inode *inode = file->f_dentry->d_inode;
1952         loff_t retval;
1953         ENTRY;
1954         retval = offset + ((origin == 2) ? i_size_read(inode) :
1955                            (origin == 1) ? file->f_pos : 0);
1956         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%s)\n",
1957                inode->i_ino, inode->i_generation, inode, retval, retval,
1958                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
1959         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
1960
1961         if (origin == 2) { /* SEEK_END */
1962                 int rc;
1963
1964                 rc = ll_glimpse_size(inode);
1965                 if (rc != 0)
1966                         RETURN(rc);
1967
1968                 offset += i_size_read(inode);
1969         } else if (origin == 1) { /* SEEK_CUR */
1970                 offset += file->f_pos;
1971         }
1972
1973         retval = -EINVAL;
1974         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1975                 if (offset != file->f_pos) {
1976                         file->f_pos = offset;
1977                 }
1978                 retval = offset;
1979         }
1980
1981         RETURN(retval);
1982 }
1983
1984 int ll_flush(struct file *file, fl_owner_t id)
1985 {
1986         struct inode *inode = file->f_dentry->d_inode;
1987         struct ll_inode_info *lli = ll_i2info(inode);
1988         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1989         int rc, err;
1990
1991         LASSERT(!S_ISDIR(inode->i_mode));
1992
1993         /* catch async errors that were recorded back when async writeback
1994          * failed for pages in this mapping. */
1995         rc = lli->lli_async_rc;
1996         lli->lli_async_rc = 0;
1997         err = lov_read_and_clear_async_rc(lli->lli_clob);
1998         if (rc == 0)
1999                 rc = err;
2000
2001         /* The application has been told write failure already.
2002          * Do not report failure again. */
2003         if (fd->fd_write_failed)
2004                 return 0;
2005         return rc ? -EIO : 0;
2006 }
2007
2008 /**
2009  * Called to make sure a portion of file has been written out.
2010  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2011  *
2012  * Return how many pages have been written.
2013  */
2014 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2015                        enum cl_fsync_mode mode)
2016 {
2017         struct cl_env_nest nest;
2018         struct lu_env *env;
2019         struct cl_io *io;
2020         struct obd_capa *capa = NULL;
2021         struct cl_fsync_io *fio;
2022         int result;
2023         ENTRY;
2024
2025         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2026             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2027                 RETURN(-EINVAL);
2028
2029         env = cl_env_nested_get(&nest);
2030         if (IS_ERR(env))
2031                 RETURN(PTR_ERR(env));
2032
2033         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2034
2035         io = ccc_env_thread_io(env);
2036         io->ci_obj = cl_i2info(inode)->lli_clob;
2037         io->ci_ignore_layout = 1;
2038
2039         /* initialize parameters for sync */
2040         fio = &io->u.ci_fsync;
2041         fio->fi_capa = capa;
2042         fio->fi_start = start;
2043         fio->fi_end = end;
2044         fio->fi_fid = ll_inode2fid(inode);
2045         fio->fi_mode = mode;
2046         fio->fi_nr_written = 0;
2047
2048         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2049                 result = cl_io_loop(env, io);
2050         else
2051                 result = io->ci_result;
2052         if (result == 0)
2053                 result = fio->fi_nr_written;
2054         cl_io_fini(env, io);
2055         cl_env_nested_put(&nest, env);
2056
2057         capa_put(capa);
2058
2059         RETURN(result);
2060 }
2061
2062 #ifdef HAVE_FILE_FSYNC_4ARGS
2063 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2064 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2065 int ll_fsync(struct file *file, int data)
2066 #else
2067 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2068 #endif
2069 {
2070         struct inode *inode = file->f_dentry->d_inode;
2071         struct ll_inode_info *lli = ll_i2info(inode);
2072         struct ptlrpc_request *req;
2073         struct obd_capa *oc;
2074         struct lov_stripe_md *lsm;
2075         int rc, err;
2076         ENTRY;
2077         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2078                inode->i_generation, inode);
2079         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2080
2081         /* fsync's caller has already called _fdata{sync,write}, we want
2082          * that IO to finish before calling the osc and mdc sync methods */
2083         rc = filemap_fdatawait(inode->i_mapping);
2084
2085         /* catch async errors that were recorded back when async writeback
2086          * failed for pages in this mapping. */
2087         if (!S_ISDIR(inode->i_mode)) {
2088                 err = lli->lli_async_rc;
2089                 lli->lli_async_rc = 0;
2090                 if (rc == 0)
2091                         rc = err;
2092                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2093                 if (rc == 0)
2094                         rc = err;
2095         }
2096
2097         oc = ll_mdscapa_get(inode);
2098         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2099                       &req);
2100         capa_put(oc);
2101         if (!rc)
2102                 rc = err;
2103         if (!err)
2104                 ptlrpc_req_finished(req);
2105
2106         lsm = ccc_inode_lsm_get(inode);
2107         if (data && lsm) {
2108                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2109
2110                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2111                                 CL_FSYNC_ALL);
2112                 if (rc == 0 && err < 0)
2113                         rc = err;
2114                 if (rc < 0)
2115                         fd->fd_write_failed = true;
2116                 else
2117                         fd->fd_write_failed = false;
2118         }
2119         ccc_inode_lsm_put(inode, lsm);
2120
2121         RETURN(rc);
2122 }
2123
2124 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2125 {
2126         struct inode *inode = file->f_dentry->d_inode;
2127         struct ll_sb_info *sbi = ll_i2sbi(inode);
2128         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2129                                            .ei_cb_cp =ldlm_flock_completion_ast,
2130                                            .ei_cbdata = file_lock };
2131         struct md_op_data *op_data;
2132         struct lustre_handle lockh = {0};
2133         ldlm_policy_data_t flock = {{0}};
2134         int flags = 0;
2135         int rc;
2136         ENTRY;
2137
2138         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2139                inode->i_ino, file_lock);
2140
2141         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2142
2143         if (file_lock->fl_flags & FL_FLOCK) {
2144                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2145                 /* flocks are whole-file locks */
2146                 flock.l_flock.end = OFFSET_MAX;
2147                 /* For flocks owner is determined by the local file desctiptor*/
2148                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2149         } else if (file_lock->fl_flags & FL_POSIX) {
2150                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2151                 flock.l_flock.start = file_lock->fl_start;
2152                 flock.l_flock.end = file_lock->fl_end;
2153         } else {
2154                 RETURN(-EINVAL);
2155         }
2156         flock.l_flock.pid = file_lock->fl_pid;
2157
2158         /* Somewhat ugly workaround for svc lockd.
2159          * lockd installs custom fl_lmops->fl_compare_owner that checks
2160          * for the fl_owner to be the same (which it always is on local node
2161          * I guess between lockd processes) and then compares pid.
2162          * As such we assign pid to the owner field to make it all work,
2163          * conflict with normal locks is unlikely since pid space and
2164          * pointer space for current->files are not intersecting */
2165         if (file_lock->fl_lmops && file_lock->fl_lmops->fl_compare_owner)
2166                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2167
2168         switch (file_lock->fl_type) {
2169         case F_RDLCK:
2170                 einfo.ei_mode = LCK_PR;
2171                 break;
2172         case F_UNLCK:
2173                 /* An unlock request may or may not have any relation to
2174                  * existing locks so we may not be able to pass a lock handle
2175                  * via a normal ldlm_lock_cancel() request. The request may even
2176                  * unlock a byte range in the middle of an existing lock. In
2177                  * order to process an unlock request we need all of the same
2178                  * information that is given with a normal read or write record
2179                  * lock request. To avoid creating another ldlm unlock (cancel)
2180                  * message we'll treat a LCK_NL flock request as an unlock. */
2181                 einfo.ei_mode = LCK_NL;
2182                 break;
2183         case F_WRLCK:
2184                 einfo.ei_mode = LCK_PW;
2185                 break;
2186         default:
2187                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2188                         file_lock->fl_type);
2189                 RETURN (-ENOTSUPP);
2190         }
2191
2192         switch (cmd) {
2193         case F_SETLKW:
2194 #ifdef F_SETLKW64
2195         case F_SETLKW64:
2196 #endif
2197                 flags = 0;
2198                 break;
2199         case F_SETLK:
2200 #ifdef F_SETLK64
2201         case F_SETLK64:
2202 #endif
2203                 flags = LDLM_FL_BLOCK_NOWAIT;
2204                 break;
2205         case F_GETLK:
2206 #ifdef F_GETLK64
2207         case F_GETLK64:
2208 #endif
2209                 flags = LDLM_FL_TEST_LOCK;
2210                 /* Save the old mode so that if the mode in the lock changes we
2211                  * can decrement the appropriate reader or writer refcount. */
2212                 file_lock->fl_type = einfo.ei_mode;
2213                 break;
2214         default:
2215                 CERROR("unknown fcntl lock command: %d\n", cmd);
2216                 RETURN (-EINVAL);
2217         }
2218
2219         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2220                                      LUSTRE_OPC_ANY, NULL);
2221         if (IS_ERR(op_data))
2222                 RETURN(PTR_ERR(op_data));
2223
2224         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2225                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2226                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2227
2228         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2229                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2230
2231         ll_finish_md_op_data(op_data);
2232
2233         if ((file_lock->fl_flags & FL_FLOCK) &&
2234             (rc == 0 || file_lock->fl_type == F_UNLCK))
2235                 flock_lock_file_wait(file, file_lock);
2236         if ((file_lock->fl_flags & FL_POSIX) &&
2237             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2238             !(flags & LDLM_FL_TEST_LOCK))
2239                 posix_lock_file_wait(file, file_lock);
2240
2241         RETURN(rc);
2242 }
2243
2244 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2245 {
2246         ENTRY;
2247
2248         RETURN(-ENOSYS);
2249 }
2250
2251 /**
2252  * test if some locks matching bits and l_req_mode are acquired
2253  * - bits can be in different locks
2254  * - if found clear the common lock bits in *bits
2255  * - the bits not found, are kept in *bits
2256  * \param inode [IN]
2257  * \param bits [IN] searched lock bits [IN]
2258  * \param l_req_mode [IN] searched lock mode
2259  * \retval boolean, true iff all bits are found
2260  */
2261 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2262 {
2263         struct lustre_handle lockh;
2264         ldlm_policy_data_t policy;
2265         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2266                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2267         struct lu_fid *fid;
2268         int flags;
2269         int i;
2270         ENTRY;
2271
2272         if (!inode)
2273                RETURN(0);
2274
2275         fid = &ll_i2info(inode)->lli_fid;
2276         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2277                ldlm_lockname[mode]);
2278
2279         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2280         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2281                 policy.l_inodebits.bits = *bits & (1 << i);
2282                 if (policy.l_inodebits.bits == 0)
2283                         continue;
2284
2285                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2286                                   &policy, mode, &lockh)) {
2287                         struct ldlm_lock *lock;
2288
2289                         lock = ldlm_handle2lock(&lockh);
2290                         if (lock) {
2291                                 *bits &=
2292                                       ~(lock->l_policy_data.l_inodebits.bits);
2293                                 LDLM_LOCK_PUT(lock);
2294                         } else {
2295                                 *bits &= ~policy.l_inodebits.bits;
2296                         }
2297                 }
2298         }
2299         RETURN(*bits == 0);
2300 }
2301
2302 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2303                             struct lustre_handle *lockh)
2304 {
2305         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2306         struct lu_fid *fid;
2307         ldlm_mode_t rc;
2308         int flags;
2309         ENTRY;
2310
2311         fid = &ll_i2info(inode)->lli_fid;
2312         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2313
2314         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2315         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2316                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2317         RETURN(rc);
2318 }
2319
2320 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2321         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2322                               * and return success */
2323                 inode->i_nlink = 0;
2324                 /* This path cannot be hit for regular files unless in
2325                  * case of obscure races, so no need to to validate
2326                  * size. */
2327                 if (!S_ISREG(inode->i_mode) &&
2328                     !S_ISDIR(inode->i_mode))
2329                         return 0;
2330         }
2331
2332         if (rc) {
2333                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2334                 return -abs(rc);
2335
2336         }
2337
2338         return 0;
2339 }
2340
2341 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2342                              __u64 ibits)
2343 {
2344         struct inode *inode = dentry->d_inode;
2345         struct ptlrpc_request *req = NULL;
2346         struct obd_export *exp;
2347         int rc = 0;
2348         ENTRY;
2349
2350         if (!inode) {
2351                 CERROR("REPORT THIS LINE TO PETER\n");
2352                 RETURN(0);
2353         }
2354
2355         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2356                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2357
2358         exp = ll_i2mdexp(inode);
2359
2360         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2361          *      But under CMD case, it caused some lock issues, should be fixed
2362          *      with new CMD ibits lock. See bug 12718 */
2363         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2364                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2365                 struct md_op_data *op_data;
2366
2367                 if (ibits == MDS_INODELOCK_LOOKUP)
2368                         oit.it_op = IT_LOOKUP;
2369
2370                 /* Call getattr by fid, so do not provide name at all. */
2371                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2372                                              dentry->d_inode, NULL, 0, 0,
2373                                              LUSTRE_OPC_ANY, NULL);
2374                 if (IS_ERR(op_data))
2375                         RETURN(PTR_ERR(op_data));
2376
2377                 oit.it_create_mode |= M_CHECK_STALE;
2378                 rc = md_intent_lock(exp, op_data, NULL, 0,
2379                                     /* we are not interested in name
2380                                        based lookup */
2381                                     &oit, 0, &req,
2382                                     ll_md_blocking_ast, 0);
2383                 ll_finish_md_op_data(op_data);
2384                 oit.it_create_mode &= ~M_CHECK_STALE;
2385                 if (rc < 0) {
2386                         rc = ll_inode_revalidate_fini(inode, rc);
2387                         GOTO (out, rc);
2388                 }
2389
2390                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2391                 if (rc != 0) {
2392                         ll_intent_release(&oit);
2393                         GOTO(out, rc);
2394                 }
2395
2396                 /* Unlinked? Unhash dentry, so it is not picked up later by
2397                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2398                    here to preserve get_cwd functionality on 2.6.
2399                    Bug 10503 */
2400                 if (!dentry->d_inode->i_nlink)
2401                         d_lustre_invalidate(dentry);
2402
2403                 ll_lookup_finish_locks(&oit, dentry);
2404         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2405                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2406                 obd_valid valid = OBD_MD_FLGETATTR;
2407                 struct md_op_data *op_data;
2408                 int ealen = 0;
2409
2410                 if (S_ISREG(inode->i_mode)) {
2411                         rc = ll_get_max_mdsize(sbi, &ealen);
2412                         if (rc)
2413                                 RETURN(rc);
2414                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2415                 }
2416
2417                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2418                                              0, ealen, LUSTRE_OPC_ANY,
2419                                              NULL);
2420                 if (IS_ERR(op_data))
2421                         RETURN(PTR_ERR(op_data));
2422
2423                 op_data->op_valid = valid;
2424                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2425                  * capa for this inode. Because we only keep capas of dirs
2426                  * fresh. */
2427                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2428                 ll_finish_md_op_data(op_data);
2429                 if (rc) {
2430                         rc = ll_inode_revalidate_fini(inode, rc);
2431                         RETURN(rc);
2432                 }
2433
2434                 rc = ll_prep_inode(&inode, req, NULL);
2435         }
2436 out:
2437         ptlrpc_req_finished(req);
2438         return rc;
2439 }
2440
2441 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2442                            __u64 ibits)
2443 {
2444         struct inode *inode = dentry->d_inode;
2445         int rc;
2446         ENTRY;
2447
2448         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2449
2450         /* if object not yet allocated, don't validate size */
2451         if (rc == 0 && !ll_i2info(dentry->d_inode)->lli_has_smd) {
2452                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2453                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2454                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2455                 RETURN(0);
2456         }
2457
2458         /* ll_glimpse_size will prefer locally cached writes if they extend
2459          * the file */
2460
2461         if (rc == 0)
2462                 rc = ll_glimpse_size(inode);
2463
2464         RETURN(rc);
2465 }
2466
2467 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2468                   struct lookup_intent *it, struct kstat *stat)
2469 {
2470         struct inode *inode = de->d_inode;
2471         struct ll_sb_info *sbi = ll_i2sbi(inode);
2472         struct ll_inode_info *lli = ll_i2info(inode);
2473         int res = 0;
2474
2475         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2476                                              MDS_INODELOCK_LOOKUP);
2477         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2478
2479         if (res)
2480                 return res;
2481
2482         stat->dev = inode->i_sb->s_dev;
2483         if (ll_need_32bit_api(sbi))
2484                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2485         else
2486                 stat->ino = inode->i_ino;
2487         stat->mode = inode->i_mode;
2488         stat->nlink = inode->i_nlink;
2489         stat->uid = inode->i_uid;
2490         stat->gid = inode->i_gid;
2491         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2492         stat->atime = inode->i_atime;
2493         stat->mtime = inode->i_mtime;
2494         stat->ctime = inode->i_ctime;
2495         stat->blksize = 1 << inode->i_blkbits;
2496
2497         stat->size = i_size_read(inode);
2498         stat->blocks = inode->i_blocks;
2499
2500         return 0;
2501 }
2502 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2503 {
2504         struct lookup_intent it = { .it_op = IT_GETATTR };
2505
2506         return ll_getattr_it(mnt, de, &it, stat);
2507 }
2508
2509 #ifdef HAVE_LINUX_FIEMAP_H
2510 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2511                 __u64 start, __u64 len)
2512 {
2513         int rc;
2514         size_t num_bytes;
2515         struct ll_user_fiemap *fiemap;
2516         unsigned int extent_count = fieinfo->fi_extents_max;
2517
2518         num_bytes = sizeof(*fiemap) + (extent_count *
2519                                        sizeof(struct ll_fiemap_extent));
2520         OBD_ALLOC_LARGE(fiemap, num_bytes);
2521
2522         if (fiemap == NULL)
2523                 RETURN(-ENOMEM);
2524
2525         fiemap->fm_flags = fieinfo->fi_flags;
2526         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2527         fiemap->fm_start = start;
2528         fiemap->fm_length = len;
2529         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2530                sizeof(struct ll_fiemap_extent));
2531
2532         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2533
2534         fieinfo->fi_flags = fiemap->fm_flags;
2535         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2536         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2537                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2538
2539         OBD_FREE_LARGE(fiemap, num_bytes);
2540         return rc;
2541 }
2542 #endif
2543
2544 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2545 static int
2546 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2547 lustre_check_acl(struct inode *inode, int mask, unsigned int flags)
2548 # else
2549 lustre_check_acl(struct inode *inode, int mask)
2550 # endif
2551 {
2552 # ifdef CONFIG_FS_POSIX_ACL
2553         struct ll_inode_info *lli = ll_i2info(inode);
2554         struct posix_acl *acl;
2555         int rc;
2556         ENTRY;
2557
2558 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2559         if (flags & IPERM_FLAG_RCU)
2560                 return -ECHILD;
2561 #  endif
2562         cfs_spin_lock(&lli->lli_lock);
2563         acl = posix_acl_dup(lli->lli_posix_acl);
2564         cfs_spin_unlock(&lli->lli_lock);
2565
2566         if (!acl)
2567                 RETURN(-EAGAIN);
2568
2569         rc = posix_acl_permission(inode, acl, mask);
2570         posix_acl_release(acl);
2571
2572         RETURN(rc);
2573 # else /* !CONFIG_FS_POSIX_ACL */
2574         return -EAGAIN;
2575 # endif /* CONFIG_FS_POSIX_ACL */
2576 }
2577 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2578
2579 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2580 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2581 #else
2582 # ifdef HAVE_INODE_PERMISION_2ARGS
2583 int ll_inode_permission(struct inode *inode, int mask)
2584 # else
2585 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2586 # endif
2587 #endif
2588 {
2589         int rc = 0;
2590         ENTRY;
2591
2592 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2593         if (flags & IPERM_FLAG_RCU)
2594                 return -ECHILD;
2595 #endif
2596
2597        /* as root inode are NOT getting validated in lookup operation,
2598         * need to do it before permission check. */
2599
2600         if (inode == inode->i_sb->s_root->d_inode) {
2601                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2602
2603                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2604                                               MDS_INODELOCK_LOOKUP);
2605                 if (rc)
2606                         RETURN(rc);
2607         }
2608
2609         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2610                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2611
2612         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2613                 return lustre_check_remote_perm(inode, mask);
2614
2615         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2616         rc = ll_generic_permission(inode, mask, flags, lustre_check_acl);
2617
2618         RETURN(rc);
2619 }
2620
2621 #ifdef HAVE_FILE_READV
2622 #define READ_METHOD readv
2623 #define READ_FUNCTION ll_file_readv
2624 #define WRITE_METHOD writev
2625 #define WRITE_FUNCTION ll_file_writev
2626 #else
2627 #define READ_METHOD aio_read
2628 #define READ_FUNCTION ll_file_aio_read
2629 #define WRITE_METHOD aio_write
2630 #define WRITE_FUNCTION ll_file_aio_write
2631 #endif
2632
2633 /* -o localflock - only provides locally consistent flock locks */
2634 struct file_operations ll_file_operations = {
2635         .read           = ll_file_read,
2636         .READ_METHOD    = READ_FUNCTION,
2637         .write          = ll_file_write,
2638         .WRITE_METHOD   = WRITE_FUNCTION,
2639         .unlocked_ioctl = ll_file_ioctl,
2640         .open           = ll_file_open,
2641         .release        = ll_file_release,
2642         .mmap           = ll_file_mmap,
2643         .llseek         = ll_file_seek,
2644 #ifdef HAVE_KERNEL_SENDFILE
2645         .sendfile       = ll_file_sendfile,
2646 #endif
2647 #ifdef HAVE_KERNEL_SPLICE_READ
2648         .splice_read    = ll_file_splice_read,
2649 #endif
2650         .fsync          = ll_fsync,
2651         .flush          = ll_flush
2652 };
2653
2654 struct file_operations ll_file_operations_flock = {
2655         .read           = ll_file_read,
2656         .READ_METHOD    = READ_FUNCTION,
2657         .write          = ll_file_write,
2658         .WRITE_METHOD   = WRITE_FUNCTION,
2659         .unlocked_ioctl = ll_file_ioctl,
2660         .open           = ll_file_open,
2661         .release        = ll_file_release,
2662         .mmap           = ll_file_mmap,
2663         .llseek         = ll_file_seek,
2664 #ifdef HAVE_KERNEL_SENDFILE
2665         .sendfile       = ll_file_sendfile,
2666 #endif
2667 #ifdef HAVE_KERNEL_SPLICE_READ
2668         .splice_read    = ll_file_splice_read,
2669 #endif
2670         .fsync          = ll_fsync,
2671         .flush          = ll_flush,
2672         .flock          = ll_file_flock,
2673         .lock           = ll_file_flock
2674 };
2675
2676 /* These are for -o noflock - to return ENOSYS on flock calls */
2677 struct file_operations ll_file_operations_noflock = {
2678         .read           = ll_file_read,
2679         .READ_METHOD    = READ_FUNCTION,
2680         .write          = ll_file_write,
2681         .WRITE_METHOD   = WRITE_FUNCTION,
2682         .unlocked_ioctl = ll_file_ioctl,
2683         .open           = ll_file_open,
2684         .release        = ll_file_release,
2685         .mmap           = ll_file_mmap,
2686         .llseek         = ll_file_seek,
2687 #ifdef HAVE_KERNEL_SENDFILE
2688         .sendfile       = ll_file_sendfile,
2689 #endif
2690 #ifdef HAVE_KERNEL_SPLICE_READ
2691         .splice_read    = ll_file_splice_read,
2692 #endif
2693         .fsync          = ll_fsync,
2694         .flush          = ll_flush,
2695         .flock          = ll_file_noflock,
2696         .lock           = ll_file_noflock
2697 };
2698
2699 struct inode_operations ll_file_inode_operations = {
2700         .setattr        = ll_setattr,
2701         .truncate       = ll_truncate,
2702         .getattr        = ll_getattr,
2703         .permission     = ll_inode_permission,
2704         .setxattr       = ll_setxattr,
2705         .getxattr       = ll_getxattr,
2706         .listxattr      = ll_listxattr,
2707         .removexattr    = ll_removexattr,
2708 #ifdef  HAVE_LINUX_FIEMAP_H
2709         .fiemap         = ll_fiemap,
2710 #endif
2711 };
2712
2713 /* dynamic ioctl number support routins */
2714 static struct llioc_ctl_data {
2715         cfs_rw_semaphore_t      ioc_sem;
2716         cfs_list_t              ioc_head;
2717 } llioc = {
2718         __RWSEM_INITIALIZER(llioc.ioc_sem),
2719         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2720 };
2721
2722
2723 struct llioc_data {
2724         cfs_list_t              iocd_list;
2725         unsigned int            iocd_size;
2726         llioc_callback_t        iocd_cb;
2727         unsigned int            iocd_count;
2728         unsigned int            iocd_cmd[0];
2729 };
2730
2731 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2732 {
2733         unsigned int size;
2734         struct llioc_data *in_data = NULL;
2735         ENTRY;
2736
2737         if (cb == NULL || cmd == NULL ||
2738             count > LLIOC_MAX_CMD || count < 0)
2739                 RETURN(NULL);
2740
2741         size = sizeof(*in_data) + count * sizeof(unsigned int);
2742         OBD_ALLOC(in_data, size);
2743         if (in_data == NULL)
2744                 RETURN(NULL);
2745
2746         memset(in_data, 0, sizeof(*in_data));
2747         in_data->iocd_size = size;
2748         in_data->iocd_cb = cb;
2749         in_data->iocd_count = count;
2750         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2751
2752         cfs_down_write(&llioc.ioc_sem);
2753         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2754         cfs_up_write(&llioc.ioc_sem);
2755
2756         RETURN(in_data);
2757 }
2758
2759 void ll_iocontrol_unregister(void *magic)
2760 {
2761         struct llioc_data *tmp;
2762
2763         if (magic == NULL)
2764                 return;
2765
2766         cfs_down_write(&llioc.ioc_sem);
2767         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2768                 if (tmp == magic) {
2769                         unsigned int size = tmp->iocd_size;
2770
2771                         cfs_list_del(&tmp->iocd_list);
2772                         cfs_up_write(&llioc.ioc_sem);
2773
2774                         OBD_FREE(tmp, size);
2775                         return;
2776                 }
2777         }
2778         cfs_up_write(&llioc.ioc_sem);
2779
2780         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2781 }
2782
2783 EXPORT_SYMBOL(ll_iocontrol_register);
2784 EXPORT_SYMBOL(ll_iocontrol_unregister);
2785
2786 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2787                         unsigned int cmd, unsigned long arg, int *rcp)
2788 {
2789         enum llioc_iter ret = LLIOC_CONT;
2790         struct llioc_data *data;
2791         int rc = -EINVAL, i;
2792
2793         cfs_down_read(&llioc.ioc_sem);
2794         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2795                 for (i = 0; i < data->iocd_count; i++) {
2796                         if (cmd != data->iocd_cmd[i])
2797                                 continue;
2798
2799                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2800                         break;
2801                 }
2802
2803                 if (ret == LLIOC_STOP)
2804                         break;
2805         }
2806         cfs_up_read(&llioc.ioc_sem);
2807
2808         if (rcp)
2809                 *rcp = rc;
2810         return ret;
2811 }
2812
2813 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2814 {
2815         struct ll_inode_info *lli = ll_i2info(inode);
2816         struct cl_env_nest nest;
2817         struct lu_env *env;
2818         int result;
2819         ENTRY;
2820
2821         if (lli->lli_clob == NULL)
2822                 RETURN(0);
2823
2824         env = cl_env_nested_get(&nest);
2825         if (IS_ERR(env))
2826                 RETURN(PTR_ERR(env));
2827
2828         result = cl_conf_set(env, lli->lli_clob, conf);
2829         cl_env_nested_put(&nest, env);
2830         RETURN(result);
2831 }
2832
2833 /**
2834  * This function checks if there exists a LAYOUT lock on the client side,
2835  * or enqueues it if it doesn't have one in cache.
2836  *
2837  * This function will not hold layout lock so it may be revoked any time after
2838  * this function returns. Any operations depend on layout should be redone
2839  * in that case.
2840  *
2841  * This function should be called before lov_io_init() to get an uptodate
2842  * layout version, the caller should save the version number and after IO
2843  * is finished, this function should be called again to verify that layout
2844  * is not changed during IO time.
2845  */
2846 int ll_layout_refresh(struct inode *inode, __u32 *gen)
2847 {
2848         struct ll_inode_info  *lli = ll_i2info(inode);
2849         struct ll_sb_info     *sbi = ll_i2sbi(inode);
2850         struct md_op_data     *op_data = NULL;
2851         struct ptlrpc_request *req = NULL;
2852         struct lookup_intent   it = { .it_op = IT_LAYOUT };
2853         struct lustre_handle   lockh;
2854         ldlm_mode_t            mode;
2855         struct cl_object_conf  conf = {  .coc_inode = inode,
2856                                          .coc_validate_only = true };
2857         int rc;
2858         ENTRY;
2859
2860         *gen = 0;
2861         if (!(ll_i2sbi(inode)->ll_flags & LL_SBI_LAYOUT_LOCK))
2862                 RETURN(0);
2863
2864         /* sanity checks */
2865         LASSERT(fid_is_sane(ll_inode2fid(inode)));
2866         LASSERT(S_ISREG(inode->i_mode));
2867
2868         /* mostly layout lock is caching on the local side, so try to match
2869          * it before grabbing layout lock mutex. */
2870         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh);
2871         if (mode != 0) { /* hit cached lock */
2872                 struct lov_stripe_md *lsm;
2873
2874                 lsm = ccc_inode_lsm_get(inode);
2875                 if (lsm != NULL)
2876                         *gen = lsm->lsm_layout_gen;
2877                 ccc_inode_lsm_put(inode, lsm);
2878                 ldlm_lock_decref(&lockh, mode);
2879
2880                 RETURN(0);
2881         }
2882
2883         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
2884                                      0, 0, LUSTRE_OPC_ANY, NULL);
2885         if (IS_ERR(op_data))
2886                 RETURN(PTR_ERR(op_data));
2887
2888         /* take layout lock mutex to enqueue layout lock exclusively. */
2889         cfs_mutex_lock(&lli->lli_layout_mutex);
2890
2891         /* make sure the old conf goes away */
2892         ll_layout_conf(inode, &conf);
2893
2894         /* enqueue layout lock */
2895         rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0,
2896                         &req, ll_md_blocking_ast, 0);
2897         if (rc == 0) {
2898                 /* we get a new lock, so update the lock data */
2899                 lockh.cookie = it.d.lustre.it_lock_handle;
2900                 md_set_lock_data(sbi->ll_md_exp, &lockh.cookie, inode, NULL);
2901
2902                 /* req == NULL is when lock was found in client cache, without
2903                  * any request to server (but lsm can be canceled just after a
2904                  * release) */
2905                 if (req != NULL) {
2906                         struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
2907                         struct lustre_md md = { NULL };
2908                         void *lmm;
2909                         int lmmsize;
2910
2911                         /* for IT_LAYOUT lock, lmm is returned in lock's lvb
2912                          * data via completion callback */
2913                         LASSERT(lock != NULL);
2914                         lmm = lock->l_lvb_data;
2915                         lmmsize = lock->l_lvb_len;
2916                         if (lmm != NULL)
2917                                 rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
2918                                                 lmm, lmmsize);
2919                         if (rc == 0) {
2920                                 if (md.lsm != NULL)
2921                                         *gen = md.lsm->lsm_layout_gen;
2922
2923                                 memset(&conf, 0, sizeof conf);
2924                                 conf.coc_inode = inode;
2925                                 conf.u.coc_md = &md;
2926                                 ll_layout_conf(inode, &conf);
2927                                 /* is this racy? */
2928                                 lli->lli_has_smd = md.lsm != NULL;
2929                         }
2930                         if (md.lsm != NULL)
2931                                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
2932
2933                         LDLM_LOCK_PUT(lock);
2934                         ptlrpc_req_finished(req);
2935                 } else { /* hit caching lock */
2936                         struct lov_stripe_md *lsm;
2937
2938                         lsm = ccc_inode_lsm_get(inode);
2939                         if (lsm != NULL)
2940                                 *gen = lsm->lsm_layout_gen;
2941                         ccc_inode_lsm_put(inode, lsm);
2942                 }
2943                 ll_intent_drop_lock(&it);
2944         }
2945         cfs_mutex_unlock(&lli->lli_layout_mutex);
2946         ll_finish_md_op_data(op_data);
2947
2948         RETURN(rc);
2949 }