Whamcloud - gitweb
LU-941 hsm: Manage dirty flag for hsm-archived files
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84
85         if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
86                 op_data->op_bias |= MDS_DATA_MODIFIED;
87 }
88
89 /**
90  * Closes the IO epoch and packs all the attributes into @op_data for
91  * the CLOSE rpc.
92  */
93 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
94                              struct obd_client_handle *och)
95 {
96         ENTRY;
97
98         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
99                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
100
101         if (!(och->och_flags & FMODE_WRITE))
102                 goto out;
103
104         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
105                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
106         else
107                 ll_ioepoch_close(inode, op_data, &och, 0);
108
109 out:
110         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
111         ll_prep_md_op_data(op_data, inode, NULL, NULL,
112                            0, 0, LUSTRE_OPC_ANY, NULL);
113         EXIT;
114 }
115
116 static int ll_close_inode_openhandle(struct obd_export *md_exp,
117                                      struct inode *inode,
118                                      struct obd_client_handle *och)
119 {
120         struct obd_export *exp = ll_i2mdexp(inode);
121         struct md_op_data *op_data;
122         struct ptlrpc_request *req = NULL;
123         struct obd_device *obd = class_exp2obd(exp);
124         int epoch_close = 1;
125         int rc;
126         ENTRY;
127
128         if (obd == NULL) {
129                 /*
130                  * XXX: in case of LMV, is this correct to access
131                  * ->exp_handle?
132                  */
133                 CERROR("Invalid MDC connection handle "LPX64"\n",
134                        ll_i2mdexp(inode)->exp_handle.h_cookie);
135                 GOTO(out, rc = 0);
136         }
137
138         OBD_ALLOC_PTR(op_data);
139         if (op_data == NULL)
140                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
141
142         ll_prepare_close(inode, op_data, och);
143         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
144         rc = md_close(md_exp, op_data, och->och_mod, &req);
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(epoch_close);
148                 /* MDS has instructed us to obtain Size-on-MDS attribute from
149                  * OSTs and send setattr to back to MDS. */
150                 rc = ll_som_update(inode, op_data);
151                 if (rc) {
152                         CERROR("inode %lu mdc Size-on-MDS update failed: "
153                                "rc = %d\n", inode->i_ino, rc);
154                         rc = 0;
155                 }
156         } else if (rc) {
157                 CERROR("inode %lu mdc close failed: rc = %d\n",
158                        inode->i_ino, rc);
159         }
160
161         /* DATA_MODIFIED flag was successfully sent on close, cancel data
162          * modification flag. */
163         if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
164                 struct ll_inode_info *lli = ll_i2info(inode);
165
166                 spin_lock(&lli->lli_lock);
167                 lli->lli_flags &= ~LLIF_DATA_MODIFIED;
168                 spin_unlock(&lli->lli_lock);
169         }
170
171         ll_finish_md_op_data(op_data);
172
173         if (rc == 0) {
174                 rc = ll_objects_destroy(req, inode);
175                 if (rc)
176                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
177                                inode->i_ino, rc);
178         }
179
180         EXIT;
181 out:
182
183         if (exp_connect_som(exp) && !epoch_close &&
184             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
185                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
186         } else {
187                 md_clear_open_replay_data(md_exp, och);
188                 /* Free @och if it is not waiting for DONE_WRITING. */
189                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
190                 OBD_FREE_PTR(och);
191         }
192         if (req) /* This is close request */
193                 ptlrpc_req_finished(req);
194         return rc;
195 }
196
197 int ll_md_real_close(struct inode *inode, int flags)
198 {
199         struct ll_inode_info *lli = ll_i2info(inode);
200         struct obd_client_handle **och_p;
201         struct obd_client_handle *och;
202         __u64 *och_usecount;
203         int rc = 0;
204         ENTRY;
205
206         if (flags & FMODE_WRITE) {
207                 och_p = &lli->lli_mds_write_och;
208                 och_usecount = &lli->lli_open_fd_write_count;
209         } else if (flags & FMODE_EXEC) {
210                 och_p = &lli->lli_mds_exec_och;
211                 och_usecount = &lli->lli_open_fd_exec_count;
212         } else {
213                 LASSERT(flags & FMODE_READ);
214                 och_p = &lli->lli_mds_read_och;
215                 och_usecount = &lli->lli_open_fd_read_count;
216         }
217
218         mutex_lock(&lli->lli_och_mutex);
219         if (*och_usecount) { /* There are still users of this handle, so
220                                 skip freeing it. */
221                 mutex_unlock(&lli->lli_och_mutex);
222                 RETURN(0);
223         }
224         och=*och_p;
225         *och_p = NULL;
226         mutex_unlock(&lli->lli_och_mutex);
227
228         if (och) { /* There might be a race and somebody have freed this och
229                       already */
230                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
231                                                inode, och);
232         }
233
234         RETURN(rc);
235 }
236
237 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
238                 struct file *file)
239 {
240         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
241         struct ll_inode_info *lli = ll_i2info(inode);
242         int rc = 0;
243         ENTRY;
244
245         /* clear group lock, if present */
246         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
247                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
248
249         /* Let's see if we have good enough OPEN lock on the file and if
250            we can skip talking to MDS */
251         if (file->f_dentry->d_inode) { /* Can this ever be false? */
252                 int lockmode;
253                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
254                 struct lustre_handle lockh;
255                 struct inode *inode = file->f_dentry->d_inode;
256                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
257
258                 mutex_lock(&lli->lli_och_mutex);
259                 if (fd->fd_omode & FMODE_WRITE) {
260                         lockmode = LCK_CW;
261                         LASSERT(lli->lli_open_fd_write_count);
262                         lli->lli_open_fd_write_count--;
263                 } else if (fd->fd_omode & FMODE_EXEC) {
264                         lockmode = LCK_PR;
265                         LASSERT(lli->lli_open_fd_exec_count);
266                         lli->lli_open_fd_exec_count--;
267                 } else {
268                         lockmode = LCK_CR;
269                         LASSERT(lli->lli_open_fd_read_count);
270                         lli->lli_open_fd_read_count--;
271                 }
272                 mutex_unlock(&lli->lli_och_mutex);
273
274                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
275                                    LDLM_IBITS, &policy, lockmode,
276                                    &lockh)) {
277                         rc = ll_md_real_close(file->f_dentry->d_inode,
278                                               fd->fd_omode);
279                 }
280         } else {
281                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
282                        file, file->f_dentry, file->f_dentry->d_name.name);
283         }
284
285         LUSTRE_FPRIVATE(file) = NULL;
286         ll_file_data_put(fd);
287         ll_capa_close(inode);
288
289         RETURN(rc);
290 }
291
292 /* While this returns an error code, fput() the caller does not, so we need
293  * to make every effort to clean up all of our state here.  Also, applications
294  * rarely check close errors and even if an error is returned they will not
295  * re-try the close call.
296  */
297 int ll_file_release(struct inode *inode, struct file *file)
298 {
299         struct ll_file_data *fd;
300         struct ll_sb_info *sbi = ll_i2sbi(inode);
301         struct ll_inode_info *lli = ll_i2info(inode);
302         int rc;
303         ENTRY;
304
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
327         /* The last ref on @file, maybe not the the owner pid of statahead.
328          * Different processes can open the same dir, "ll_opendir_key" means:
329          * it is me that should stop the statahead thread. */
330         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
331             lli->lli_opendir_pid != 0)
332                 ll_stop_statahead(inode, lli->lli_opendir_key);
333
334         if (inode->i_sb->s_root == file->f_dentry) {
335                 LUSTRE_FPRIVATE(file) = NULL;
336                 ll_file_data_put(fd);
337                 RETURN(0);
338         }
339
340         if (!S_ISDIR(inode->i_mode)) {
341                 lov_read_and_clear_async_rc(lli->lli_clob);
342                 lli->lli_async_rc = 0;
343         }
344
345         rc = ll_md_close(sbi->ll_md_exp, inode, file);
346
347         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
348                 libcfs_debug_dumplog();
349
350         RETURN(rc);
351 }
352
353 static int ll_intent_file_open(struct file *file, void *lmm,
354                                int lmmsize, struct lookup_intent *itp)
355 {
356         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
357         struct dentry *parent = file->f_dentry->d_parent;
358         const char *name = file->f_dentry->d_name.name;
359         const int len = file->f_dentry->d_name.len;
360         struct md_op_data *op_data;
361         struct ptlrpc_request *req;
362         __u32 opc = LUSTRE_OPC_ANY;
363         int rc;
364         ENTRY;
365
366         if (!parent)
367                 RETURN(-ENOENT);
368
369         /* Usually we come here only for NFSD, and we want open lock.
370            But we can also get here with pre 2.6.15 patchless kernels, and in
371            that case that lock is also ok */
372         /* We can also get here if there was cached open handle in revalidate_it
373          * but it disappeared while we were getting from there to ll_file_open.
374          * But this means this file was closed and immediatelly opened which
375          * makes a good candidate for using OPEN lock */
376         /* If lmmsize & lmm are not 0, we are just setting stripe info
377          * parameters. No need for the open lock */
378         if (lmm == NULL && lmmsize == 0) {
379                 itp->it_flags |= MDS_OPEN_LOCK;
380                 if (itp->it_flags & FMODE_WRITE)
381                         opc = LUSTRE_OPC_CREATE;
382         }
383
384         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
385                                       file->f_dentry->d_inode, name, len,
386                                       O_RDWR, opc, NULL);
387         if (IS_ERR(op_data))
388                 RETURN(PTR_ERR(op_data));
389
390         itp->it_flags |= MDS_OPEN_BY_FID;
391         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
392                             0 /*unused */, &req, ll_md_blocking_ast, 0);
393         ll_finish_md_op_data(op_data);
394         if (rc == -ESTALE) {
395                 /* reason for keep own exit path - don`t flood log
396                 * with messages with -ESTALE errors.
397                 */
398                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
399                      it_open_error(DISP_OPEN_OPEN, itp))
400                         GOTO(out, rc);
401                 ll_release_openhandle(file->f_dentry, itp);
402                 GOTO(out, rc);
403         }
404
405         if (it_disposition(itp, DISP_LOOKUP_NEG))
406                 GOTO(out, rc = -ENOENT);
407
408         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
409                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
410                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
411                 GOTO(out, rc);
412         }
413
414         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
415         if (!rc && itp->d.lustre.it_lock_mode)
416                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
417                                  itp, NULL);
418
419 out:
420         ptlrpc_req_finished(itp->d.lustre.it_data);
421         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
422         ll_intent_drop_lock(itp);
423
424         RETURN(rc);
425 }
426
427 /**
428  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
429  * not believe attributes if a few ioepoch holders exist. Attributes for
430  * previous ioepoch if new one is opened are also skipped by MDS.
431  */
432 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
433 {
434         if (ioepoch && lli->lli_ioepoch != ioepoch) {
435                 lli->lli_ioepoch = ioepoch;
436                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
437                        ioepoch, PFID(&lli->lli_fid));
438         }
439 }
440
441 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
442                        struct lookup_intent *it, struct obd_client_handle *och)
443 {
444         struct ptlrpc_request *req = it->d.lustre.it_data;
445         struct mdt_body *body;
446
447         LASSERT(och);
448
449         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
450         LASSERT(body != NULL);                      /* reply already checked out */
451
452         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
453         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
454         och->och_fid = lli->lli_fid;
455         och->och_flags = it->it_flags;
456         ll_ioepoch_open(lli, body->ioepoch);
457
458         return md_set_open_replay_data(md_exp, och, req);
459 }
460
461 int ll_local_open(struct file *file, struct lookup_intent *it,
462                   struct ll_file_data *fd, struct obd_client_handle *och)
463 {
464         struct inode *inode = file->f_dentry->d_inode;
465         struct ll_inode_info *lli = ll_i2info(inode);
466         ENTRY;
467
468         LASSERT(!LUSTRE_FPRIVATE(file));
469
470         LASSERT(fd != NULL);
471
472         if (och) {
473                 struct ptlrpc_request *req = it->d.lustre.it_data;
474                 struct mdt_body *body;
475                 int rc;
476
477                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
478                 if (rc)
479                         RETURN(rc);
480
481                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
482                 if ((it->it_flags & FMODE_WRITE) &&
483                     (body->valid & OBD_MD_FLSIZE))
484                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
485                                lli->lli_ioepoch, PFID(&lli->lli_fid));
486         }
487
488         LUSTRE_FPRIVATE(file) = fd;
489         ll_readahead_init(inode, &fd->fd_ras);
490         fd->fd_omode = it->it_flags;
491         RETURN(0);
492 }
493
494 /* Open a file, and (for the very first open) create objects on the OSTs at
495  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
496  * creation or open until ll_lov_setstripe() ioctl is called.
497  *
498  * If we already have the stripe MD locally then we don't request it in
499  * md_open(), by passing a lmm_size = 0.
500  *
501  * It is up to the application to ensure no other processes open this file
502  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
503  * used.  We might be able to avoid races of that sort by getting lli_open_sem
504  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
505  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
506  */
507 int ll_file_open(struct inode *inode, struct file *file)
508 {
509         struct ll_inode_info *lli = ll_i2info(inode);
510         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
511                                           .it_flags = file->f_flags };
512         struct obd_client_handle **och_p = NULL;
513         __u64 *och_usecount = NULL;
514         struct ll_file_data *fd;
515         int rc = 0, opendir_set = 0;
516         ENTRY;
517
518         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
519                inode->i_generation, inode, file->f_flags);
520
521         it = file->private_data; /* XXX: compat macro */
522         file->private_data = NULL; /* prevent ll_local_open assertion */
523
524         fd = ll_file_data_get();
525         if (fd == NULL)
526                 GOTO(out_och_free, rc = -ENOMEM);
527
528         fd->fd_file = file;
529         if (S_ISDIR(inode->i_mode)) {
530                 spin_lock(&lli->lli_sa_lock);
531                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
532                     lli->lli_opendir_pid == 0) {
533                         lli->lli_opendir_key = fd;
534                         lli->lli_opendir_pid = cfs_curproc_pid();
535                         opendir_set = 1;
536                 }
537                 spin_unlock(&lli->lli_sa_lock);
538         }
539
540         if (inode->i_sb->s_root == file->f_dentry) {
541                 LUSTRE_FPRIVATE(file) = fd;
542                 RETURN(0);
543         }
544
545         if (!it || !it->d.lustre.it_disposition) {
546                 /* Convert f_flags into access mode. We cannot use file->f_mode,
547                  * because everything but O_ACCMODE mask was stripped from
548                  * there */
549                 if ((oit.it_flags + 1) & O_ACCMODE)
550                         oit.it_flags++;
551                 if (file->f_flags & O_TRUNC)
552                         oit.it_flags |= FMODE_WRITE;
553
554                 /* kernel only call f_op->open in dentry_open.  filp_open calls
555                  * dentry_open after call to open_namei that checks permissions.
556                  * Only nfsd_open call dentry_open directly without checking
557                  * permissions and because of that this code below is safe. */
558                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
559                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
560
561                 /* We do not want O_EXCL here, presumably we opened the file
562                  * already? XXX - NFS implications? */
563                 oit.it_flags &= ~O_EXCL;
564
565                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
566                  * created if necessary, then "IT_CREAT" should be set to keep
567                  * consistent with it */
568                 if (oit.it_flags & O_CREAT)
569                         oit.it_op |= IT_CREAT;
570
571                 it = &oit;
572         }
573
574 restart:
575         /* Let's see if we have file open on MDS already. */
576         if (it->it_flags & FMODE_WRITE) {
577                 och_p = &lli->lli_mds_write_och;
578                 och_usecount = &lli->lli_open_fd_write_count;
579         } else if (it->it_flags & FMODE_EXEC) {
580                 och_p = &lli->lli_mds_exec_och;
581                 och_usecount = &lli->lli_open_fd_exec_count;
582          } else {
583                 och_p = &lli->lli_mds_read_och;
584                 och_usecount = &lli->lli_open_fd_read_count;
585         }
586
587         mutex_lock(&lli->lli_och_mutex);
588         if (*och_p) { /* Open handle is present */
589                 if (it_disposition(it, DISP_OPEN_OPEN)) {
590                         /* Well, there's extra open request that we do not need,
591                            let's close it somehow. This will decref request. */
592                         rc = it_open_error(DISP_OPEN_OPEN, it);
593                         if (rc) {
594                                 mutex_unlock(&lli->lli_och_mutex);
595                                 GOTO(out_openerr, rc);
596                         }
597
598                         ll_release_openhandle(file->f_dentry, it);
599                 }
600                 (*och_usecount)++;
601
602                 rc = ll_local_open(file, it, fd, NULL);
603                 if (rc) {
604                         (*och_usecount)--;
605                         mutex_unlock(&lli->lli_och_mutex);
606                         GOTO(out_openerr, rc);
607                 }
608         } else {
609                 LASSERT(*och_usecount == 0);
610                 if (!it->d.lustre.it_disposition) {
611                         /* We cannot just request lock handle now, new ELC code
612                            means that one of other OPEN locks for this file
613                            could be cancelled, and since blocking ast handler
614                            would attempt to grab och_mutex as well, that would
615                            result in a deadlock */
616                         mutex_unlock(&lli->lli_och_mutex);
617                         it->it_create_mode |= M_CHECK_STALE;
618                         rc = ll_intent_file_open(file, NULL, 0, it);
619                         it->it_create_mode &= ~M_CHECK_STALE;
620                         if (rc)
621                                 GOTO(out_openerr, rc);
622
623                         goto restart;
624                 }
625                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
626                 if (!*och_p)
627                         GOTO(out_och_free, rc = -ENOMEM);
628
629                 (*och_usecount)++;
630
631                 /* md_intent_lock() didn't get a request ref if there was an
632                  * open error, so don't do cleanup on the request here
633                  * (bug 3430) */
634                 /* XXX (green): Should not we bail out on any error here, not
635                  * just open error? */
636                 rc = it_open_error(DISP_OPEN_OPEN, it);
637                 if (rc)
638                         GOTO(out_och_free, rc);
639
640                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
641
642                 rc = ll_local_open(file, it, fd, *och_p);
643                 if (rc)
644                         GOTO(out_och_free, rc);
645         }
646         mutex_unlock(&lli->lli_och_mutex);
647         fd = NULL;
648
649         /* Must do this outside lli_och_mutex lock to prevent deadlock where
650            different kind of OPEN lock for this same inode gets cancelled
651            by ldlm_cancel_lru */
652         if (!S_ISREG(inode->i_mode))
653                 GOTO(out_och_free, rc);
654
655         ll_capa_open(inode);
656
657         if (!lli->lli_has_smd) {
658                 if (file->f_flags & O_LOV_DELAY_CREATE ||
659                     !(file->f_mode & FMODE_WRITE)) {
660                         CDEBUG(D_INODE, "object creation was delayed\n");
661                         GOTO(out_och_free, rc);
662                 }
663         }
664         file->f_flags &= ~O_LOV_DELAY_CREATE;
665         GOTO(out_och_free, rc);
666
667 out_och_free:
668         if (rc) {
669                 if (och_p && *och_p) {
670                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
671                         *och_p = NULL; /* OBD_FREE writes some magic there */
672                         (*och_usecount)--;
673                 }
674                 mutex_unlock(&lli->lli_och_mutex);
675
676 out_openerr:
677                 if (opendir_set != 0)
678                         ll_stop_statahead(inode, lli->lli_opendir_key);
679                 if (fd != NULL)
680                         ll_file_data_put(fd);
681         } else {
682                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
683         }
684
685         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
686                 ptlrpc_req_finished(it->d.lustre.it_data);
687                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
688         }
689
690         return rc;
691 }
692
693 /* Fills the obdo with the attributes for the lsm */
694 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
695                           struct obd_capa *capa, struct obdo *obdo,
696                           __u64 ioepoch, int sync)
697 {
698         struct ptlrpc_request_set *set;
699         struct obd_info            oinfo = { { { 0 } } };
700         int                        rc;
701
702         ENTRY;
703
704         LASSERT(lsm != NULL);
705
706         oinfo.oi_md = lsm;
707         oinfo.oi_oa = obdo;
708         oinfo.oi_oa->o_id = lsm->lsm_object_id;
709         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
710         oinfo.oi_oa->o_mode = S_IFREG;
711         oinfo.oi_oa->o_ioepoch = ioepoch;
712         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
713                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
714                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
715                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
716                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
717                                OBD_MD_FLDATAVERSION;
718         oinfo.oi_capa = capa;
719         if (sync) {
720                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
721                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
722         }
723
724         set = ptlrpc_prep_set();
725         if (set == NULL) {
726                 CERROR("can't allocate ptlrpc set\n");
727                 rc = -ENOMEM;
728         } else {
729                 rc = obd_getattr_async(exp, &oinfo, set);
730                 if (rc == 0)
731                         rc = ptlrpc_set_wait(set);
732                 ptlrpc_set_destroy(set);
733         }
734         if (rc == 0)
735                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
736                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
737                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
738                                          OBD_MD_FLDATAVERSION);
739         RETURN(rc);
740 }
741
742 /**
743   * Performs the getattr on the inode and updates its fields.
744   * If @sync != 0, perform the getattr under the server-side lock.
745   */
746 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
747                      __u64 ioepoch, int sync)
748 {
749         struct obd_capa      *capa = ll_mdscapa_get(inode);
750         struct lov_stripe_md *lsm;
751         int rc;
752         ENTRY;
753
754         lsm = ccc_inode_lsm_get(inode);
755         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
756                             capa, obdo, ioepoch, sync);
757         capa_put(capa);
758         if (rc == 0) {
759                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
760                 CDEBUG(D_INODE,
761                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
762                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
763                        (unsigned long long)inode->i_blocks,
764                        (unsigned long)ll_inode_blksize(inode));
765         }
766         ccc_inode_lsm_put(inode, lsm);
767         RETURN(rc);
768 }
769
770 int ll_merge_lvb(struct inode *inode)
771 {
772         struct ll_inode_info *lli = ll_i2info(inode);
773         struct ll_sb_info *sbi = ll_i2sbi(inode);
774         struct lov_stripe_md *lsm;
775         struct ost_lvb lvb;
776         int rc = 0;
777
778         ENTRY;
779
780         lsm = ccc_inode_lsm_get(inode);
781         ll_inode_size_lock(inode);
782         inode_init_lvb(inode, &lvb);
783
784         /* merge timestamps the most resently obtained from mds with
785            timestamps obtained from osts */
786         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
787         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
788         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
789         if (lsm != NULL) {
790                 rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
791                 cl_isize_write_nolock(inode, lvb.lvb_size);
792
793                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
794                                 PFID(&lli->lli_fid), lvb.lvb_size);
795                 inode->i_blocks = lvb.lvb_blocks;
796         }
797         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
798         LTIME_S(inode->i_atime) = lvb.lvb_atime;
799         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
800         ll_inode_size_unlock(inode);
801         ccc_inode_lsm_put(inode, lsm);
802
803         RETURN(rc);
804 }
805
806 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
807                      lstat_t *st)
808 {
809         struct obdo obdo = { 0 };
810         int rc;
811
812         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
813         if (rc == 0) {
814                 st->st_size   = obdo.o_size;
815                 st->st_blocks = obdo.o_blocks;
816                 st->st_mtime  = obdo.o_mtime;
817                 st->st_atime  = obdo.o_atime;
818                 st->st_ctime  = obdo.o_ctime;
819         }
820         return rc;
821 }
822
823 void ll_io_init(struct cl_io *io, const struct file *file, int write)
824 {
825         struct inode *inode = file->f_dentry->d_inode;
826
827         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
828         if (write) {
829                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
830                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
831         }
832         io->ci_obj     = ll_i2info(inode)->lli_clob;
833         io->ci_lockreq = CILR_MAYBE;
834         if (ll_file_nolock(file)) {
835                 io->ci_lockreq = CILR_NEVER;
836                 io->ci_no_srvlock = 1;
837         } else if (file->f_flags & O_APPEND) {
838                 io->ci_lockreq = CILR_MANDATORY;
839         }
840 }
841
842 static ssize_t
843 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
844                    struct file *file, enum cl_io_type iot,
845                    loff_t *ppos, size_t count)
846 {
847         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
848         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
849         struct cl_io         *io;
850         ssize_t               result;
851         ENTRY;
852
853         io = ccc_env_thread_io(env);
854         ll_io_init(io, file, iot == CIT_WRITE);
855
856         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
857                 struct vvp_io *vio = vvp_env_io(env);
858                 struct ccc_io *cio = ccc_env_io(env);
859                 int write_mutex_locked = 0;
860
861                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
862                 vio->cui_io_subtype = args->via_io_subtype;
863
864                 switch (vio->cui_io_subtype) {
865                 case IO_NORMAL:
866                         cio->cui_iov = args->u.normal.via_iov;
867                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
868                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
869 #ifndef HAVE_FILE_WRITEV
870                         cio->cui_iocb = args->u.normal.via_iocb;
871 #endif
872                         if ((iot == CIT_WRITE) &&
873                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
874                                 if (mutex_lock_interruptible(&lli->
875                                                                lli_write_mutex))
876                                         GOTO(out, result = -ERESTARTSYS);
877                                 write_mutex_locked = 1;
878                         } else if (iot == CIT_READ) {
879                                 down_read(&lli->lli_trunc_sem);
880                         }
881                         break;
882                 case IO_SENDFILE:
883                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
884                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
885                         break;
886                 case IO_SPLICE:
887                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
888                         vio->u.splice.cui_flags = args->u.splice.via_flags;
889                         break;
890                 default:
891                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
892                         LBUG();
893                 }
894                 result = cl_io_loop(env, io);
895                 if (write_mutex_locked)
896                         mutex_unlock(&lli->lli_write_mutex);
897                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
898                         up_read(&lli->lli_trunc_sem);
899         } else {
900                 /* cl_io_rw_init() handled IO */
901                 result = io->ci_result;
902         }
903
904         if (io->ci_nob > 0) {
905                 result = io->ci_nob;
906                 *ppos = io->u.ci_wr.wr.crw_pos;
907         }
908         GOTO(out, result);
909 out:
910         cl_io_fini(env, io);
911
912         if (iot == CIT_READ) {
913                 if (result >= 0)
914                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
915                                            LPROC_LL_READ_BYTES, result);
916         } else if (iot == CIT_WRITE) {
917                 if (result >= 0) {
918                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
919                                            LPROC_LL_WRITE_BYTES, result);
920                         fd->fd_write_failed = false;
921                 } else {
922                         fd->fd_write_failed = true;
923                 }
924         }
925
926         return result;
927 }
928
929
930 /*
931  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
932  */
933 static int ll_file_get_iov_count(const struct iovec *iov,
934                                  unsigned long *nr_segs, size_t *count)
935 {
936         size_t cnt = 0;
937         unsigned long seg;
938
939         for (seg = 0; seg < *nr_segs; seg++) {
940                 const struct iovec *iv = &iov[seg];
941
942                 /*
943                  * If any segment has a negative length, or the cumulative
944                  * length ever wraps negative then return -EINVAL.
945                  */
946                 cnt += iv->iov_len;
947                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
948                         return -EINVAL;
949                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
950                         continue;
951                 if (seg == 0)
952                         return -EFAULT;
953                 *nr_segs = seg;
954                 cnt -= iv->iov_len;   /* This segment is no good */
955                 break;
956         }
957         *count = cnt;
958         return 0;
959 }
960
961 #ifdef HAVE_FILE_READV
962 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
963                               unsigned long nr_segs, loff_t *ppos)
964 {
965         struct lu_env      *env;
966         struct vvp_io_args *args;
967         size_t              count;
968         ssize_t             result;
969         int                 refcheck;
970         ENTRY;
971
972         result = ll_file_get_iov_count(iov, &nr_segs, &count);
973         if (result)
974                 RETURN(result);
975
976         env = cl_env_get(&refcheck);
977         if (IS_ERR(env))
978                 RETURN(PTR_ERR(env));
979
980         args = vvp_env_args(env, IO_NORMAL);
981         args->u.normal.via_iov = (struct iovec *)iov;
982         args->u.normal.via_nrsegs = nr_segs;
983
984         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
985         cl_env_put(env, &refcheck);
986         RETURN(result);
987 }
988
989 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
990                             loff_t *ppos)
991 {
992         struct lu_env *env;
993         struct iovec  *local_iov;
994         ssize_t        result;
995         int            refcheck;
996         ENTRY;
997
998         env = cl_env_get(&refcheck);
999         if (IS_ERR(env))
1000                 RETURN(PTR_ERR(env));
1001
1002         local_iov = &vvp_env_info(env)->vti_local_iov;
1003         local_iov->iov_base = (void __user *)buf;
1004         local_iov->iov_len = count;
1005         result = ll_file_readv(file, local_iov, 1, ppos);
1006         cl_env_put(env, &refcheck);
1007         RETURN(result);
1008 }
1009
1010 #else
1011 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1012                                 unsigned long nr_segs, loff_t pos)
1013 {
1014         struct lu_env      *env;
1015         struct vvp_io_args *args;
1016         size_t              count;
1017         ssize_t             result;
1018         int                 refcheck;
1019         ENTRY;
1020
1021         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1022         if (result)
1023                 RETURN(result);
1024
1025         env = cl_env_get(&refcheck);
1026         if (IS_ERR(env))
1027                 RETURN(PTR_ERR(env));
1028
1029         args = vvp_env_args(env, IO_NORMAL);
1030         args->u.normal.via_iov = (struct iovec *)iov;
1031         args->u.normal.via_nrsegs = nr_segs;
1032         args->u.normal.via_iocb = iocb;
1033
1034         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1035                                     &iocb->ki_pos, count);
1036         cl_env_put(env, &refcheck);
1037         RETURN(result);
1038 }
1039
1040 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1041                             loff_t *ppos)
1042 {
1043         struct lu_env *env;
1044         struct iovec  *local_iov;
1045         struct kiocb  *kiocb;
1046         ssize_t        result;
1047         int            refcheck;
1048         ENTRY;
1049
1050         env = cl_env_get(&refcheck);
1051         if (IS_ERR(env))
1052                 RETURN(PTR_ERR(env));
1053
1054         local_iov = &vvp_env_info(env)->vti_local_iov;
1055         kiocb = &vvp_env_info(env)->vti_kiocb;
1056         local_iov->iov_base = (void __user *)buf;
1057         local_iov->iov_len = count;
1058         init_sync_kiocb(kiocb, file);
1059         kiocb->ki_pos = *ppos;
1060         kiocb->ki_left = count;
1061
1062         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1063         *ppos = kiocb->ki_pos;
1064
1065         cl_env_put(env, &refcheck);
1066         RETURN(result);
1067 }
1068 #endif
1069
1070 /*
1071  * Write to a file (through the page cache).
1072  */
1073 #ifdef HAVE_FILE_WRITEV
1074 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1075                               unsigned long nr_segs, loff_t *ppos)
1076 {
1077         struct lu_env      *env;
1078         struct vvp_io_args *args;
1079         size_t              count;
1080         ssize_t             result;
1081         int                 refcheck;
1082         ENTRY;
1083
1084         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1085         if (result)
1086                 RETURN(result);
1087
1088         env = cl_env_get(&refcheck);
1089         if (IS_ERR(env))
1090                 RETURN(PTR_ERR(env));
1091
1092         args = vvp_env_args(env, IO_NORMAL);
1093         args->u.normal.via_iov = (struct iovec *)iov;
1094         args->u.normal.via_nrsegs = nr_segs;
1095
1096         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1097         cl_env_put(env, &refcheck);
1098         RETURN(result);
1099 }
1100
1101 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1102                              loff_t *ppos)
1103 {
1104         struct lu_env    *env;
1105         struct iovec     *local_iov;
1106         ssize_t           result;
1107         int               refcheck;
1108         ENTRY;
1109
1110         env = cl_env_get(&refcheck);
1111         if (IS_ERR(env))
1112                 RETURN(PTR_ERR(env));
1113
1114         local_iov = &vvp_env_info(env)->vti_local_iov;
1115         local_iov->iov_base = (void __user *)buf;
1116         local_iov->iov_len = count;
1117
1118         result = ll_file_writev(file, local_iov, 1, ppos);
1119         cl_env_put(env, &refcheck);
1120         RETURN(result);
1121 }
1122
1123 #else /* AIO stuff */
1124 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1125                                  unsigned long nr_segs, loff_t pos)
1126 {
1127         struct lu_env      *env;
1128         struct vvp_io_args *args;
1129         size_t              count;
1130         ssize_t             result;
1131         int                 refcheck;
1132         ENTRY;
1133
1134         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1135         if (result)
1136                 RETURN(result);
1137
1138         env = cl_env_get(&refcheck);
1139         if (IS_ERR(env))
1140                 RETURN(PTR_ERR(env));
1141
1142         args = vvp_env_args(env, IO_NORMAL);
1143         args->u.normal.via_iov = (struct iovec *)iov;
1144         args->u.normal.via_nrsegs = nr_segs;
1145         args->u.normal.via_iocb = iocb;
1146
1147         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1148                                   &iocb->ki_pos, count);
1149         cl_env_put(env, &refcheck);
1150         RETURN(result);
1151 }
1152
1153 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1154                              loff_t *ppos)
1155 {
1156         struct lu_env *env;
1157         struct iovec  *local_iov;
1158         struct kiocb  *kiocb;
1159         ssize_t        result;
1160         int            refcheck;
1161         ENTRY;
1162
1163         env = cl_env_get(&refcheck);
1164         if (IS_ERR(env))
1165                 RETURN(PTR_ERR(env));
1166
1167         local_iov = &vvp_env_info(env)->vti_local_iov;
1168         kiocb = &vvp_env_info(env)->vti_kiocb;
1169         local_iov->iov_base = (void __user *)buf;
1170         local_iov->iov_len = count;
1171         init_sync_kiocb(kiocb, file);
1172         kiocb->ki_pos = *ppos;
1173         kiocb->ki_left = count;
1174
1175         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1176         *ppos = kiocb->ki_pos;
1177
1178         cl_env_put(env, &refcheck);
1179         RETURN(result);
1180 }
1181 #endif
1182
1183
1184 #ifdef HAVE_KERNEL_SENDFILE
1185 /*
1186  * Send file content (through pagecache) somewhere with helper
1187  */
1188 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1189                                 read_actor_t actor, void *target)
1190 {
1191         struct lu_env      *env;
1192         struct vvp_io_args *args;
1193         ssize_t             result;
1194         int                 refcheck;
1195         ENTRY;
1196
1197         env = cl_env_get(&refcheck);
1198         if (IS_ERR(env))
1199                 RETURN(PTR_ERR(env));
1200
1201         args = vvp_env_args(env, IO_SENDFILE);
1202         args->u.sendfile.via_target = target;
1203         args->u.sendfile.via_actor = actor;
1204
1205         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1206         cl_env_put(env, &refcheck);
1207         RETURN(result);
1208 }
1209 #endif
1210
1211 #ifdef HAVE_KERNEL_SPLICE_READ
1212 /*
1213  * Send file content (through pagecache) somewhere with helper
1214  */
1215 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1216                                    struct pipe_inode_info *pipe, size_t count,
1217                                    unsigned int flags)
1218 {
1219         struct lu_env      *env;
1220         struct vvp_io_args *args;
1221         ssize_t             result;
1222         int                 refcheck;
1223         ENTRY;
1224
1225         env = cl_env_get(&refcheck);
1226         if (IS_ERR(env))
1227                 RETURN(PTR_ERR(env));
1228
1229         args = vvp_env_args(env, IO_SPLICE);
1230         args->u.splice.via_pipe = pipe;
1231         args->u.splice.via_flags = flags;
1232
1233         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1234         cl_env_put(env, &refcheck);
1235         RETURN(result);
1236 }
1237 #endif
1238
1239 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1240                            obd_count ost_idx)
1241 {
1242         struct obd_export *exp = ll_i2dtexp(inode);
1243         struct obd_trans_info oti = { 0 };
1244         struct obdo *oa = NULL;
1245         int lsm_size;
1246         int rc = 0;
1247         struct lov_stripe_md *lsm = NULL, *lsm2;
1248         ENTRY;
1249
1250         OBDO_ALLOC(oa);
1251         if (oa == NULL)
1252                 RETURN(-ENOMEM);
1253
1254         lsm = ccc_inode_lsm_get(inode);
1255         if (lsm == NULL)
1256                 GOTO(out, rc = -ENOENT);
1257
1258         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1259                    (lsm->lsm_stripe_count));
1260
1261         OBD_ALLOC_LARGE(lsm2, lsm_size);
1262         if (lsm2 == NULL)
1263                 GOTO(out, rc = -ENOMEM);
1264
1265         oa->o_id = id;
1266         oa->o_seq = seq;
1267         oa->o_nlink = ost_idx;
1268         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1269         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1270         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1271                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1272         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1273         memcpy(lsm2, lsm, lsm_size);
1274         ll_inode_size_lock(inode);
1275         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1276         ll_inode_size_unlock(inode);
1277
1278         OBD_FREE_LARGE(lsm2, lsm_size);
1279         GOTO(out, rc);
1280 out:
1281         ccc_inode_lsm_put(inode, lsm);
1282         OBDO_FREE(oa);
1283         return rc;
1284 }
1285
1286 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1287 {
1288         struct ll_recreate_obj ucreat;
1289         ENTRY;
1290
1291         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1292                 RETURN(-EPERM);
1293
1294         if (cfs_copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1295                                sizeof(struct ll_recreate_obj)))
1296                 RETURN(-EFAULT);
1297
1298         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1299                                ucreat.lrc_ost_idx));
1300 }
1301
1302 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1303 {
1304         struct lu_fid fid;
1305         obd_id id;
1306         obd_count ost_idx;
1307         ENTRY;
1308
1309         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1310                 RETURN(-EPERM);
1311
1312         if (cfs_copy_from_user(&fid, (struct lu_fid *)arg,
1313                                sizeof(struct lu_fid)))
1314                 RETURN(-EFAULT);
1315
1316         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1317         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1318         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1319 }
1320
1321 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1322                              int flags, struct lov_user_md *lum, int lum_size)
1323 {
1324         struct lov_stripe_md *lsm = NULL;
1325         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1326         int rc = 0;
1327         ENTRY;
1328
1329         lsm = ccc_inode_lsm_get(inode);
1330         if (lsm != NULL) {
1331                 ccc_inode_lsm_put(inode, lsm);
1332                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1333                        inode->i_ino);
1334                 RETURN(-EEXIST);
1335         }
1336
1337         ll_inode_size_lock(inode);
1338         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1339         if (rc)
1340                 GOTO(out, rc);
1341         rc = oit.d.lustre.it_status;
1342         if (rc < 0)
1343                 GOTO(out_req_free, rc);
1344
1345         ll_release_openhandle(file->f_dentry, &oit);
1346
1347  out:
1348         ll_inode_size_unlock(inode);
1349         ll_intent_release(&oit);
1350         ccc_inode_lsm_put(inode, lsm);
1351         RETURN(rc);
1352 out_req_free:
1353         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1354         goto out;
1355 }
1356
1357 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1358                              struct lov_mds_md **lmmp, int *lmm_size,
1359                              struct ptlrpc_request **request)
1360 {
1361         struct ll_sb_info *sbi = ll_i2sbi(inode);
1362         struct mdt_body  *body;
1363         struct lov_mds_md *lmm = NULL;
1364         struct ptlrpc_request *req = NULL;
1365         struct md_op_data *op_data;
1366         int rc, lmmsize;
1367
1368         rc = ll_get_max_mdsize(sbi, &lmmsize);
1369         if (rc)
1370                 RETURN(rc);
1371
1372         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1373                                      strlen(filename), lmmsize,
1374                                      LUSTRE_OPC_ANY, NULL);
1375         if (IS_ERR(op_data))
1376                 RETURN(PTR_ERR(op_data));
1377
1378         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1379         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1380         ll_finish_md_op_data(op_data);
1381         if (rc < 0) {
1382                 CDEBUG(D_INFO, "md_getattr_name failed "
1383                        "on %s: rc %d\n", filename, rc);
1384                 GOTO(out, rc);
1385         }
1386
1387         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1388         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1389
1390         lmmsize = body->eadatasize;
1391
1392         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1393                         lmmsize == 0) {
1394                 GOTO(out, rc = -ENODATA);
1395         }
1396
1397         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1398         LASSERT(lmm != NULL);
1399
1400         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1401             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1402                 GOTO(out, rc = -EPROTO);
1403         }
1404
1405         /*
1406          * This is coming from the MDS, so is probably in
1407          * little endian.  We convert it to host endian before
1408          * passing it to userspace.
1409          */
1410         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1411                 /* if function called for directory - we should
1412                  * avoid swab not existent lsm objects */
1413                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1414                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1415                         if (S_ISREG(body->mode))
1416                                 lustre_swab_lov_user_md_objects(
1417                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1418                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1419                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1420                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1421                         if (S_ISREG(body->mode))
1422                                 lustre_swab_lov_user_md_objects(
1423                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1424                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1425                 }
1426         }
1427
1428 out:
1429         *lmmp = lmm;
1430         *lmm_size = lmmsize;
1431         *request = req;
1432         return rc;
1433 }
1434
1435 static int ll_lov_setea(struct inode *inode, struct file *file,
1436                             unsigned long arg)
1437 {
1438         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1439         struct lov_user_md  *lump;
1440         int lum_size = sizeof(struct lov_user_md) +
1441                        sizeof(struct lov_user_ost_data);
1442         int rc;
1443         ENTRY;
1444
1445         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1446                 RETURN(-EPERM);
1447
1448         OBD_ALLOC_LARGE(lump, lum_size);
1449         if (lump == NULL) {
1450                 RETURN(-ENOMEM);
1451         }
1452         if (cfs_copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1453                 OBD_FREE_LARGE(lump, lum_size);
1454                 RETURN(-EFAULT);
1455         }
1456
1457         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1458
1459         OBD_FREE_LARGE(lump, lum_size);
1460         RETURN(rc);
1461 }
1462
1463 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1464                             unsigned long arg)
1465 {
1466         struct lov_user_md_v3 lumv3;
1467         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1468         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
1469         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
1470         int lum_size;
1471         int rc;
1472         int flags = FMODE_WRITE;
1473         ENTRY;
1474
1475         /* first try with v1 which is smaller than v3 */
1476         lum_size = sizeof(struct lov_user_md_v1);
1477         if (cfs_copy_from_user(lumv1, lumv1p, lum_size))
1478                 RETURN(-EFAULT);
1479
1480         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1481                 lum_size = sizeof(struct lov_user_md_v3);
1482                 if (cfs_copy_from_user(&lumv3, lumv3p, lum_size))
1483                         RETURN(-EFAULT);
1484         }
1485
1486         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1487         if (rc == 0) {
1488                 struct lov_stripe_md *lsm;
1489                 put_user(0, &lumv1p->lmm_stripe_count);
1490                 lsm = ccc_inode_lsm_get(inode);
1491                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1492                                    0, lsm, (void *)arg);
1493                 ccc_inode_lsm_put(inode, lsm);
1494         }
1495         RETURN(rc);
1496 }
1497
1498 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1499 {
1500         struct lov_stripe_md *lsm;
1501         int rc = -ENODATA;
1502         ENTRY;
1503
1504         lsm = ccc_inode_lsm_get(inode);
1505         if (lsm != NULL)
1506                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1507                                    lsm, (void *)arg);
1508         ccc_inode_lsm_put(inode, lsm);
1509         RETURN(rc);
1510 }
1511
1512 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1513 {
1514         struct ll_inode_info   *lli = ll_i2info(inode);
1515         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1516         struct ccc_grouplock    grouplock;
1517         int                     rc;
1518         ENTRY;
1519
1520         if (ll_file_nolock(file))
1521                 RETURN(-EOPNOTSUPP);
1522
1523         spin_lock(&lli->lli_lock);
1524         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1525                 CWARN("group lock already existed with gid %lu\n",
1526                       fd->fd_grouplock.cg_gid);
1527                 spin_unlock(&lli->lli_lock);
1528                 RETURN(-EINVAL);
1529         }
1530         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1531         spin_unlock(&lli->lli_lock);
1532
1533         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1534                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1535         if (rc)
1536                 RETURN(rc);
1537
1538         spin_lock(&lli->lli_lock);
1539         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1540                 spin_unlock(&lli->lli_lock);
1541                 CERROR("another thread just won the race\n");
1542                 cl_put_grouplock(&grouplock);
1543                 RETURN(-EINVAL);
1544         }
1545
1546         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1547         fd->fd_grouplock = grouplock;
1548         spin_unlock(&lli->lli_lock);
1549
1550         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1551         RETURN(0);
1552 }
1553
1554 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1555 {
1556         struct ll_inode_info   *lli = ll_i2info(inode);
1557         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1558         struct ccc_grouplock    grouplock;
1559         ENTRY;
1560
1561         spin_lock(&lli->lli_lock);
1562         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1563                 spin_unlock(&lli->lli_lock);
1564                 CWARN("no group lock held\n");
1565                 RETURN(-EINVAL);
1566         }
1567         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1568
1569         if (fd->fd_grouplock.cg_gid != arg) {
1570                 CWARN("group lock %lu doesn't match current id %lu\n",
1571                        arg, fd->fd_grouplock.cg_gid);
1572                 spin_unlock(&lli->lli_lock);
1573                 RETURN(-EINVAL);
1574         }
1575
1576         grouplock = fd->fd_grouplock;
1577         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1578         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1579         spin_unlock(&lli->lli_lock);
1580
1581         cl_put_grouplock(&grouplock);
1582         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1583         RETURN(0);
1584 }
1585
1586 /**
1587  * Close inode open handle
1588  *
1589  * \param dentry [in]     dentry which contains the inode
1590  * \param it     [in,out] intent which contains open info and result
1591  *
1592  * \retval 0     success
1593  * \retval <0    failure
1594  */
1595 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1596 {
1597         struct inode *inode = dentry->d_inode;
1598         struct obd_client_handle *och;
1599         int rc;
1600         ENTRY;
1601
1602         LASSERT(inode);
1603
1604         /* Root ? Do nothing. */
1605         if (dentry->d_inode->i_sb->s_root == dentry)
1606                 RETURN(0);
1607
1608         /* No open handle to close? Move away */
1609         if (!it_disposition(it, DISP_OPEN_OPEN))
1610                 RETURN(0);
1611
1612         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1613
1614         OBD_ALLOC(och, sizeof(*och));
1615         if (!och)
1616                 GOTO(out, rc = -ENOMEM);
1617
1618         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1619                     ll_i2info(inode), it, och);
1620
1621         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1622                                        inode, och);
1623  out:
1624         /* this one is in place of ll_file_open */
1625         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1626                 ptlrpc_req_finished(it->d.lustre.it_data);
1627                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1628         }
1629         RETURN(rc);
1630 }
1631
1632 /**
1633  * Get size for inode for which FIEMAP mapping is requested.
1634  * Make the FIEMAP get_info call and returns the result.
1635  */
1636 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1637               int num_bytes)
1638 {
1639         struct obd_export *exp = ll_i2dtexp(inode);
1640         struct lov_stripe_md *lsm = NULL;
1641         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1642         int vallen = num_bytes;
1643         int rc;
1644         ENTRY;
1645
1646         /* Checks for fiemap flags */
1647         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1648                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1649                 return -EBADR;
1650         }
1651
1652         /* Check for FIEMAP_FLAG_SYNC */
1653         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1654                 rc = filemap_fdatawrite(inode->i_mapping);
1655                 if (rc)
1656                         return rc;
1657         }
1658
1659         lsm = ccc_inode_lsm_get(inode);
1660         if (lsm == NULL)
1661                 return -ENOENT;
1662
1663         /* If the stripe_count > 1 and the application does not understand
1664          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1665          */
1666         if (lsm->lsm_stripe_count > 1 &&
1667             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1668                 GOTO(out, rc = -EOPNOTSUPP);
1669
1670         fm_key.oa.o_id = lsm->lsm_object_id;
1671         fm_key.oa.o_seq = lsm->lsm_object_seq;
1672         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1673
1674         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1675         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1676         /* If filesize is 0, then there would be no objects for mapping */
1677         if (fm_key.oa.o_size == 0) {
1678                 fiemap->fm_mapped_extents = 0;
1679                 GOTO(out, rc = 0);
1680         }
1681
1682         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1683
1684         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1685                           fiemap, lsm);
1686         if (rc)
1687                 CERROR("obd_get_info failed: rc = %d\n", rc);
1688
1689 out:
1690         ccc_inode_lsm_put(inode, lsm);
1691         RETURN(rc);
1692 }
1693
1694 int ll_fid2path(struct inode *inode, void *arg)
1695 {
1696         struct obd_export *exp = ll_i2mdexp(inode);
1697         struct getinfo_fid2path *gfout, *gfin;
1698         int outsize, rc;
1699         ENTRY;
1700
1701         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1702             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1703                 RETURN(-EPERM);
1704
1705         /* Need to get the buflen */
1706         OBD_ALLOC_PTR(gfin);
1707         if (gfin == NULL)
1708                 RETURN(-ENOMEM);
1709         if (cfs_copy_from_user(gfin, arg, sizeof(*gfin))) {
1710                 OBD_FREE_PTR(gfin);
1711                 RETURN(-EFAULT);
1712         }
1713
1714         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1715         OBD_ALLOC(gfout, outsize);
1716         if (gfout == NULL) {
1717                 OBD_FREE_PTR(gfin);
1718                 RETURN(-ENOMEM);
1719         }
1720         memcpy(gfout, gfin, sizeof(*gfout));
1721         OBD_FREE_PTR(gfin);
1722
1723         /* Call mdc_iocontrol */
1724         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1725         if (rc)
1726                 GOTO(gf_free, rc);
1727         if (cfs_copy_to_user(arg, gfout, outsize))
1728                 rc = -EFAULT;
1729
1730 gf_free:
1731         OBD_FREE(gfout, outsize);
1732         RETURN(rc);
1733 }
1734
1735 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1736 {
1737         struct ll_user_fiemap *fiemap_s;
1738         size_t num_bytes, ret_bytes;
1739         unsigned int extent_count;
1740         int rc = 0;
1741
1742         /* Get the extent count so we can calculate the size of
1743          * required fiemap buffer */
1744         if (get_user(extent_count,
1745             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1746                 RETURN(-EFAULT);
1747         num_bytes = sizeof(*fiemap_s) + (extent_count *
1748                                          sizeof(struct ll_fiemap_extent));
1749
1750         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1751         if (fiemap_s == NULL)
1752                 RETURN(-ENOMEM);
1753
1754         /* get the fiemap value */
1755         if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
1756                            sizeof(*fiemap_s)))
1757                 GOTO(error, rc = -EFAULT);
1758
1759         /* If fm_extent_count is non-zero, read the first extent since
1760          * it is used to calculate end_offset and device from previous
1761          * fiemap call. */
1762         if (extent_count) {
1763                 if (copy_from_user(&fiemap_s->fm_extents[0],
1764                     (char __user *)arg + sizeof(*fiemap_s),
1765                     sizeof(struct ll_fiemap_extent)))
1766                         GOTO(error, rc = -EFAULT);
1767         }
1768
1769         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1770         if (rc)
1771                 GOTO(error, rc);
1772
1773         ret_bytes = sizeof(struct ll_user_fiemap);
1774
1775         if (extent_count != 0)
1776                 ret_bytes += (fiemap_s->fm_mapped_extents *
1777                                  sizeof(struct ll_fiemap_extent));
1778
1779         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1780                 rc = -EFAULT;
1781
1782 error:
1783         OBD_FREE_LARGE(fiemap_s, num_bytes);
1784         RETURN(rc);
1785 }
1786
1787 /*
1788  * Read the data_version for inode.
1789  *
1790  * This value is computed using stripe object version on OST.
1791  * Version is computed using server side locking.
1792  *
1793  * @param extent_lock  Take extent lock. Not needed if a process is already
1794  *                     holding the OST object group locks.
1795  */
1796 static int ll_data_version(struct inode *inode, __u64 *data_version,
1797                            int extent_lock)
1798 {
1799         struct lov_stripe_md *lsm = NULL;
1800         struct ll_sb_info    *sbi = ll_i2sbi(inode);
1801         struct obdo          *obdo = NULL;
1802         int                   rc;
1803         ENTRY;
1804
1805         /* If no stripe, we consider version is 0. */
1806         lsm = ccc_inode_lsm_get(inode);
1807         if (lsm == NULL) {
1808                 *data_version = 0;
1809                 CDEBUG(D_INODE, "No object for inode\n");
1810                 RETURN(0);
1811         }
1812
1813         OBD_ALLOC_PTR(obdo);
1814         if (obdo == NULL) {
1815                 ccc_inode_lsm_put(inode, lsm);
1816                 RETURN(-ENOMEM);
1817         }
1818
1819         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1820         if (!rc) {
1821                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1822                         rc = -EOPNOTSUPP;
1823                 else
1824                         *data_version = obdo->o_data_version;
1825         }
1826
1827         OBD_FREE_PTR(obdo);
1828         ccc_inode_lsm_put(inode, lsm);
1829
1830         RETURN(rc);
1831 }
1832
1833 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1834 {
1835         struct inode *inode = file->f_dentry->d_inode;
1836         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1837         int flags;
1838
1839         ENTRY;
1840
1841         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1842                inode->i_generation, inode, cmd);
1843         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1844
1845         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1846         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1847                 RETURN(-ENOTTY);
1848
1849         switch(cmd) {
1850         case LL_IOC_GETFLAGS:
1851                 /* Get the current value of the file flags */
1852                 return put_user(fd->fd_flags, (int *)arg);
1853         case LL_IOC_SETFLAGS:
1854         case LL_IOC_CLRFLAGS:
1855                 /* Set or clear specific file flags */
1856                 /* XXX This probably needs checks to ensure the flags are
1857                  *     not abused, and to handle any flag side effects.
1858                  */
1859                 if (get_user(flags, (int *) arg))
1860                         RETURN(-EFAULT);
1861
1862                 if (cmd == LL_IOC_SETFLAGS) {
1863                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1864                             !(file->f_flags & O_DIRECT)) {
1865                                 CERROR("%s: unable to disable locking on "
1866                                        "non-O_DIRECT file\n", current->comm);
1867                                 RETURN(-EINVAL);
1868                         }
1869
1870                         fd->fd_flags |= flags;
1871                 } else {
1872                         fd->fd_flags &= ~flags;
1873                 }
1874                 RETURN(0);
1875         case LL_IOC_LOV_SETSTRIPE:
1876                 RETURN(ll_lov_setstripe(inode, file, arg));
1877         case LL_IOC_LOV_SETEA:
1878                 RETURN(ll_lov_setea(inode, file, arg));
1879         case LL_IOC_LOV_GETSTRIPE:
1880                 RETURN(ll_lov_getstripe(inode, arg));
1881         case LL_IOC_RECREATE_OBJ:
1882                 RETURN(ll_lov_recreate_obj(inode, arg));
1883         case LL_IOC_RECREATE_FID:
1884                 RETURN(ll_lov_recreate_fid(inode, arg));
1885         case FSFILT_IOC_FIEMAP:
1886                 RETURN(ll_ioctl_fiemap(inode, arg));
1887         case FSFILT_IOC_GETFLAGS:
1888         case FSFILT_IOC_SETFLAGS:
1889                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1890         case FSFILT_IOC_GETVERSION_OLD:
1891         case FSFILT_IOC_GETVERSION:
1892                 RETURN(put_user(inode->i_generation, (int *)arg));
1893         case LL_IOC_GROUP_LOCK:
1894                 RETURN(ll_get_grouplock(inode, file, arg));
1895         case LL_IOC_GROUP_UNLOCK:
1896                 RETURN(ll_put_grouplock(inode, file, arg));
1897         case IOC_OBD_STATFS:
1898                 RETURN(ll_obd_statfs(inode, (void *)arg));
1899
1900         /* We need to special case any other ioctls we want to handle,
1901          * to send them to the MDS/OST as appropriate and to properly
1902          * network encode the arg field.
1903         case FSFILT_IOC_SETVERSION_OLD:
1904         case FSFILT_IOC_SETVERSION:
1905         */
1906         case LL_IOC_FLUSHCTX:
1907                 RETURN(ll_flush_ctx(inode));
1908         case LL_IOC_PATH2FID: {
1909                 if (cfs_copy_to_user((void *)arg, ll_inode2fid(inode),
1910                                      sizeof(struct lu_fid)))
1911                         RETURN(-EFAULT);
1912
1913                 RETURN(0);
1914         }
1915         case OBD_IOC_FID2PATH:
1916                 RETURN(ll_fid2path(inode, (void *)arg));
1917         case LL_IOC_DATA_VERSION: {
1918                 struct ioc_data_version idv;
1919                 int rc;
1920
1921                 if (cfs_copy_from_user(&idv, (char *)arg, sizeof(idv)))
1922                         RETURN(-EFAULT);
1923
1924                 rc = ll_data_version(inode, &idv.idv_version,
1925                                      !(idv.idv_flags & LL_DV_NOFLUSH));
1926
1927                 if (rc == 0 &&
1928                     cfs_copy_to_user((char *) arg, &idv, sizeof(idv)))
1929                         RETURN(-EFAULT);
1930
1931                 RETURN(rc);
1932         }
1933
1934         case LL_IOC_GET_MDTIDX: {
1935                 int mdtidx;
1936
1937                 mdtidx = ll_get_mdt_idx(inode);
1938                 if (mdtidx < 0)
1939                         RETURN(mdtidx);
1940
1941                 if (put_user((int)mdtidx, (int*)arg))
1942                         RETURN(-EFAULT);
1943
1944                 RETURN(0);
1945         }
1946         case OBD_IOC_GETDTNAME:
1947         case OBD_IOC_GETMDNAME:
1948                 RETURN(ll_get_obd_name(inode, cmd, arg));
1949         default: {
1950                 int err;
1951
1952                 if (LLIOC_STOP ==
1953                     ll_iocontrol_call(inode, file, cmd, arg, &err))
1954                         RETURN(err);
1955
1956                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1957                                      (void *)arg));
1958         }
1959         }
1960 }
1961
1962 #ifndef HAVE_FILE_LLSEEK_SIZE
1963 static inline loff_t
1964 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
1965 {
1966         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
1967                 return -EINVAL;
1968         if (offset > maxsize)
1969                 return -EINVAL;
1970
1971         if (offset != file->f_pos) {
1972                 file->f_pos = offset;
1973                 file->f_version = 0;
1974         }
1975         return offset;
1976 }
1977
1978 static loff_t
1979 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
1980                 loff_t maxsize, loff_t eof)
1981 {
1982         struct inode *inode = file->f_dentry->d_inode;
1983
1984         switch (origin) {
1985         case SEEK_END:
1986                 offset += eof;
1987                 break;
1988         case SEEK_CUR:
1989                 /*
1990                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
1991                  * position-querying operation.  Avoid rewriting the "same"
1992                  * f_pos value back to the file because a concurrent read(),
1993                  * write() or lseek() might have altered it
1994                  */
1995                 if (offset == 0)
1996                         return file->f_pos;
1997                 /*
1998                  * f_lock protects against read/modify/write race with other
1999                  * SEEK_CURs. Note that parallel writes and reads behave
2000                  * like SEEK_SET.
2001                  */
2002                 mutex_lock(&inode->i_mutex);
2003                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
2004                 mutex_unlock(&inode->i_mutex);
2005                 return offset;
2006         case SEEK_DATA:
2007                 /*
2008                  * In the generic case the entire file is data, so as long as
2009                  * offset isn't at the end of the file then the offset is data.
2010                  */
2011                 if (offset >= eof)
2012                         return -ENXIO;
2013                 break;
2014         case SEEK_HOLE:
2015                 /*
2016                  * There is a virtual hole at the end of the file, so as long as
2017                  * offset isn't i_size or larger, return i_size.
2018                  */
2019                 if (offset >= eof)
2020                         return -ENXIO;
2021                 offset = eof;
2022                 break;
2023         }
2024
2025         return llseek_execute(file, offset, maxsize);
2026 }
2027 #endif
2028
2029 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2030 {
2031         struct inode *inode = file->f_dentry->d_inode;
2032         loff_t retval, eof = 0;
2033
2034         ENTRY;
2035         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2036                            (origin == SEEK_CUR) ? file->f_pos : 0);
2037         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2038                inode->i_ino, inode->i_generation, inode, retval, retval,
2039                origin);
2040         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2041
2042         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2043                 retval = ll_glimpse_size(inode);
2044                 if (retval != 0)
2045                         RETURN(retval);
2046                 eof = i_size_read(inode);
2047         }
2048
2049         retval = generic_file_llseek_size(file, offset, origin,
2050                                           ll_file_maxbytes(inode), eof);
2051         RETURN(retval);
2052 }
2053
2054 int ll_flush(struct file *file, fl_owner_t id)
2055 {
2056         struct inode *inode = file->f_dentry->d_inode;
2057         struct ll_inode_info *lli = ll_i2info(inode);
2058         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2059         int rc, err;
2060
2061         LASSERT(!S_ISDIR(inode->i_mode));
2062
2063         /* catch async errors that were recorded back when async writeback
2064          * failed for pages in this mapping. */
2065         rc = lli->lli_async_rc;
2066         lli->lli_async_rc = 0;
2067         err = lov_read_and_clear_async_rc(lli->lli_clob);
2068         if (rc == 0)
2069                 rc = err;
2070
2071         /* The application has been told write failure already.
2072          * Do not report failure again. */
2073         if (fd->fd_write_failed)
2074                 return 0;
2075         return rc ? -EIO : 0;
2076 }
2077
2078 /**
2079  * Called to make sure a portion of file has been written out.
2080  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2081  *
2082  * Return how many pages have been written.
2083  */
2084 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2085                        enum cl_fsync_mode mode)
2086 {
2087         struct cl_env_nest nest;
2088         struct lu_env *env;
2089         struct cl_io *io;
2090         struct obd_capa *capa = NULL;
2091         struct cl_fsync_io *fio;
2092         int result;
2093         ENTRY;
2094
2095         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2096             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2097                 RETURN(-EINVAL);
2098
2099         env = cl_env_nested_get(&nest);
2100         if (IS_ERR(env))
2101                 RETURN(PTR_ERR(env));
2102
2103         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2104
2105         io = ccc_env_thread_io(env);
2106         io->ci_obj = cl_i2info(inode)->lli_clob;
2107         io->ci_ignore_layout = 1;
2108
2109         /* initialize parameters for sync */
2110         fio = &io->u.ci_fsync;
2111         fio->fi_capa = capa;
2112         fio->fi_start = start;
2113         fio->fi_end = end;
2114         fio->fi_fid = ll_inode2fid(inode);
2115         fio->fi_mode = mode;
2116         fio->fi_nr_written = 0;
2117
2118         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2119                 result = cl_io_loop(env, io);
2120         else
2121                 result = io->ci_result;
2122         if (result == 0)
2123                 result = fio->fi_nr_written;
2124         cl_io_fini(env, io);
2125         cl_env_nested_put(&nest, env);
2126
2127         capa_put(capa);
2128
2129         RETURN(result);
2130 }
2131
2132 #ifdef HAVE_FILE_FSYNC_4ARGS
2133 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2134 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2135 int ll_fsync(struct file *file, int data)
2136 #else
2137 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2138 #endif
2139 {
2140         struct inode *inode = file->f_dentry->d_inode;
2141         struct ll_inode_info *lli = ll_i2info(inode);
2142         struct ptlrpc_request *req;
2143         struct obd_capa *oc;
2144         int rc, err;
2145         ENTRY;
2146
2147         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2148                inode->i_generation, inode);
2149         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2150
2151 #ifdef HAVE_FILE_FSYNC_4ARGS
2152         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2153         mutex_lock(&inode->i_mutex);
2154 #else
2155         /* fsync's caller has already called _fdata{sync,write}, we want
2156          * that IO to finish before calling the osc and mdc sync methods */
2157         rc = filemap_fdatawait(inode->i_mapping);
2158 #endif
2159
2160         /* catch async errors that were recorded back when async writeback
2161          * failed for pages in this mapping. */
2162         if (!S_ISDIR(inode->i_mode)) {
2163                 err = lli->lli_async_rc;
2164                 lli->lli_async_rc = 0;
2165                 if (rc == 0)
2166                         rc = err;
2167                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2168                 if (rc == 0)
2169                         rc = err;
2170         }
2171
2172         oc = ll_mdscapa_get(inode);
2173         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2174                       &req);
2175         capa_put(oc);
2176         if (!rc)
2177                 rc = err;
2178         if (!err)
2179                 ptlrpc_req_finished(req);
2180
2181         if (data) {
2182                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2183
2184                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2185                                 CL_FSYNC_ALL);
2186                 if (rc == 0 && err < 0)
2187                         rc = err;
2188                 if (rc < 0)
2189                         fd->fd_write_failed = true;
2190                 else
2191                         fd->fd_write_failed = false;
2192         }
2193
2194 #ifdef HAVE_FILE_FSYNC_4ARGS
2195         mutex_unlock(&inode->i_mutex);
2196 #endif
2197         RETURN(rc);
2198 }
2199
2200 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2201 {
2202         struct inode *inode = file->f_dentry->d_inode;
2203         struct ll_sb_info *sbi = ll_i2sbi(inode);
2204         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2205                                            .ei_cb_cp =ldlm_flock_completion_ast,
2206                                            .ei_cbdata = file_lock };
2207         struct md_op_data *op_data;
2208         struct lustre_handle lockh = {0};
2209         ldlm_policy_data_t flock = {{0}};
2210         int flags = 0;
2211         int rc;
2212         ENTRY;
2213
2214         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2215                inode->i_ino, file_lock);
2216
2217         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2218
2219         if (file_lock->fl_flags & FL_FLOCK) {
2220                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2221                 /* flocks are whole-file locks */
2222                 flock.l_flock.end = OFFSET_MAX;
2223                 /* For flocks owner is determined by the local file desctiptor*/
2224                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2225         } else if (file_lock->fl_flags & FL_POSIX) {
2226                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2227                 flock.l_flock.start = file_lock->fl_start;
2228                 flock.l_flock.end = file_lock->fl_end;
2229         } else {
2230                 RETURN(-EINVAL);
2231         }
2232         flock.l_flock.pid = file_lock->fl_pid;
2233
2234         /* Somewhat ugly workaround for svc lockd.
2235          * lockd installs custom fl_lmops->lm_compare_owner that checks
2236          * for the fl_owner to be the same (which it always is on local node
2237          * I guess between lockd processes) and then compares pid.
2238          * As such we assign pid to the owner field to make it all work,
2239          * conflict with normal locks is unlikely since pid space and
2240          * pointer space for current->files are not intersecting */
2241         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2242                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2243
2244         switch (file_lock->fl_type) {
2245         case F_RDLCK:
2246                 einfo.ei_mode = LCK_PR;
2247                 break;
2248         case F_UNLCK:
2249                 /* An unlock request may or may not have any relation to
2250                  * existing locks so we may not be able to pass a lock handle
2251                  * via a normal ldlm_lock_cancel() request. The request may even
2252                  * unlock a byte range in the middle of an existing lock. In
2253                  * order to process an unlock request we need all of the same
2254                  * information that is given with a normal read or write record
2255                  * lock request. To avoid creating another ldlm unlock (cancel)
2256                  * message we'll treat a LCK_NL flock request as an unlock. */
2257                 einfo.ei_mode = LCK_NL;
2258                 break;
2259         case F_WRLCK:
2260                 einfo.ei_mode = LCK_PW;
2261                 break;
2262         default:
2263                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2264                         file_lock->fl_type);
2265                 RETURN (-ENOTSUPP);
2266         }
2267
2268         switch (cmd) {
2269         case F_SETLKW:
2270 #ifdef F_SETLKW64
2271         case F_SETLKW64:
2272 #endif
2273                 flags = 0;
2274                 break;
2275         case F_SETLK:
2276 #ifdef F_SETLK64
2277         case F_SETLK64:
2278 #endif
2279                 flags = LDLM_FL_BLOCK_NOWAIT;
2280                 break;
2281         case F_GETLK:
2282 #ifdef F_GETLK64
2283         case F_GETLK64:
2284 #endif
2285                 flags = LDLM_FL_TEST_LOCK;
2286                 /* Save the old mode so that if the mode in the lock changes we
2287                  * can decrement the appropriate reader or writer refcount. */
2288                 file_lock->fl_type = einfo.ei_mode;
2289                 break;
2290         default:
2291                 CERROR("unknown fcntl lock command: %d\n", cmd);
2292                 RETURN (-EINVAL);
2293         }
2294
2295         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2296                                      LUSTRE_OPC_ANY, NULL);
2297         if (IS_ERR(op_data))
2298                 RETURN(PTR_ERR(op_data));
2299
2300         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2301                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2302                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2303
2304         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2305                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2306
2307         ll_finish_md_op_data(op_data);
2308
2309         if ((file_lock->fl_flags & FL_FLOCK) &&
2310             (rc == 0 || file_lock->fl_type == F_UNLCK))
2311                 flock_lock_file_wait(file, file_lock);
2312         if ((file_lock->fl_flags & FL_POSIX) &&
2313             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2314             !(flags & LDLM_FL_TEST_LOCK))
2315                 posix_lock_file_wait(file, file_lock);
2316
2317         RETURN(rc);
2318 }
2319
2320 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2321 {
2322         ENTRY;
2323
2324         RETURN(-ENOSYS);
2325 }
2326
2327 /**
2328  * test if some locks matching bits and l_req_mode are acquired
2329  * - bits can be in different locks
2330  * - if found clear the common lock bits in *bits
2331  * - the bits not found, are kept in *bits
2332  * \param inode [IN]
2333  * \param bits [IN] searched lock bits [IN]
2334  * \param l_req_mode [IN] searched lock mode
2335  * \retval boolean, true iff all bits are found
2336  */
2337 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2338 {
2339         struct lustre_handle lockh;
2340         ldlm_policy_data_t policy;
2341         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2342                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2343         struct lu_fid *fid;
2344         __u64 flags;
2345         int i;
2346         ENTRY;
2347
2348         if (!inode)
2349                RETURN(0);
2350
2351         fid = &ll_i2info(inode)->lli_fid;
2352         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2353                ldlm_lockname[mode]);
2354
2355         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2356         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2357                 policy.l_inodebits.bits = *bits & (1 << i);
2358                 if (policy.l_inodebits.bits == 0)
2359                         continue;
2360
2361                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2362                                   &policy, mode, &lockh)) {
2363                         struct ldlm_lock *lock;
2364
2365                         lock = ldlm_handle2lock(&lockh);
2366                         if (lock) {
2367                                 *bits &=
2368                                       ~(lock->l_policy_data.l_inodebits.bits);
2369                                 LDLM_LOCK_PUT(lock);
2370                         } else {
2371                                 *bits &= ~policy.l_inodebits.bits;
2372                         }
2373                 }
2374         }
2375         RETURN(*bits == 0);
2376 }
2377
2378 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2379                             struct lustre_handle *lockh, __u64 flags)
2380 {
2381         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2382         struct lu_fid *fid;
2383         ldlm_mode_t rc;
2384         ENTRY;
2385
2386         fid = &ll_i2info(inode)->lli_fid;
2387         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2388
2389         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2390                            fid, LDLM_IBITS, &policy,
2391                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2392         RETURN(rc);
2393 }
2394
2395 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2396 {
2397         /* Already unlinked. Just update nlink and return success */
2398         if (rc == -ENOENT) {
2399                 clear_nlink(inode);
2400                 /* This path cannot be hit for regular files unless in
2401                  * case of obscure races, so no need to to validate
2402                  * size. */
2403                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2404                         return 0;
2405         } else if (rc != 0) {
2406                 CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2407                        ll_get_fsname(inode->i_sb, NULL, 0),
2408                        PFID(ll_inode2fid(inode)), rc);
2409         }
2410
2411         return rc;
2412 }
2413
2414 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2415                              __u64 ibits)
2416 {
2417         struct inode *inode = dentry->d_inode;
2418         struct ptlrpc_request *req = NULL;
2419         struct obd_export *exp;
2420         int rc = 0;
2421         ENTRY;
2422
2423         LASSERT(inode != NULL);
2424
2425         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2426                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2427
2428         exp = ll_i2mdexp(inode);
2429
2430         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2431          *      But under CMD case, it caused some lock issues, should be fixed
2432          *      with new CMD ibits lock. See bug 12718 */
2433         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2434                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2435                 struct md_op_data *op_data;
2436
2437                 if (ibits == MDS_INODELOCK_LOOKUP)
2438                         oit.it_op = IT_LOOKUP;
2439
2440                 /* Call getattr by fid, so do not provide name at all. */
2441                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2442                                              dentry->d_inode, NULL, 0, 0,
2443                                              LUSTRE_OPC_ANY, NULL);
2444                 if (IS_ERR(op_data))
2445                         RETURN(PTR_ERR(op_data));
2446
2447                 oit.it_create_mode |= M_CHECK_STALE;
2448                 rc = md_intent_lock(exp, op_data, NULL, 0,
2449                                     /* we are not interested in name
2450                                        based lookup */
2451                                     &oit, 0, &req,
2452                                     ll_md_blocking_ast, 0);
2453                 ll_finish_md_op_data(op_data);
2454                 oit.it_create_mode &= ~M_CHECK_STALE;
2455                 if (rc < 0) {
2456                         rc = ll_inode_revalidate_fini(inode, rc);
2457                         GOTO (out, rc);
2458                 }
2459
2460                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2461                 if (rc != 0) {
2462                         ll_intent_release(&oit);
2463                         GOTO(out, rc);
2464                 }
2465
2466                 /* Unlinked? Unhash dentry, so it is not picked up later by
2467                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2468                    here to preserve get_cwd functionality on 2.6.
2469                    Bug 10503 */
2470                 if (!dentry->d_inode->i_nlink)
2471                         d_lustre_invalidate(dentry);
2472
2473                 ll_lookup_finish_locks(&oit, dentry);
2474         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2475                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2476                 obd_valid valid = OBD_MD_FLGETATTR;
2477                 struct md_op_data *op_data;
2478                 int ealen = 0;
2479
2480                 if (S_ISREG(inode->i_mode)) {
2481                         rc = ll_get_max_mdsize(sbi, &ealen);
2482                         if (rc)
2483                                 RETURN(rc);
2484                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2485                 }
2486
2487                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2488                                              0, ealen, LUSTRE_OPC_ANY,
2489                                              NULL);
2490                 if (IS_ERR(op_data))
2491                         RETURN(PTR_ERR(op_data));
2492
2493                 op_data->op_valid = valid;
2494                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2495                  * capa for this inode. Because we only keep capas of dirs
2496                  * fresh. */
2497                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2498                 ll_finish_md_op_data(op_data);
2499                 if (rc) {
2500                         rc = ll_inode_revalidate_fini(inode, rc);
2501                         RETURN(rc);
2502                 }
2503
2504                 rc = ll_prep_inode(&inode, req, NULL);
2505         }
2506 out:
2507         ptlrpc_req_finished(req);
2508         return rc;
2509 }
2510
2511 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2512                            __u64 ibits)
2513 {
2514         struct inode *inode = dentry->d_inode;
2515         int rc;
2516         ENTRY;
2517
2518         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2519         if (rc != 0)
2520                 RETURN(rc);
2521
2522         /* if object isn't regular file, don't validate size */
2523         if (!S_ISREG(inode->i_mode)) {
2524                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2525                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2526                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2527         } else {
2528                 rc = ll_glimpse_size(inode);
2529         }
2530         RETURN(rc);
2531 }
2532
2533 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2534                   struct lookup_intent *it, struct kstat *stat)
2535 {
2536         struct inode *inode = de->d_inode;
2537         struct ll_sb_info *sbi = ll_i2sbi(inode);
2538         struct ll_inode_info *lli = ll_i2info(inode);
2539         int res = 0;
2540
2541         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2542                                              MDS_INODELOCK_LOOKUP);
2543         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2544
2545         if (res)
2546                 return res;
2547
2548         stat->dev = inode->i_sb->s_dev;
2549         if (ll_need_32bit_api(sbi))
2550                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2551         else
2552                 stat->ino = inode->i_ino;
2553         stat->mode = inode->i_mode;
2554         stat->nlink = inode->i_nlink;
2555         stat->uid = inode->i_uid;
2556         stat->gid = inode->i_gid;
2557         stat->rdev = inode->i_rdev;
2558         stat->atime = inode->i_atime;
2559         stat->mtime = inode->i_mtime;
2560         stat->ctime = inode->i_ctime;
2561         stat->blksize = 1 << inode->i_blkbits;
2562
2563         stat->size = i_size_read(inode);
2564         stat->blocks = inode->i_blocks;
2565
2566         return 0;
2567 }
2568 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2569 {
2570         struct lookup_intent it = { .it_op = IT_GETATTR };
2571
2572         return ll_getattr_it(mnt, de, &it, stat);
2573 }
2574
2575 #ifdef HAVE_LINUX_FIEMAP_H
2576 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2577                 __u64 start, __u64 len)
2578 {
2579         int rc;
2580         size_t num_bytes;
2581         struct ll_user_fiemap *fiemap;
2582         unsigned int extent_count = fieinfo->fi_extents_max;
2583
2584         num_bytes = sizeof(*fiemap) + (extent_count *
2585                                        sizeof(struct ll_fiemap_extent));
2586         OBD_ALLOC_LARGE(fiemap, num_bytes);
2587
2588         if (fiemap == NULL)
2589                 RETURN(-ENOMEM);
2590
2591         fiemap->fm_flags = fieinfo->fi_flags;
2592         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2593         fiemap->fm_start = start;
2594         fiemap->fm_length = len;
2595         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2596                sizeof(struct ll_fiemap_extent));
2597
2598         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2599
2600         fieinfo->fi_flags = fiemap->fm_flags;
2601         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2602         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2603                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2604
2605         OBD_FREE_LARGE(fiemap, num_bytes);
2606         return rc;
2607 }
2608 #endif
2609
2610 struct posix_acl * ll_get_acl(struct inode *inode, int type)
2611 {
2612         struct ll_inode_info *lli = ll_i2info(inode);
2613         struct posix_acl *acl = NULL;
2614         ENTRY;
2615
2616         spin_lock(&lli->lli_lock);
2617         /* VFS' acl_permission_check->check_acl will release the refcount */
2618         acl = posix_acl_dup(lli->lli_posix_acl);
2619         spin_unlock(&lli->lli_lock);
2620
2621         RETURN(acl);
2622 }
2623
2624 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2625 static int
2626 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2627 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
2628 # else
2629 ll_check_acl(struct inode *inode, int mask)
2630 # endif
2631 {
2632 # ifdef CONFIG_FS_POSIX_ACL
2633         struct posix_acl *acl;
2634         int rc;
2635         ENTRY;
2636
2637 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2638         if (flags & IPERM_FLAG_RCU)
2639                 return -ECHILD;
2640 #  endif
2641         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
2642
2643         if (!acl)
2644                 RETURN(-EAGAIN);
2645
2646         rc = posix_acl_permission(inode, acl, mask);
2647         posix_acl_release(acl);
2648
2649         RETURN(rc);
2650 # else /* !CONFIG_FS_POSIX_ACL */
2651         return -EAGAIN;
2652 # endif /* CONFIG_FS_POSIX_ACL */
2653 }
2654 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2655
2656 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2657 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2658 #else
2659 # ifdef HAVE_INODE_PERMISION_2ARGS
2660 int ll_inode_permission(struct inode *inode, int mask)
2661 # else
2662 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2663 # endif
2664 #endif
2665 {
2666         int rc = 0;
2667         ENTRY;
2668
2669 #ifdef MAY_NOT_BLOCK
2670         if (mask & MAY_NOT_BLOCK)
2671                 return -ECHILD;
2672 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
2673         if (flags & IPERM_FLAG_RCU)
2674                 return -ECHILD;
2675 #endif
2676
2677        /* as root inode are NOT getting validated in lookup operation,
2678         * need to do it before permission check. */
2679
2680         if (inode == inode->i_sb->s_root->d_inode) {
2681                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2682
2683                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2684                                               MDS_INODELOCK_LOOKUP);
2685                 if (rc)
2686                         RETURN(rc);
2687         }
2688
2689         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2690                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2691
2692         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2693                 return lustre_check_remote_perm(inode, mask);
2694
2695         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2696         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
2697
2698         RETURN(rc);
2699 }
2700
2701 #ifdef HAVE_FILE_READV
2702 #define READ_METHOD readv
2703 #define READ_FUNCTION ll_file_readv
2704 #define WRITE_METHOD writev
2705 #define WRITE_FUNCTION ll_file_writev
2706 #else
2707 #define READ_METHOD aio_read
2708 #define READ_FUNCTION ll_file_aio_read
2709 #define WRITE_METHOD aio_write
2710 #define WRITE_FUNCTION ll_file_aio_write
2711 #endif
2712
2713 /* -o localflock - only provides locally consistent flock locks */
2714 struct file_operations ll_file_operations = {
2715         .read           = ll_file_read,
2716         .READ_METHOD    = READ_FUNCTION,
2717         .write          = ll_file_write,
2718         .WRITE_METHOD   = WRITE_FUNCTION,
2719         .unlocked_ioctl = ll_file_ioctl,
2720         .open           = ll_file_open,
2721         .release        = ll_file_release,
2722         .mmap           = ll_file_mmap,
2723         .llseek         = ll_file_seek,
2724 #ifdef HAVE_KERNEL_SENDFILE
2725         .sendfile       = ll_file_sendfile,
2726 #endif
2727 #ifdef HAVE_KERNEL_SPLICE_READ
2728         .splice_read    = ll_file_splice_read,
2729 #endif
2730         .fsync          = ll_fsync,
2731         .flush          = ll_flush
2732 };
2733
2734 struct file_operations ll_file_operations_flock = {
2735         .read           = ll_file_read,
2736         .READ_METHOD    = READ_FUNCTION,
2737         .write          = ll_file_write,
2738         .WRITE_METHOD   = WRITE_FUNCTION,
2739         .unlocked_ioctl = ll_file_ioctl,
2740         .open           = ll_file_open,
2741         .release        = ll_file_release,
2742         .mmap           = ll_file_mmap,
2743         .llseek         = ll_file_seek,
2744 #ifdef HAVE_KERNEL_SENDFILE
2745         .sendfile       = ll_file_sendfile,
2746 #endif
2747 #ifdef HAVE_KERNEL_SPLICE_READ
2748         .splice_read    = ll_file_splice_read,
2749 #endif
2750         .fsync          = ll_fsync,
2751         .flush          = ll_flush,
2752         .flock          = ll_file_flock,
2753         .lock           = ll_file_flock
2754 };
2755
2756 /* These are for -o noflock - to return ENOSYS on flock calls */
2757 struct file_operations ll_file_operations_noflock = {
2758         .read           = ll_file_read,
2759         .READ_METHOD    = READ_FUNCTION,
2760         .write          = ll_file_write,
2761         .WRITE_METHOD   = WRITE_FUNCTION,
2762         .unlocked_ioctl = ll_file_ioctl,
2763         .open           = ll_file_open,
2764         .release        = ll_file_release,
2765         .mmap           = ll_file_mmap,
2766         .llseek         = ll_file_seek,
2767 #ifdef HAVE_KERNEL_SENDFILE
2768         .sendfile       = ll_file_sendfile,
2769 #endif
2770 #ifdef HAVE_KERNEL_SPLICE_READ
2771         .splice_read    = ll_file_splice_read,
2772 #endif
2773         .fsync          = ll_fsync,
2774         .flush          = ll_flush,
2775         .flock          = ll_file_noflock,
2776         .lock           = ll_file_noflock
2777 };
2778
2779 struct inode_operations ll_file_inode_operations = {
2780         .setattr        = ll_setattr,
2781         .getattr        = ll_getattr,
2782         .permission     = ll_inode_permission,
2783         .setxattr       = ll_setxattr,
2784         .getxattr       = ll_getxattr,
2785         .listxattr      = ll_listxattr,
2786         .removexattr    = ll_removexattr,
2787 #ifdef  HAVE_LINUX_FIEMAP_H
2788         .fiemap         = ll_fiemap,
2789 #endif
2790 #ifdef HAVE_IOP_GET_ACL
2791         .get_acl        = ll_get_acl,
2792 #endif
2793 };
2794
2795 /* dynamic ioctl number support routins */
2796 static struct llioc_ctl_data {
2797         struct rw_semaphore     ioc_sem;
2798         cfs_list_t              ioc_head;
2799 } llioc = {
2800         __RWSEM_INITIALIZER(llioc.ioc_sem),
2801         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2802 };
2803
2804
2805 struct llioc_data {
2806         cfs_list_t              iocd_list;
2807         unsigned int            iocd_size;
2808         llioc_callback_t        iocd_cb;
2809         unsigned int            iocd_count;
2810         unsigned int            iocd_cmd[0];
2811 };
2812
2813 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2814 {
2815         unsigned int size;
2816         struct llioc_data *in_data = NULL;
2817         ENTRY;
2818
2819         if (cb == NULL || cmd == NULL ||
2820             count > LLIOC_MAX_CMD || count < 0)
2821                 RETURN(NULL);
2822
2823         size = sizeof(*in_data) + count * sizeof(unsigned int);
2824         OBD_ALLOC(in_data, size);
2825         if (in_data == NULL)
2826                 RETURN(NULL);
2827
2828         memset(in_data, 0, sizeof(*in_data));
2829         in_data->iocd_size = size;
2830         in_data->iocd_cb = cb;
2831         in_data->iocd_count = count;
2832         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2833
2834         down_write(&llioc.ioc_sem);
2835         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2836         up_write(&llioc.ioc_sem);
2837
2838         RETURN(in_data);
2839 }
2840
2841 void ll_iocontrol_unregister(void *magic)
2842 {
2843         struct llioc_data *tmp;
2844
2845         if (magic == NULL)
2846                 return;
2847
2848         down_write(&llioc.ioc_sem);
2849         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2850                 if (tmp == magic) {
2851                         unsigned int size = tmp->iocd_size;
2852
2853                         cfs_list_del(&tmp->iocd_list);
2854                         up_write(&llioc.ioc_sem);
2855
2856                         OBD_FREE(tmp, size);
2857                         return;
2858                 }
2859         }
2860         up_write(&llioc.ioc_sem);
2861
2862         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2863 }
2864
2865 EXPORT_SYMBOL(ll_iocontrol_register);
2866 EXPORT_SYMBOL(ll_iocontrol_unregister);
2867
2868 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2869                         unsigned int cmd, unsigned long arg, int *rcp)
2870 {
2871         enum llioc_iter ret = LLIOC_CONT;
2872         struct llioc_data *data;
2873         int rc = -EINVAL, i;
2874
2875         down_read(&llioc.ioc_sem);
2876         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2877                 for (i = 0; i < data->iocd_count; i++) {
2878                         if (cmd != data->iocd_cmd[i])
2879                                 continue;
2880
2881                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2882                         break;
2883                 }
2884
2885                 if (ret == LLIOC_STOP)
2886                         break;
2887         }
2888         up_read(&llioc.ioc_sem);
2889
2890         if (rcp)
2891                 *rcp = rc;
2892         return ret;
2893 }
2894
2895 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2896 {
2897         struct ll_inode_info *lli = ll_i2info(inode);
2898         struct cl_env_nest nest;
2899         struct lu_env *env;
2900         int result;
2901         ENTRY;
2902
2903         if (lli->lli_clob == NULL)
2904                 RETURN(0);
2905
2906         env = cl_env_nested_get(&nest);
2907         if (IS_ERR(env))
2908                 RETURN(PTR_ERR(env));
2909
2910         result = cl_conf_set(env, lli->lli_clob, conf);
2911         cl_env_nested_put(&nest, env);
2912         RETURN(result);
2913 }
2914
2915 /**
2916  * This function checks if there exists a LAYOUT lock on the client side,
2917  * or enqueues it if it doesn't have one in cache.
2918  *
2919  * This function will not hold layout lock so it may be revoked any time after
2920  * this function returns. Any operations depend on layout should be redone
2921  * in that case.
2922  *
2923  * This function should be called before lov_io_init() to get an uptodate
2924  * layout version, the caller should save the version number and after IO
2925  * is finished, this function should be called again to verify that layout
2926  * is not changed during IO time.
2927  */
2928 int ll_layout_refresh(struct inode *inode, __u32 *gen)
2929 {
2930         struct ll_inode_info  *lli = ll_i2info(inode);
2931         struct ll_sb_info     *sbi = ll_i2sbi(inode);
2932         struct md_op_data     *op_data = NULL;
2933         struct lookup_intent   it = { .it_op = IT_LAYOUT };
2934         struct lustre_handle   lockh = { 0 };
2935         ldlm_mode_t            mode;
2936         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
2937                                            .ei_mode = LCK_CR,
2938                                            .ei_cb_bl = ll_md_blocking_ast,
2939                                            .ei_cb_cp = ldlm_completion_ast,
2940                                            .ei_cbdata = inode };
2941         int rc;
2942         ENTRY;
2943
2944         *gen = 0;
2945         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
2946                 RETURN(0);
2947
2948         /* sanity checks */
2949         LASSERT(fid_is_sane(ll_inode2fid(inode)));
2950         LASSERT(S_ISREG(inode->i_mode));
2951
2952         /* mostly layout lock is caching on the local side, so try to match
2953          * it before grabbing layout lock mutex. */
2954         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
2955                                 LDLM_FL_LVB_READY);
2956         if (mode != 0) { /* hit cached lock */
2957                 /* lsm_layout_gen is started from 0, plus 1 here to distinguish
2958                  * the cases of no layout and first layout. */
2959                 *gen = lli->lli_layout_gen + 1;
2960
2961                 ldlm_lock_decref(&lockh, mode);
2962                 RETURN(0);
2963         }
2964
2965         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
2966                                      0, 0, LUSTRE_OPC_ANY, NULL);
2967         if (IS_ERR(op_data))
2968                 RETURN(PTR_ERR(op_data));
2969
2970         /* take layout lock mutex to enqueue layout lock exclusively. */
2971         mutex_lock(&lli->lli_layout_mutex);
2972
2973         /* try again inside layout mutex */
2974         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
2975                                 LDLM_FL_LVB_READY);
2976         if (mode != 0) { /* hit cached lock */
2977                 *gen = lli->lli_layout_gen + 1;
2978
2979                 ldlm_lock_decref(&lockh, mode);
2980                 mutex_unlock(&lli->lli_layout_mutex);
2981                 ll_finish_md_op_data(op_data);
2982                 RETURN(0);
2983         }
2984
2985         /* have to enqueue one */
2986         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
2987                         NULL, 0, NULL, 0);
2988         if (it.d.lustre.it_data != NULL)
2989                 ptlrpc_req_finished(it.d.lustre.it_data);
2990         it.d.lustre.it_data = NULL;
2991
2992         if (rc == 0) {
2993                 struct ldlm_lock *lock;
2994                 struct cl_object_conf conf;
2995                 struct lustre_md md = { NULL };
2996                 void *lmm;
2997                 int lmmsize;
2998
2999                 LASSERT(lustre_handle_is_used(&lockh));
3000
3001                 /* set lock data in case this is a new lock */
3002                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3003
3004                 lock = ldlm_handle2lock(&lockh);
3005                 LASSERT(lock != NULL);
3006
3007                 /* for IT_LAYOUT lock, lmm is returned in lock's lvb
3008                  * data via completion callback */
3009                 lmm = lock->l_lvb_data;
3010                 lmmsize = lock->l_lvb_len;
3011                 if (lmm != NULL) {
3012                         rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3013                                         lmm, lmmsize);
3014                         if (rc >= 0) {
3015                                 if (md.lsm != NULL)
3016                                         *gen = md.lsm->lsm_layout_gen + 1;
3017                                 rc = 0;
3018                         } else {
3019                                 CERROR("file: "DFID" unpackmd error: %d\n",
3020                                         PFID(&lli->lli_fid), rc);
3021                         }
3022                 }
3023                 LDLM_LOCK_PUT(lock);
3024
3025                 /* set layout to file. This may cause lock expiration as we
3026                  * set layout inside layout ibits lock. */
3027                 memset(&conf, 0, sizeof conf);
3028                 conf.coc_inode = inode;
3029                 conf.u.coc_md = &md;
3030                 ll_layout_conf(inode, &conf);
3031                 /* is this racy? */
3032                 lli->lli_has_smd = md.lsm != NULL;
3033                 if (md.lsm != NULL)
3034                         obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3035         }
3036         ll_intent_drop_lock(&it);
3037
3038         mutex_unlock(&lli->lli_layout_mutex);
3039         ll_finish_md_op_data(op_data);
3040
3041         RETURN(rc);
3042 }