Whamcloud - gitweb
LU-1337 llite: support SEEK_HOLE and SEEK_DATA
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84 }
85
86 /**
87  * Closes the IO epoch and packs all the attributes into @op_data for
88  * the CLOSE rpc.
89  */
90 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
91                              struct obd_client_handle *och)
92 {
93         ENTRY;
94
95         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
96                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
97
98         if (!(och->och_flags & FMODE_WRITE))
99                 goto out;
100
101         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
102                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
103         else
104                 ll_ioepoch_close(inode, op_data, &och, 0);
105
106 out:
107         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
108         ll_prep_md_op_data(op_data, inode, NULL, NULL,
109                            0, 0, LUSTRE_OPC_ANY, NULL);
110         EXIT;
111 }
112
113 static int ll_close_inode_openhandle(struct obd_export *md_exp,
114                                      struct inode *inode,
115                                      struct obd_client_handle *och)
116 {
117         struct obd_export *exp = ll_i2mdexp(inode);
118         struct md_op_data *op_data;
119         struct ptlrpc_request *req = NULL;
120         struct obd_device *obd = class_exp2obd(exp);
121         int epoch_close = 1;
122         int rc;
123         ENTRY;
124
125         if (obd == NULL) {
126                 /*
127                  * XXX: in case of LMV, is this correct to access
128                  * ->exp_handle?
129                  */
130                 CERROR("Invalid MDC connection handle "LPX64"\n",
131                        ll_i2mdexp(inode)->exp_handle.h_cookie);
132                 GOTO(out, rc = 0);
133         }
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc == -EAGAIN) {
143                 /* This close must have the epoch closed. */
144                 LASSERT(epoch_close);
145                 /* MDS has instructed us to obtain Size-on-MDS attribute from
146                  * OSTs and send setattr to back to MDS. */
147                 rc = ll_som_update(inode, op_data);
148                 if (rc) {
149                         CERROR("inode %lu mdc Size-on-MDS update failed: "
150                                "rc = %d\n", inode->i_ino, rc);
151                         rc = 0;
152                 }
153         } else if (rc) {
154                 CERROR("inode %lu mdc close failed: rc = %d\n",
155                        inode->i_ino, rc);
156         }
157         ll_finish_md_op_data(op_data);
158
159         if (rc == 0) {
160                 rc = ll_objects_destroy(req, inode);
161                 if (rc)
162                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
163                                inode->i_ino, rc);
164         }
165
166         EXIT;
167 out:
168
169         if (exp_connect_som(exp) && !epoch_close &&
170             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
171                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
172         } else {
173                 md_clear_open_replay_data(md_exp, och);
174                 /* Free @och if it is not waiting for DONE_WRITING. */
175                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
176                 OBD_FREE_PTR(och);
177         }
178         if (req) /* This is close request */
179                 ptlrpc_req_finished(req);
180         return rc;
181 }
182
183 int ll_md_real_close(struct inode *inode, int flags)
184 {
185         struct ll_inode_info *lli = ll_i2info(inode);
186         struct obd_client_handle **och_p;
187         struct obd_client_handle *och;
188         __u64 *och_usecount;
189         int rc = 0;
190         ENTRY;
191
192         if (flags & FMODE_WRITE) {
193                 och_p = &lli->lli_mds_write_och;
194                 och_usecount = &lli->lli_open_fd_write_count;
195         } else if (flags & FMODE_EXEC) {
196                 och_p = &lli->lli_mds_exec_och;
197                 och_usecount = &lli->lli_open_fd_exec_count;
198         } else {
199                 LASSERT(flags & FMODE_READ);
200                 och_p = &lli->lli_mds_read_och;
201                 och_usecount = &lli->lli_open_fd_read_count;
202         }
203
204         mutex_lock(&lli->lli_och_mutex);
205         if (*och_usecount) { /* There are still users of this handle, so
206                                 skip freeing it. */
207                 mutex_unlock(&lli->lli_och_mutex);
208                 RETURN(0);
209         }
210         och=*och_p;
211         *och_p = NULL;
212         mutex_unlock(&lli->lli_och_mutex);
213
214         if (och) { /* There might be a race and somebody have freed this och
215                       already */
216                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
217                                                inode, och);
218         }
219
220         RETURN(rc);
221 }
222
223 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
224                 struct file *file)
225 {
226         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
227         struct ll_inode_info *lli = ll_i2info(inode);
228         int rc = 0;
229         ENTRY;
230
231         /* clear group lock, if present */
232         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
233                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
234
235         /* Let's see if we have good enough OPEN lock on the file and if
236            we can skip talking to MDS */
237         if (file->f_dentry->d_inode) { /* Can this ever be false? */
238                 int lockmode;
239                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
240                 struct lustre_handle lockh;
241                 struct inode *inode = file->f_dentry->d_inode;
242                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
243
244                 mutex_lock(&lli->lli_och_mutex);
245                 if (fd->fd_omode & FMODE_WRITE) {
246                         lockmode = LCK_CW;
247                         LASSERT(lli->lli_open_fd_write_count);
248                         lli->lli_open_fd_write_count--;
249                 } else if (fd->fd_omode & FMODE_EXEC) {
250                         lockmode = LCK_PR;
251                         LASSERT(lli->lli_open_fd_exec_count);
252                         lli->lli_open_fd_exec_count--;
253                 } else {
254                         lockmode = LCK_CR;
255                         LASSERT(lli->lli_open_fd_read_count);
256                         lli->lli_open_fd_read_count--;
257                 }
258                 mutex_unlock(&lli->lli_och_mutex);
259
260                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
261                                    LDLM_IBITS, &policy, lockmode,
262                                    &lockh)) {
263                         rc = ll_md_real_close(file->f_dentry->d_inode,
264                                               fd->fd_omode);
265                 }
266         } else {
267                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
268                        file, file->f_dentry, file->f_dentry->d_name.name);
269         }
270
271         LUSTRE_FPRIVATE(file) = NULL;
272         ll_file_data_put(fd);
273         ll_capa_close(inode);
274
275         RETURN(rc);
276 }
277
278 /* While this returns an error code, fput() the caller does not, so we need
279  * to make every effort to clean up all of our state here.  Also, applications
280  * rarely check close errors and even if an error is returned they will not
281  * re-try the close call.
282  */
283 int ll_file_release(struct inode *inode, struct file *file)
284 {
285         struct ll_file_data *fd;
286         struct ll_sb_info *sbi = ll_i2sbi(inode);
287         struct ll_inode_info *lli = ll_i2info(inode);
288         int rc;
289         ENTRY;
290
291         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
292                inode->i_generation, inode);
293
294 #ifdef CONFIG_FS_POSIX_ACL
295         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
296             inode == inode->i_sb->s_root->d_inode) {
297                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
298
299                 LASSERT(fd != NULL);
300                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
301                         fd->fd_flags &= ~LL_FILE_RMTACL;
302                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
303                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
304                 }
305         }
306 #endif
307
308         if (inode->i_sb->s_root != file->f_dentry)
309                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
310         fd = LUSTRE_FPRIVATE(file);
311         LASSERT(fd != NULL);
312
313         /* The last ref on @file, maybe not the the owner pid of statahead.
314          * Different processes can open the same dir, "ll_opendir_key" means:
315          * it is me that should stop the statahead thread. */
316         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
317             lli->lli_opendir_pid != 0)
318                 ll_stop_statahead(inode, lli->lli_opendir_key);
319
320         if (inode->i_sb->s_root == file->f_dentry) {
321                 LUSTRE_FPRIVATE(file) = NULL;
322                 ll_file_data_put(fd);
323                 RETURN(0);
324         }
325
326         if (!S_ISDIR(inode->i_mode)) {
327                 lov_read_and_clear_async_rc(lli->lli_clob);
328                 lli->lli_async_rc = 0;
329         }
330
331         rc = ll_md_close(sbi->ll_md_exp, inode, file);
332
333         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
334                 libcfs_debug_dumplog();
335
336         RETURN(rc);
337 }
338
339 static int ll_intent_file_open(struct file *file, void *lmm,
340                                int lmmsize, struct lookup_intent *itp)
341 {
342         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
343         struct dentry *parent = file->f_dentry->d_parent;
344         const char *name = file->f_dentry->d_name.name;
345         const int len = file->f_dentry->d_name.len;
346         struct md_op_data *op_data;
347         struct ptlrpc_request *req;
348         __u32 opc = LUSTRE_OPC_ANY;
349         int rc;
350         ENTRY;
351
352         if (!parent)
353                 RETURN(-ENOENT);
354
355         /* Usually we come here only for NFSD, and we want open lock.
356            But we can also get here with pre 2.6.15 patchless kernels, and in
357            that case that lock is also ok */
358         /* We can also get here if there was cached open handle in revalidate_it
359          * but it disappeared while we were getting from there to ll_file_open.
360          * But this means this file was closed and immediatelly opened which
361          * makes a good candidate for using OPEN lock */
362         /* If lmmsize & lmm are not 0, we are just setting stripe info
363          * parameters. No need for the open lock */
364         if (lmm == NULL && lmmsize == 0) {
365                 itp->it_flags |= MDS_OPEN_LOCK;
366                 if (itp->it_flags & FMODE_WRITE)
367                         opc = LUSTRE_OPC_CREATE;
368         }
369
370         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
371                                       file->f_dentry->d_inode, name, len,
372                                       O_RDWR, opc, NULL);
373         if (IS_ERR(op_data))
374                 RETURN(PTR_ERR(op_data));
375
376         itp->it_flags |= MDS_OPEN_BY_FID;
377         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
378                             0 /*unused */, &req, ll_md_blocking_ast, 0);
379         ll_finish_md_op_data(op_data);
380         if (rc == -ESTALE) {
381                 /* reason for keep own exit path - don`t flood log
382                 * with messages with -ESTALE errors.
383                 */
384                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
385                      it_open_error(DISP_OPEN_OPEN, itp))
386                         GOTO(out, rc);
387                 ll_release_openhandle(file->f_dentry, itp);
388                 GOTO(out, rc);
389         }
390
391         if (it_disposition(itp, DISP_LOOKUP_NEG))
392                 GOTO(out, rc = -ENOENT);
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
401         if (!rc && itp->d.lustre.it_lock_mode)
402                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
403                                  itp, NULL);
404
405 out:
406         ptlrpc_req_finished(itp->d.lustre.it_data);
407         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
408         ll_intent_drop_lock(itp);
409
410         RETURN(rc);
411 }
412
413 /**
414  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
415  * not believe attributes if a few ioepoch holders exist. Attributes for
416  * previous ioepoch if new one is opened are also skipped by MDS.
417  */
418 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
419 {
420         if (ioepoch && lli->lli_ioepoch != ioepoch) {
421                 lli->lli_ioepoch = ioepoch;
422                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
423                        ioepoch, PFID(&lli->lli_fid));
424         }
425 }
426
427 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
428                        struct lookup_intent *it, struct obd_client_handle *och)
429 {
430         struct ptlrpc_request *req = it->d.lustre.it_data;
431         struct mdt_body *body;
432
433         LASSERT(och);
434
435         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
436         LASSERT(body != NULL);                      /* reply already checked out */
437
438         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
439         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
440         och->och_fid = lli->lli_fid;
441         och->och_flags = it->it_flags;
442         ll_ioepoch_open(lli, body->ioepoch);
443
444         return md_set_open_replay_data(md_exp, och, req);
445 }
446
447 int ll_local_open(struct file *file, struct lookup_intent *it,
448                   struct ll_file_data *fd, struct obd_client_handle *och)
449 {
450         struct inode *inode = file->f_dentry->d_inode;
451         struct ll_inode_info *lli = ll_i2info(inode);
452         ENTRY;
453
454         LASSERT(!LUSTRE_FPRIVATE(file));
455
456         LASSERT(fd != NULL);
457
458         if (och) {
459                 struct ptlrpc_request *req = it->d.lustre.it_data;
460                 struct mdt_body *body;
461                 int rc;
462
463                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
464                 if (rc)
465                         RETURN(rc);
466
467                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
468                 if ((it->it_flags & FMODE_WRITE) &&
469                     (body->valid & OBD_MD_FLSIZE))
470                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
471                                lli->lli_ioepoch, PFID(&lli->lli_fid));
472         }
473
474         LUSTRE_FPRIVATE(file) = fd;
475         ll_readahead_init(inode, &fd->fd_ras);
476         fd->fd_omode = it->it_flags;
477         RETURN(0);
478 }
479
480 /* Open a file, and (for the very first open) create objects on the OSTs at
481  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
482  * creation or open until ll_lov_setstripe() ioctl is called.
483  *
484  * If we already have the stripe MD locally then we don't request it in
485  * md_open(), by passing a lmm_size = 0.
486  *
487  * It is up to the application to ensure no other processes open this file
488  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
489  * used.  We might be able to avoid races of that sort by getting lli_open_sem
490  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
491  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
492  */
493 int ll_file_open(struct inode *inode, struct file *file)
494 {
495         struct ll_inode_info *lli = ll_i2info(inode);
496         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
497                                           .it_flags = file->f_flags };
498         struct obd_client_handle **och_p = NULL;
499         __u64 *och_usecount = NULL;
500         struct ll_file_data *fd;
501         int rc = 0, opendir_set = 0;
502         ENTRY;
503
504         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
505                inode->i_generation, inode, file->f_flags);
506
507         it = file->private_data; /* XXX: compat macro */
508         file->private_data = NULL; /* prevent ll_local_open assertion */
509
510         fd = ll_file_data_get();
511         if (fd == NULL)
512                 GOTO(out_och_free, rc = -ENOMEM);
513
514         fd->fd_file = file;
515         if (S_ISDIR(inode->i_mode)) {
516                 spin_lock(&lli->lli_sa_lock);
517                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
518                     lli->lli_opendir_pid == 0) {
519                         lli->lli_opendir_key = fd;
520                         lli->lli_opendir_pid = cfs_curproc_pid();
521                         opendir_set = 1;
522                 }
523                 spin_unlock(&lli->lli_sa_lock);
524         }
525
526         if (inode->i_sb->s_root == file->f_dentry) {
527                 LUSTRE_FPRIVATE(file) = fd;
528                 RETURN(0);
529         }
530
531         if (!it || !it->d.lustre.it_disposition) {
532                 /* Convert f_flags into access mode. We cannot use file->f_mode,
533                  * because everything but O_ACCMODE mask was stripped from
534                  * there */
535                 if ((oit.it_flags + 1) & O_ACCMODE)
536                         oit.it_flags++;
537                 if (file->f_flags & O_TRUNC)
538                         oit.it_flags |= FMODE_WRITE;
539
540                 /* kernel only call f_op->open in dentry_open.  filp_open calls
541                  * dentry_open after call to open_namei that checks permissions.
542                  * Only nfsd_open call dentry_open directly without checking
543                  * permissions and because of that this code below is safe. */
544                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
545                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
546
547                 /* We do not want O_EXCL here, presumably we opened the file
548                  * already? XXX - NFS implications? */
549                 oit.it_flags &= ~O_EXCL;
550
551                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
552                  * created if necessary, then "IT_CREAT" should be set to keep
553                  * consistent with it */
554                 if (oit.it_flags & O_CREAT)
555                         oit.it_op |= IT_CREAT;
556
557                 it = &oit;
558         }
559
560 restart:
561         /* Let's see if we have file open on MDS already. */
562         if (it->it_flags & FMODE_WRITE) {
563                 och_p = &lli->lli_mds_write_och;
564                 och_usecount = &lli->lli_open_fd_write_count;
565         } else if (it->it_flags & FMODE_EXEC) {
566                 och_p = &lli->lli_mds_exec_och;
567                 och_usecount = &lli->lli_open_fd_exec_count;
568          } else {
569                 och_p = &lli->lli_mds_read_och;
570                 och_usecount = &lli->lli_open_fd_read_count;
571         }
572
573         mutex_lock(&lli->lli_och_mutex);
574         if (*och_p) { /* Open handle is present */
575                 if (it_disposition(it, DISP_OPEN_OPEN)) {
576                         /* Well, there's extra open request that we do not need,
577                            let's close it somehow. This will decref request. */
578                         rc = it_open_error(DISP_OPEN_OPEN, it);
579                         if (rc) {
580                                 mutex_unlock(&lli->lli_och_mutex);
581                                 GOTO(out_openerr, rc);
582                         }
583
584                         ll_release_openhandle(file->f_dentry, it);
585                 }
586                 (*och_usecount)++;
587
588                 rc = ll_local_open(file, it, fd, NULL);
589                 if (rc) {
590                         (*och_usecount)--;
591                         mutex_unlock(&lli->lli_och_mutex);
592                         GOTO(out_openerr, rc);
593                 }
594         } else {
595                 LASSERT(*och_usecount == 0);
596                 if (!it->d.lustre.it_disposition) {
597                         /* We cannot just request lock handle now, new ELC code
598                            means that one of other OPEN locks for this file
599                            could be cancelled, and since blocking ast handler
600                            would attempt to grab och_mutex as well, that would
601                            result in a deadlock */
602                         mutex_unlock(&lli->lli_och_mutex);
603                         it->it_create_mode |= M_CHECK_STALE;
604                         rc = ll_intent_file_open(file, NULL, 0, it);
605                         it->it_create_mode &= ~M_CHECK_STALE;
606                         if (rc)
607                                 GOTO(out_openerr, rc);
608
609                         goto restart;
610                 }
611                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
612                 if (!*och_p)
613                         GOTO(out_och_free, rc = -ENOMEM);
614
615                 (*och_usecount)++;
616
617                 /* md_intent_lock() didn't get a request ref if there was an
618                  * open error, so don't do cleanup on the request here
619                  * (bug 3430) */
620                 /* XXX (green): Should not we bail out on any error here, not
621                  * just open error? */
622                 rc = it_open_error(DISP_OPEN_OPEN, it);
623                 if (rc)
624                         GOTO(out_och_free, rc);
625
626                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
627
628                 rc = ll_local_open(file, it, fd, *och_p);
629                 if (rc)
630                         GOTO(out_och_free, rc);
631         }
632         mutex_unlock(&lli->lli_och_mutex);
633         fd = NULL;
634
635         /* Must do this outside lli_och_mutex lock to prevent deadlock where
636            different kind of OPEN lock for this same inode gets cancelled
637            by ldlm_cancel_lru */
638         if (!S_ISREG(inode->i_mode))
639                 GOTO(out_och_free, rc);
640
641         ll_capa_open(inode);
642
643         if (!lli->lli_has_smd) {
644                 if (file->f_flags & O_LOV_DELAY_CREATE ||
645                     !(file->f_mode & FMODE_WRITE)) {
646                         CDEBUG(D_INODE, "object creation was delayed\n");
647                         GOTO(out_och_free, rc);
648                 }
649         }
650         file->f_flags &= ~O_LOV_DELAY_CREATE;
651         GOTO(out_och_free, rc);
652
653 out_och_free:
654         if (rc) {
655                 if (och_p && *och_p) {
656                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
657                         *och_p = NULL; /* OBD_FREE writes some magic there */
658                         (*och_usecount)--;
659                 }
660                 mutex_unlock(&lli->lli_och_mutex);
661
662 out_openerr:
663                 if (opendir_set != 0)
664                         ll_stop_statahead(inode, lli->lli_opendir_key);
665                 if (fd != NULL)
666                         ll_file_data_put(fd);
667         } else {
668                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
669         }
670
671         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
672                 ptlrpc_req_finished(it->d.lustre.it_data);
673                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
674         }
675
676         return rc;
677 }
678
679 /* Fills the obdo with the attributes for the lsm */
680 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
681                           struct obd_capa *capa, struct obdo *obdo,
682                           __u64 ioepoch, int sync)
683 {
684         struct ptlrpc_request_set *set;
685         struct obd_info            oinfo = { { { 0 } } };
686         int                        rc;
687
688         ENTRY;
689
690         LASSERT(lsm != NULL);
691
692         oinfo.oi_md = lsm;
693         oinfo.oi_oa = obdo;
694         oinfo.oi_oa->o_id = lsm->lsm_object_id;
695         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
696         oinfo.oi_oa->o_mode = S_IFREG;
697         oinfo.oi_oa->o_ioepoch = ioepoch;
698         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
699                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
700                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
701                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
702                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
703                                OBD_MD_FLDATAVERSION;
704         oinfo.oi_capa = capa;
705         if (sync) {
706                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
707                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
708         }
709
710         set = ptlrpc_prep_set();
711         if (set == NULL) {
712                 CERROR("can't allocate ptlrpc set\n");
713                 rc = -ENOMEM;
714         } else {
715                 rc = obd_getattr_async(exp, &oinfo, set);
716                 if (rc == 0)
717                         rc = ptlrpc_set_wait(set);
718                 ptlrpc_set_destroy(set);
719         }
720         if (rc == 0)
721                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
722                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
723                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
724                                          OBD_MD_FLDATAVERSION);
725         RETURN(rc);
726 }
727
728 /**
729   * Performs the getattr on the inode and updates its fields.
730   * If @sync != 0, perform the getattr under the server-side lock.
731   */
732 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
733                      __u64 ioepoch, int sync)
734 {
735         struct obd_capa      *capa = ll_mdscapa_get(inode);
736         struct lov_stripe_md *lsm;
737         int rc;
738         ENTRY;
739
740         lsm = ccc_inode_lsm_get(inode);
741         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
742                             capa, obdo, ioepoch, sync);
743         capa_put(capa);
744         if (rc == 0) {
745                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
746                 CDEBUG(D_INODE,
747                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
748                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
749                        (unsigned long long)inode->i_blocks,
750                        (unsigned long)ll_inode_blksize(inode));
751         }
752         ccc_inode_lsm_put(inode, lsm);
753         RETURN(rc);
754 }
755
756 int ll_merge_lvb(struct inode *inode)
757 {
758         struct ll_inode_info *lli = ll_i2info(inode);
759         struct ll_sb_info *sbi = ll_i2sbi(inode);
760         struct lov_stripe_md *lsm;
761         struct ost_lvb lvb;
762         int rc = 0;
763
764         ENTRY;
765
766         lsm = ccc_inode_lsm_get(inode);
767         ll_inode_size_lock(inode);
768         inode_init_lvb(inode, &lvb);
769
770         /* merge timestamps the most resently obtained from mds with
771            timestamps obtained from osts */
772         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
773         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
774         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
775         if (lsm != NULL) {
776                 rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
777                 cl_isize_write_nolock(inode, lvb.lvb_size);
778
779                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
780                                 PFID(&lli->lli_fid), lvb.lvb_size);
781                 inode->i_blocks = lvb.lvb_blocks;
782         }
783         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
784         LTIME_S(inode->i_atime) = lvb.lvb_atime;
785         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
786         ll_inode_size_unlock(inode);
787         ccc_inode_lsm_put(inode, lsm);
788
789         RETURN(rc);
790 }
791
792 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
793                      lstat_t *st)
794 {
795         struct obdo obdo = { 0 };
796         int rc;
797
798         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
799         if (rc == 0) {
800                 st->st_size   = obdo.o_size;
801                 st->st_blocks = obdo.o_blocks;
802                 st->st_mtime  = obdo.o_mtime;
803                 st->st_atime  = obdo.o_atime;
804                 st->st_ctime  = obdo.o_ctime;
805         }
806         return rc;
807 }
808
809 void ll_io_init(struct cl_io *io, const struct file *file, int write)
810 {
811         struct inode *inode = file->f_dentry->d_inode;
812
813         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
814         if (write) {
815                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
816                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
817         }
818         io->ci_obj     = ll_i2info(inode)->lli_clob;
819         io->ci_lockreq = CILR_MAYBE;
820         if (ll_file_nolock(file)) {
821                 io->ci_lockreq = CILR_NEVER;
822                 io->ci_no_srvlock = 1;
823         } else if (file->f_flags & O_APPEND) {
824                 io->ci_lockreq = CILR_MANDATORY;
825         }
826 }
827
828 static ssize_t
829 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
830                    struct file *file, enum cl_io_type iot,
831                    loff_t *ppos, size_t count)
832 {
833         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
834         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
835         struct cl_io         *io;
836         ssize_t               result;
837         ENTRY;
838
839         io = ccc_env_thread_io(env);
840         ll_io_init(io, file, iot == CIT_WRITE);
841
842         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
843                 struct vvp_io *vio = vvp_env_io(env);
844                 struct ccc_io *cio = ccc_env_io(env);
845                 int write_mutex_locked = 0;
846
847                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
848                 vio->cui_io_subtype = args->via_io_subtype;
849
850                 switch (vio->cui_io_subtype) {
851                 case IO_NORMAL:
852                         cio->cui_iov = args->u.normal.via_iov;
853                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
854                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
855 #ifndef HAVE_FILE_WRITEV
856                         cio->cui_iocb = args->u.normal.via_iocb;
857 #endif
858                         if ((iot == CIT_WRITE) &&
859                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
860                                 if (mutex_lock_interruptible(&lli->
861                                                                lli_write_mutex))
862                                         GOTO(out, result = -ERESTARTSYS);
863                                 write_mutex_locked = 1;
864                         } else if (iot == CIT_READ) {
865                                 down_read(&lli->lli_trunc_sem);
866                         }
867                         break;
868                 case IO_SENDFILE:
869                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
870                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
871                         break;
872                 case IO_SPLICE:
873                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
874                         vio->u.splice.cui_flags = args->u.splice.via_flags;
875                         break;
876                 default:
877                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
878                         LBUG();
879                 }
880                 result = cl_io_loop(env, io);
881                 if (write_mutex_locked)
882                         mutex_unlock(&lli->lli_write_mutex);
883                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
884                         up_read(&lli->lli_trunc_sem);
885         } else {
886                 /* cl_io_rw_init() handled IO */
887                 result = io->ci_result;
888         }
889
890         if (io->ci_nob > 0) {
891                 result = io->ci_nob;
892                 *ppos = io->u.ci_wr.wr.crw_pos;
893         }
894         GOTO(out, result);
895 out:
896         cl_io_fini(env, io);
897
898         if (iot == CIT_READ) {
899                 if (result >= 0)
900                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
901                                            LPROC_LL_READ_BYTES, result);
902         } else if (iot == CIT_WRITE) {
903                 if (result >= 0) {
904                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
905                                            LPROC_LL_WRITE_BYTES, result);
906                         fd->fd_write_failed = false;
907                 } else {
908                         fd->fd_write_failed = true;
909                 }
910         }
911
912         return result;
913 }
914
915
916 /*
917  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
918  */
919 static int ll_file_get_iov_count(const struct iovec *iov,
920                                  unsigned long *nr_segs, size_t *count)
921 {
922         size_t cnt = 0;
923         unsigned long seg;
924
925         for (seg = 0; seg < *nr_segs; seg++) {
926                 const struct iovec *iv = &iov[seg];
927
928                 /*
929                  * If any segment has a negative length, or the cumulative
930                  * length ever wraps negative then return -EINVAL.
931                  */
932                 cnt += iv->iov_len;
933                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
934                         return -EINVAL;
935                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
936                         continue;
937                 if (seg == 0)
938                         return -EFAULT;
939                 *nr_segs = seg;
940                 cnt -= iv->iov_len;   /* This segment is no good */
941                 break;
942         }
943         *count = cnt;
944         return 0;
945 }
946
947 #ifdef HAVE_FILE_READV
948 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
949                               unsigned long nr_segs, loff_t *ppos)
950 {
951         struct lu_env      *env;
952         struct vvp_io_args *args;
953         size_t              count;
954         ssize_t             result;
955         int                 refcheck;
956         ENTRY;
957
958         result = ll_file_get_iov_count(iov, &nr_segs, &count);
959         if (result)
960                 RETURN(result);
961
962         env = cl_env_get(&refcheck);
963         if (IS_ERR(env))
964                 RETURN(PTR_ERR(env));
965
966         args = vvp_env_args(env, IO_NORMAL);
967         args->u.normal.via_iov = (struct iovec *)iov;
968         args->u.normal.via_nrsegs = nr_segs;
969
970         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
971         cl_env_put(env, &refcheck);
972         RETURN(result);
973 }
974
975 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
976                             loff_t *ppos)
977 {
978         struct lu_env *env;
979         struct iovec  *local_iov;
980         ssize_t        result;
981         int            refcheck;
982         ENTRY;
983
984         env = cl_env_get(&refcheck);
985         if (IS_ERR(env))
986                 RETURN(PTR_ERR(env));
987
988         local_iov = &vvp_env_info(env)->vti_local_iov;
989         local_iov->iov_base = (void __user *)buf;
990         local_iov->iov_len = count;
991         result = ll_file_readv(file, local_iov, 1, ppos);
992         cl_env_put(env, &refcheck);
993         RETURN(result);
994 }
995
996 #else
997 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
998                                 unsigned long nr_segs, loff_t pos)
999 {
1000         struct lu_env      *env;
1001         struct vvp_io_args *args;
1002         size_t              count;
1003         ssize_t             result;
1004         int                 refcheck;
1005         ENTRY;
1006
1007         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1008         if (result)
1009                 RETURN(result);
1010
1011         env = cl_env_get(&refcheck);
1012         if (IS_ERR(env))
1013                 RETURN(PTR_ERR(env));
1014
1015         args = vvp_env_args(env, IO_NORMAL);
1016         args->u.normal.via_iov = (struct iovec *)iov;
1017         args->u.normal.via_nrsegs = nr_segs;
1018         args->u.normal.via_iocb = iocb;
1019
1020         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1021                                     &iocb->ki_pos, count);
1022         cl_env_put(env, &refcheck);
1023         RETURN(result);
1024 }
1025
1026 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1027                             loff_t *ppos)
1028 {
1029         struct lu_env *env;
1030         struct iovec  *local_iov;
1031         struct kiocb  *kiocb;
1032         ssize_t        result;
1033         int            refcheck;
1034         ENTRY;
1035
1036         env = cl_env_get(&refcheck);
1037         if (IS_ERR(env))
1038                 RETURN(PTR_ERR(env));
1039
1040         local_iov = &vvp_env_info(env)->vti_local_iov;
1041         kiocb = &vvp_env_info(env)->vti_kiocb;
1042         local_iov->iov_base = (void __user *)buf;
1043         local_iov->iov_len = count;
1044         init_sync_kiocb(kiocb, file);
1045         kiocb->ki_pos = *ppos;
1046         kiocb->ki_left = count;
1047
1048         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1049         *ppos = kiocb->ki_pos;
1050
1051         cl_env_put(env, &refcheck);
1052         RETURN(result);
1053 }
1054 #endif
1055
1056 /*
1057  * Write to a file (through the page cache).
1058  */
1059 #ifdef HAVE_FILE_WRITEV
1060 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1061                               unsigned long nr_segs, loff_t *ppos)
1062 {
1063         struct lu_env      *env;
1064         struct vvp_io_args *args;
1065         size_t              count;
1066         ssize_t             result;
1067         int                 refcheck;
1068         ENTRY;
1069
1070         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1071         if (result)
1072                 RETURN(result);
1073
1074         env = cl_env_get(&refcheck);
1075         if (IS_ERR(env))
1076                 RETURN(PTR_ERR(env));
1077
1078         args = vvp_env_args(env, IO_NORMAL);
1079         args->u.normal.via_iov = (struct iovec *)iov;
1080         args->u.normal.via_nrsegs = nr_segs;
1081
1082         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1083         cl_env_put(env, &refcheck);
1084         RETURN(result);
1085 }
1086
1087 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1088                              loff_t *ppos)
1089 {
1090         struct lu_env    *env;
1091         struct iovec     *local_iov;
1092         ssize_t           result;
1093         int               refcheck;
1094         ENTRY;
1095
1096         env = cl_env_get(&refcheck);
1097         if (IS_ERR(env))
1098                 RETURN(PTR_ERR(env));
1099
1100         local_iov = &vvp_env_info(env)->vti_local_iov;
1101         local_iov->iov_base = (void __user *)buf;
1102         local_iov->iov_len = count;
1103
1104         result = ll_file_writev(file, local_iov, 1, ppos);
1105         cl_env_put(env, &refcheck);
1106         RETURN(result);
1107 }
1108
1109 #else /* AIO stuff */
1110 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1111                                  unsigned long nr_segs, loff_t pos)
1112 {
1113         struct lu_env      *env;
1114         struct vvp_io_args *args;
1115         size_t              count;
1116         ssize_t             result;
1117         int                 refcheck;
1118         ENTRY;
1119
1120         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1121         if (result)
1122                 RETURN(result);
1123
1124         env = cl_env_get(&refcheck);
1125         if (IS_ERR(env))
1126                 RETURN(PTR_ERR(env));
1127
1128         args = vvp_env_args(env, IO_NORMAL);
1129         args->u.normal.via_iov = (struct iovec *)iov;
1130         args->u.normal.via_nrsegs = nr_segs;
1131         args->u.normal.via_iocb = iocb;
1132
1133         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1134                                   &iocb->ki_pos, count);
1135         cl_env_put(env, &refcheck);
1136         RETURN(result);
1137 }
1138
1139 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1140                              loff_t *ppos)
1141 {
1142         struct lu_env *env;
1143         struct iovec  *local_iov;
1144         struct kiocb  *kiocb;
1145         ssize_t        result;
1146         int            refcheck;
1147         ENTRY;
1148
1149         env = cl_env_get(&refcheck);
1150         if (IS_ERR(env))
1151                 RETURN(PTR_ERR(env));
1152
1153         local_iov = &vvp_env_info(env)->vti_local_iov;
1154         kiocb = &vvp_env_info(env)->vti_kiocb;
1155         local_iov->iov_base = (void __user *)buf;
1156         local_iov->iov_len = count;
1157         init_sync_kiocb(kiocb, file);
1158         kiocb->ki_pos = *ppos;
1159         kiocb->ki_left = count;
1160
1161         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1162         *ppos = kiocb->ki_pos;
1163
1164         cl_env_put(env, &refcheck);
1165         RETURN(result);
1166 }
1167 #endif
1168
1169
1170 #ifdef HAVE_KERNEL_SENDFILE
1171 /*
1172  * Send file content (through pagecache) somewhere with helper
1173  */
1174 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1175                                 read_actor_t actor, void *target)
1176 {
1177         struct lu_env      *env;
1178         struct vvp_io_args *args;
1179         ssize_t             result;
1180         int                 refcheck;
1181         ENTRY;
1182
1183         env = cl_env_get(&refcheck);
1184         if (IS_ERR(env))
1185                 RETURN(PTR_ERR(env));
1186
1187         args = vvp_env_args(env, IO_SENDFILE);
1188         args->u.sendfile.via_target = target;
1189         args->u.sendfile.via_actor = actor;
1190
1191         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1192         cl_env_put(env, &refcheck);
1193         RETURN(result);
1194 }
1195 #endif
1196
1197 #ifdef HAVE_KERNEL_SPLICE_READ
1198 /*
1199  * Send file content (through pagecache) somewhere with helper
1200  */
1201 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1202                                    struct pipe_inode_info *pipe, size_t count,
1203                                    unsigned int flags)
1204 {
1205         struct lu_env      *env;
1206         struct vvp_io_args *args;
1207         ssize_t             result;
1208         int                 refcheck;
1209         ENTRY;
1210
1211         env = cl_env_get(&refcheck);
1212         if (IS_ERR(env))
1213                 RETURN(PTR_ERR(env));
1214
1215         args = vvp_env_args(env, IO_SPLICE);
1216         args->u.splice.via_pipe = pipe;
1217         args->u.splice.via_flags = flags;
1218
1219         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1220         cl_env_put(env, &refcheck);
1221         RETURN(result);
1222 }
1223 #endif
1224
1225 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1226                            obd_count ost_idx)
1227 {
1228         struct obd_export *exp = ll_i2dtexp(inode);
1229         struct obd_trans_info oti = { 0 };
1230         struct obdo *oa = NULL;
1231         int lsm_size;
1232         int rc = 0;
1233         struct lov_stripe_md *lsm = NULL, *lsm2;
1234         ENTRY;
1235
1236         OBDO_ALLOC(oa);
1237         if (oa == NULL)
1238                 RETURN(-ENOMEM);
1239
1240         lsm = ccc_inode_lsm_get(inode);
1241         if (lsm == NULL)
1242                 GOTO(out, rc = -ENOENT);
1243
1244         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1245                    (lsm->lsm_stripe_count));
1246
1247         OBD_ALLOC_LARGE(lsm2, lsm_size);
1248         if (lsm2 == NULL)
1249                 GOTO(out, rc = -ENOMEM);
1250
1251         oa->o_id = id;
1252         oa->o_seq = seq;
1253         oa->o_nlink = ost_idx;
1254         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1255         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1256         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1257                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1258         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1259         memcpy(lsm2, lsm, lsm_size);
1260         ll_inode_size_lock(inode);
1261         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1262         ll_inode_size_unlock(inode);
1263
1264         OBD_FREE_LARGE(lsm2, lsm_size);
1265         GOTO(out, rc);
1266 out:
1267         ccc_inode_lsm_put(inode, lsm);
1268         OBDO_FREE(oa);
1269         return rc;
1270 }
1271
1272 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1273 {
1274         struct ll_recreate_obj ucreat;
1275         ENTRY;
1276
1277         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1278                 RETURN(-EPERM);
1279
1280         if (cfs_copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1281                                sizeof(struct ll_recreate_obj)))
1282                 RETURN(-EFAULT);
1283
1284         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1285                                ucreat.lrc_ost_idx));
1286 }
1287
1288 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1289 {
1290         struct lu_fid fid;
1291         obd_id id;
1292         obd_count ost_idx;
1293         ENTRY;
1294
1295         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1296                 RETURN(-EPERM);
1297
1298         if (cfs_copy_from_user(&fid, (struct lu_fid *)arg,
1299                                sizeof(struct lu_fid)))
1300                 RETURN(-EFAULT);
1301
1302         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1303         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1304         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1305 }
1306
1307 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1308                              int flags, struct lov_user_md *lum, int lum_size)
1309 {
1310         struct lov_stripe_md *lsm = NULL;
1311         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1312         int rc = 0;
1313         ENTRY;
1314
1315         lsm = ccc_inode_lsm_get(inode);
1316         if (lsm != NULL) {
1317                 ccc_inode_lsm_put(inode, lsm);
1318                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1319                        inode->i_ino);
1320                 RETURN(-EEXIST);
1321         }
1322
1323         ll_inode_size_lock(inode);
1324         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1325         if (rc)
1326                 GOTO(out, rc);
1327         rc = oit.d.lustre.it_status;
1328         if (rc < 0)
1329                 GOTO(out_req_free, rc);
1330
1331         ll_release_openhandle(file->f_dentry, &oit);
1332
1333  out:
1334         ll_inode_size_unlock(inode);
1335         ll_intent_release(&oit);
1336         ccc_inode_lsm_put(inode, lsm);
1337         RETURN(rc);
1338 out_req_free:
1339         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1340         goto out;
1341 }
1342
1343 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1344                              struct lov_mds_md **lmmp, int *lmm_size,
1345                              struct ptlrpc_request **request)
1346 {
1347         struct ll_sb_info *sbi = ll_i2sbi(inode);
1348         struct mdt_body  *body;
1349         struct lov_mds_md *lmm = NULL;
1350         struct ptlrpc_request *req = NULL;
1351         struct md_op_data *op_data;
1352         int rc, lmmsize;
1353
1354         rc = ll_get_max_mdsize(sbi, &lmmsize);
1355         if (rc)
1356                 RETURN(rc);
1357
1358         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1359                                      strlen(filename), lmmsize,
1360                                      LUSTRE_OPC_ANY, NULL);
1361         if (IS_ERR(op_data))
1362                 RETURN(PTR_ERR(op_data));
1363
1364         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1365         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1366         ll_finish_md_op_data(op_data);
1367         if (rc < 0) {
1368                 CDEBUG(D_INFO, "md_getattr_name failed "
1369                        "on %s: rc %d\n", filename, rc);
1370                 GOTO(out, rc);
1371         }
1372
1373         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1374         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1375
1376         lmmsize = body->eadatasize;
1377
1378         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1379                         lmmsize == 0) {
1380                 GOTO(out, rc = -ENODATA);
1381         }
1382
1383         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1384         LASSERT(lmm != NULL);
1385
1386         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1387             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1388                 GOTO(out, rc = -EPROTO);
1389         }
1390
1391         /*
1392          * This is coming from the MDS, so is probably in
1393          * little endian.  We convert it to host endian before
1394          * passing it to userspace.
1395          */
1396         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1397                 /* if function called for directory - we should
1398                  * avoid swab not existent lsm objects */
1399                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1400                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1401                         if (S_ISREG(body->mode))
1402                                 lustre_swab_lov_user_md_objects(
1403                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1404                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1405                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1406                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1407                         if (S_ISREG(body->mode))
1408                                 lustre_swab_lov_user_md_objects(
1409                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1410                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1411                 }
1412         }
1413
1414 out:
1415         *lmmp = lmm;
1416         *lmm_size = lmmsize;
1417         *request = req;
1418         return rc;
1419 }
1420
1421 static int ll_lov_setea(struct inode *inode, struct file *file,
1422                             unsigned long arg)
1423 {
1424         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1425         struct lov_user_md  *lump;
1426         int lum_size = sizeof(struct lov_user_md) +
1427                        sizeof(struct lov_user_ost_data);
1428         int rc;
1429         ENTRY;
1430
1431         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1432                 RETURN(-EPERM);
1433
1434         OBD_ALLOC_LARGE(lump, lum_size);
1435         if (lump == NULL) {
1436                 RETURN(-ENOMEM);
1437         }
1438         if (cfs_copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1439                 OBD_FREE_LARGE(lump, lum_size);
1440                 RETURN(-EFAULT);
1441         }
1442
1443         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1444
1445         OBD_FREE_LARGE(lump, lum_size);
1446         RETURN(rc);
1447 }
1448
1449 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1450                             unsigned long arg)
1451 {
1452         struct lov_user_md_v3 lumv3;
1453         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1454         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
1455         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
1456         int lum_size;
1457         int rc;
1458         int flags = FMODE_WRITE;
1459         ENTRY;
1460
1461         /* first try with v1 which is smaller than v3 */
1462         lum_size = sizeof(struct lov_user_md_v1);
1463         if (cfs_copy_from_user(lumv1, lumv1p, lum_size))
1464                 RETURN(-EFAULT);
1465
1466         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1467                 lum_size = sizeof(struct lov_user_md_v3);
1468                 if (cfs_copy_from_user(&lumv3, lumv3p, lum_size))
1469                         RETURN(-EFAULT);
1470         }
1471
1472         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1473         if (rc == 0) {
1474                 struct lov_stripe_md *lsm;
1475                 put_user(0, &lumv1p->lmm_stripe_count);
1476                 lsm = ccc_inode_lsm_get(inode);
1477                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1478                                    0, lsm, (void *)arg);
1479                 ccc_inode_lsm_put(inode, lsm);
1480         }
1481         RETURN(rc);
1482 }
1483
1484 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1485 {
1486         struct lov_stripe_md *lsm;
1487         int rc = -ENODATA;
1488         ENTRY;
1489
1490         lsm = ccc_inode_lsm_get(inode);
1491         if (lsm != NULL)
1492                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1493                                    lsm, (void *)arg);
1494         ccc_inode_lsm_put(inode, lsm);
1495         RETURN(rc);
1496 }
1497
1498 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1499 {
1500         struct ll_inode_info   *lli = ll_i2info(inode);
1501         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1502         struct ccc_grouplock    grouplock;
1503         int                     rc;
1504         ENTRY;
1505
1506         if (ll_file_nolock(file))
1507                 RETURN(-EOPNOTSUPP);
1508
1509         spin_lock(&lli->lli_lock);
1510         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1511                 CWARN("group lock already existed with gid %lu\n",
1512                       fd->fd_grouplock.cg_gid);
1513                 spin_unlock(&lli->lli_lock);
1514                 RETURN(-EINVAL);
1515         }
1516         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1517         spin_unlock(&lli->lli_lock);
1518
1519         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1520                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1521         if (rc)
1522                 RETURN(rc);
1523
1524         spin_lock(&lli->lli_lock);
1525         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1526                 spin_unlock(&lli->lli_lock);
1527                 CERROR("another thread just won the race\n");
1528                 cl_put_grouplock(&grouplock);
1529                 RETURN(-EINVAL);
1530         }
1531
1532         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1533         fd->fd_grouplock = grouplock;
1534         spin_unlock(&lli->lli_lock);
1535
1536         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1537         RETURN(0);
1538 }
1539
1540 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1541 {
1542         struct ll_inode_info   *lli = ll_i2info(inode);
1543         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1544         struct ccc_grouplock    grouplock;
1545         ENTRY;
1546
1547         spin_lock(&lli->lli_lock);
1548         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1549                 spin_unlock(&lli->lli_lock);
1550                 CWARN("no group lock held\n");
1551                 RETURN(-EINVAL);
1552         }
1553         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1554
1555         if (fd->fd_grouplock.cg_gid != arg) {
1556                 CWARN("group lock %lu doesn't match current id %lu\n",
1557                        arg, fd->fd_grouplock.cg_gid);
1558                 spin_unlock(&lli->lli_lock);
1559                 RETURN(-EINVAL);
1560         }
1561
1562         grouplock = fd->fd_grouplock;
1563         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1564         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1565         spin_unlock(&lli->lli_lock);
1566
1567         cl_put_grouplock(&grouplock);
1568         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1569         RETURN(0);
1570 }
1571
1572 /**
1573  * Close inode open handle
1574  *
1575  * \param dentry [in]     dentry which contains the inode
1576  * \param it     [in,out] intent which contains open info and result
1577  *
1578  * \retval 0     success
1579  * \retval <0    failure
1580  */
1581 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1582 {
1583         struct inode *inode = dentry->d_inode;
1584         struct obd_client_handle *och;
1585         int rc;
1586         ENTRY;
1587
1588         LASSERT(inode);
1589
1590         /* Root ? Do nothing. */
1591         if (dentry->d_inode->i_sb->s_root == dentry)
1592                 RETURN(0);
1593
1594         /* No open handle to close? Move away */
1595         if (!it_disposition(it, DISP_OPEN_OPEN))
1596                 RETURN(0);
1597
1598         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1599
1600         OBD_ALLOC(och, sizeof(*och));
1601         if (!och)
1602                 GOTO(out, rc = -ENOMEM);
1603
1604         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1605                     ll_i2info(inode), it, och);
1606
1607         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1608                                        inode, och);
1609  out:
1610         /* this one is in place of ll_file_open */
1611         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1612                 ptlrpc_req_finished(it->d.lustre.it_data);
1613                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1614         }
1615         RETURN(rc);
1616 }
1617
1618 /**
1619  * Get size for inode for which FIEMAP mapping is requested.
1620  * Make the FIEMAP get_info call and returns the result.
1621  */
1622 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1623               int num_bytes)
1624 {
1625         struct obd_export *exp = ll_i2dtexp(inode);
1626         struct lov_stripe_md *lsm = NULL;
1627         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1628         int vallen = num_bytes;
1629         int rc;
1630         ENTRY;
1631
1632         /* Checks for fiemap flags */
1633         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1634                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1635                 return -EBADR;
1636         }
1637
1638         /* Check for FIEMAP_FLAG_SYNC */
1639         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1640                 rc = filemap_fdatawrite(inode->i_mapping);
1641                 if (rc)
1642                         return rc;
1643         }
1644
1645         lsm = ccc_inode_lsm_get(inode);
1646         if (lsm == NULL)
1647                 return -ENOENT;
1648
1649         /* If the stripe_count > 1 and the application does not understand
1650          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1651          */
1652         if (lsm->lsm_stripe_count > 1 &&
1653             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1654                 GOTO(out, rc = -EOPNOTSUPP);
1655
1656         fm_key.oa.o_id = lsm->lsm_object_id;
1657         fm_key.oa.o_seq = lsm->lsm_object_seq;
1658         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1659
1660         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1661         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1662         /* If filesize is 0, then there would be no objects for mapping */
1663         if (fm_key.oa.o_size == 0) {
1664                 fiemap->fm_mapped_extents = 0;
1665                 GOTO(out, rc = 0);
1666         }
1667
1668         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1669
1670         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1671                           fiemap, lsm);
1672         if (rc)
1673                 CERROR("obd_get_info failed: rc = %d\n", rc);
1674
1675 out:
1676         ccc_inode_lsm_put(inode, lsm);
1677         RETURN(rc);
1678 }
1679
1680 int ll_fid2path(struct inode *inode, void *arg)
1681 {
1682         struct obd_export *exp = ll_i2mdexp(inode);
1683         struct getinfo_fid2path *gfout, *gfin;
1684         int outsize, rc;
1685         ENTRY;
1686
1687         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1688             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1689                 RETURN(-EPERM);
1690
1691         /* Need to get the buflen */
1692         OBD_ALLOC_PTR(gfin);
1693         if (gfin == NULL)
1694                 RETURN(-ENOMEM);
1695         if (cfs_copy_from_user(gfin, arg, sizeof(*gfin))) {
1696                 OBD_FREE_PTR(gfin);
1697                 RETURN(-EFAULT);
1698         }
1699
1700         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1701         OBD_ALLOC(gfout, outsize);
1702         if (gfout == NULL) {
1703                 OBD_FREE_PTR(gfin);
1704                 RETURN(-ENOMEM);
1705         }
1706         memcpy(gfout, gfin, sizeof(*gfout));
1707         OBD_FREE_PTR(gfin);
1708
1709         /* Call mdc_iocontrol */
1710         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1711         if (rc)
1712                 GOTO(gf_free, rc);
1713         if (cfs_copy_to_user(arg, gfout, outsize))
1714                 rc = -EFAULT;
1715
1716 gf_free:
1717         OBD_FREE(gfout, outsize);
1718         RETURN(rc);
1719 }
1720
1721 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1722 {
1723         struct ll_user_fiemap *fiemap_s;
1724         size_t num_bytes, ret_bytes;
1725         unsigned int extent_count;
1726         int rc = 0;
1727
1728         /* Get the extent count so we can calculate the size of
1729          * required fiemap buffer */
1730         if (get_user(extent_count,
1731             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1732                 RETURN(-EFAULT);
1733         num_bytes = sizeof(*fiemap_s) + (extent_count *
1734                                          sizeof(struct ll_fiemap_extent));
1735
1736         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1737         if (fiemap_s == NULL)
1738                 RETURN(-ENOMEM);
1739
1740         /* get the fiemap value */
1741         if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
1742                            sizeof(*fiemap_s)))
1743                 GOTO(error, rc = -EFAULT);
1744
1745         /* If fm_extent_count is non-zero, read the first extent since
1746          * it is used to calculate end_offset and device from previous
1747          * fiemap call. */
1748         if (extent_count) {
1749                 if (copy_from_user(&fiemap_s->fm_extents[0],
1750                     (char __user *)arg + sizeof(*fiemap_s),
1751                     sizeof(struct ll_fiemap_extent)))
1752                         GOTO(error, rc = -EFAULT);
1753         }
1754
1755         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1756         if (rc)
1757                 GOTO(error, rc);
1758
1759         ret_bytes = sizeof(struct ll_user_fiemap);
1760
1761         if (extent_count != 0)
1762                 ret_bytes += (fiemap_s->fm_mapped_extents *
1763                                  sizeof(struct ll_fiemap_extent));
1764
1765         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1766                 rc = -EFAULT;
1767
1768 error:
1769         OBD_FREE_LARGE(fiemap_s, num_bytes);
1770         RETURN(rc);
1771 }
1772
1773 /*
1774  * Read the data_version for inode.
1775  *
1776  * This value is computed using stripe object version on OST.
1777  * Version is computed using server side locking.
1778  *
1779  * @param extent_lock  Take extent lock. Not needed if a process is already
1780  *                     holding the OST object group locks.
1781  */
1782 static int ll_data_version(struct inode *inode, __u64 *data_version,
1783                            int extent_lock)
1784 {
1785         struct lov_stripe_md *lsm = NULL;
1786         struct ll_sb_info    *sbi = ll_i2sbi(inode);
1787         struct obdo          *obdo = NULL;
1788         int                   rc;
1789         ENTRY;
1790
1791         /* If no stripe, we consider version is 0. */
1792         lsm = ccc_inode_lsm_get(inode);
1793         if (lsm == NULL) {
1794                 *data_version = 0;
1795                 CDEBUG(D_INODE, "No object for inode\n");
1796                 RETURN(0);
1797         }
1798
1799         OBD_ALLOC_PTR(obdo);
1800         if (obdo == NULL) {
1801                 ccc_inode_lsm_put(inode, lsm);
1802                 RETURN(-ENOMEM);
1803         }
1804
1805         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1806         if (!rc) {
1807                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1808                         rc = -EOPNOTSUPP;
1809                 else
1810                         *data_version = obdo->o_data_version;
1811         }
1812
1813         OBD_FREE_PTR(obdo);
1814         ccc_inode_lsm_put(inode, lsm);
1815
1816         RETURN(rc);
1817 }
1818
1819 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1820 {
1821         struct inode *inode = file->f_dentry->d_inode;
1822         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1823         int flags;
1824
1825         ENTRY;
1826
1827         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1828                inode->i_generation, inode, cmd);
1829         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1830
1831         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1832         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1833                 RETURN(-ENOTTY);
1834
1835         switch(cmd) {
1836         case LL_IOC_GETFLAGS:
1837                 /* Get the current value of the file flags */
1838                 return put_user(fd->fd_flags, (int *)arg);
1839         case LL_IOC_SETFLAGS:
1840         case LL_IOC_CLRFLAGS:
1841                 /* Set or clear specific file flags */
1842                 /* XXX This probably needs checks to ensure the flags are
1843                  *     not abused, and to handle any flag side effects.
1844                  */
1845                 if (get_user(flags, (int *) arg))
1846                         RETURN(-EFAULT);
1847
1848                 if (cmd == LL_IOC_SETFLAGS) {
1849                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1850                             !(file->f_flags & O_DIRECT)) {
1851                                 CERROR("%s: unable to disable locking on "
1852                                        "non-O_DIRECT file\n", current->comm);
1853                                 RETURN(-EINVAL);
1854                         }
1855
1856                         fd->fd_flags |= flags;
1857                 } else {
1858                         fd->fd_flags &= ~flags;
1859                 }
1860                 RETURN(0);
1861         case LL_IOC_LOV_SETSTRIPE:
1862                 RETURN(ll_lov_setstripe(inode, file, arg));
1863         case LL_IOC_LOV_SETEA:
1864                 RETURN(ll_lov_setea(inode, file, arg));
1865         case LL_IOC_LOV_GETSTRIPE:
1866                 RETURN(ll_lov_getstripe(inode, arg));
1867         case LL_IOC_RECREATE_OBJ:
1868                 RETURN(ll_lov_recreate_obj(inode, arg));
1869         case LL_IOC_RECREATE_FID:
1870                 RETURN(ll_lov_recreate_fid(inode, arg));
1871         case FSFILT_IOC_FIEMAP:
1872                 RETURN(ll_ioctl_fiemap(inode, arg));
1873         case FSFILT_IOC_GETFLAGS:
1874         case FSFILT_IOC_SETFLAGS:
1875                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1876         case FSFILT_IOC_GETVERSION_OLD:
1877         case FSFILT_IOC_GETVERSION:
1878                 RETURN(put_user(inode->i_generation, (int *)arg));
1879         case LL_IOC_GROUP_LOCK:
1880                 RETURN(ll_get_grouplock(inode, file, arg));
1881         case LL_IOC_GROUP_UNLOCK:
1882                 RETURN(ll_put_grouplock(inode, file, arg));
1883         case IOC_OBD_STATFS:
1884                 RETURN(ll_obd_statfs(inode, (void *)arg));
1885
1886         /* We need to special case any other ioctls we want to handle,
1887          * to send them to the MDS/OST as appropriate and to properly
1888          * network encode the arg field.
1889         case FSFILT_IOC_SETVERSION_OLD:
1890         case FSFILT_IOC_SETVERSION:
1891         */
1892         case LL_IOC_FLUSHCTX:
1893                 RETURN(ll_flush_ctx(inode));
1894         case LL_IOC_PATH2FID: {
1895                 if (cfs_copy_to_user((void *)arg, ll_inode2fid(inode),
1896                                      sizeof(struct lu_fid)))
1897                         RETURN(-EFAULT);
1898
1899                 RETURN(0);
1900         }
1901         case OBD_IOC_FID2PATH:
1902                 RETURN(ll_fid2path(inode, (void *)arg));
1903         case LL_IOC_DATA_VERSION: {
1904                 struct ioc_data_version idv;
1905                 int rc;
1906
1907                 if (cfs_copy_from_user(&idv, (char *)arg, sizeof(idv)))
1908                         RETURN(-EFAULT);
1909
1910                 rc = ll_data_version(inode, &idv.idv_version,
1911                                      !(idv.idv_flags & LL_DV_NOFLUSH));
1912
1913                 if (rc == 0 &&
1914                     cfs_copy_to_user((char *) arg, &idv, sizeof(idv)))
1915                         RETURN(-EFAULT);
1916
1917                 RETURN(rc);
1918         }
1919
1920         case LL_IOC_GET_MDTIDX: {
1921                 int mdtidx;
1922
1923                 mdtidx = ll_get_mdt_idx(inode);
1924                 if (mdtidx < 0)
1925                         RETURN(mdtidx);
1926
1927                 if (put_user((int)mdtidx, (int*)arg))
1928                         RETURN(-EFAULT);
1929
1930                 RETURN(0);
1931         }
1932         case OBD_IOC_GETDTNAME:
1933         case OBD_IOC_GETMDNAME:
1934                 RETURN(ll_get_obd_name(inode, cmd, arg));
1935         default: {
1936                 int err;
1937
1938                 if (LLIOC_STOP ==
1939                     ll_iocontrol_call(inode, file, cmd, arg, &err))
1940                         RETURN(err);
1941
1942                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1943                                      (void *)arg));
1944         }
1945         }
1946 }
1947
1948 #ifndef HAVE_FILE_LLSEEK_SIZE
1949 static inline loff_t
1950 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
1951 {
1952         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
1953                 return -EINVAL;
1954         if (offset > maxsize)
1955                 return -EINVAL;
1956
1957         if (offset != file->f_pos) {
1958                 file->f_pos = offset;
1959                 file->f_version = 0;
1960         }
1961         return offset;
1962 }
1963
1964 static loff_t
1965 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
1966                 loff_t maxsize, loff_t eof)
1967 {
1968         struct inode *inode = file->f_dentry->d_inode;
1969
1970         switch (origin) {
1971         case SEEK_END:
1972                 offset += eof;
1973                 break;
1974         case SEEK_CUR:
1975                 /*
1976                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
1977                  * position-querying operation.  Avoid rewriting the "same"
1978                  * f_pos value back to the file because a concurrent read(),
1979                  * write() or lseek() might have altered it
1980                  */
1981                 if (offset == 0)
1982                         return file->f_pos;
1983                 /*
1984                  * f_lock protects against read/modify/write race with other
1985                  * SEEK_CURs. Note that parallel writes and reads behave
1986                  * like SEEK_SET.
1987                  */
1988                 mutex_lock(&inode->i_mutex);
1989                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
1990                 mutex_unlock(&inode->i_mutex);
1991                 return offset;
1992         case SEEK_DATA:
1993                 /*
1994                  * In the generic case the entire file is data, so as long as
1995                  * offset isn't at the end of the file then the offset is data.
1996                  */
1997                 if (offset >= eof)
1998                         return -ENXIO;
1999                 break;
2000         case SEEK_HOLE:
2001                 /*
2002                  * There is a virtual hole at the end of the file, so as long as
2003                  * offset isn't i_size or larger, return i_size.
2004                  */
2005                 if (offset >= eof)
2006                         return -ENXIO;
2007                 offset = eof;
2008                 break;
2009         }
2010
2011         return llseek_execute(file, offset, maxsize);
2012 }
2013 #endif
2014
2015 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2016 {
2017         struct inode *inode = file->f_dentry->d_inode;
2018         loff_t retval, eof = 0;
2019
2020         ENTRY;
2021         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2022                            (origin == SEEK_CUR) ? file->f_pos : 0);
2023         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2024                inode->i_ino, inode->i_generation, inode, retval, retval,
2025                origin);
2026         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2027
2028         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2029                 retval = ll_glimpse_size(inode);
2030                 if (retval != 0)
2031                         RETURN(retval);
2032                 eof = i_size_read(inode);
2033         }
2034
2035         retval = generic_file_llseek_size(file, offset, origin,
2036                                           ll_file_maxbytes(inode), eof);
2037         RETURN(retval);
2038 }
2039
2040 int ll_flush(struct file *file, fl_owner_t id)
2041 {
2042         struct inode *inode = file->f_dentry->d_inode;
2043         struct ll_inode_info *lli = ll_i2info(inode);
2044         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2045         int rc, err;
2046
2047         LASSERT(!S_ISDIR(inode->i_mode));
2048
2049         /* catch async errors that were recorded back when async writeback
2050          * failed for pages in this mapping. */
2051         rc = lli->lli_async_rc;
2052         lli->lli_async_rc = 0;
2053         err = lov_read_and_clear_async_rc(lli->lli_clob);
2054         if (rc == 0)
2055                 rc = err;
2056
2057         /* The application has been told write failure already.
2058          * Do not report failure again. */
2059         if (fd->fd_write_failed)
2060                 return 0;
2061         return rc ? -EIO : 0;
2062 }
2063
2064 /**
2065  * Called to make sure a portion of file has been written out.
2066  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2067  *
2068  * Return how many pages have been written.
2069  */
2070 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2071                        enum cl_fsync_mode mode)
2072 {
2073         struct cl_env_nest nest;
2074         struct lu_env *env;
2075         struct cl_io *io;
2076         struct obd_capa *capa = NULL;
2077         struct cl_fsync_io *fio;
2078         int result;
2079         ENTRY;
2080
2081         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2082             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2083                 RETURN(-EINVAL);
2084
2085         env = cl_env_nested_get(&nest);
2086         if (IS_ERR(env))
2087                 RETURN(PTR_ERR(env));
2088
2089         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2090
2091         io = ccc_env_thread_io(env);
2092         io->ci_obj = cl_i2info(inode)->lli_clob;
2093         io->ci_ignore_layout = 1;
2094
2095         /* initialize parameters for sync */
2096         fio = &io->u.ci_fsync;
2097         fio->fi_capa = capa;
2098         fio->fi_start = start;
2099         fio->fi_end = end;
2100         fio->fi_fid = ll_inode2fid(inode);
2101         fio->fi_mode = mode;
2102         fio->fi_nr_written = 0;
2103
2104         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2105                 result = cl_io_loop(env, io);
2106         else
2107                 result = io->ci_result;
2108         if (result == 0)
2109                 result = fio->fi_nr_written;
2110         cl_io_fini(env, io);
2111         cl_env_nested_put(&nest, env);
2112
2113         capa_put(capa);
2114
2115         RETURN(result);
2116 }
2117
2118 #ifdef HAVE_FILE_FSYNC_4ARGS
2119 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2120 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2121 int ll_fsync(struct file *file, int data)
2122 #else
2123 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2124 #endif
2125 {
2126         struct inode *inode = file->f_dentry->d_inode;
2127         struct ll_inode_info *lli = ll_i2info(inode);
2128         struct ptlrpc_request *req;
2129         struct obd_capa *oc;
2130         int rc, err;
2131         ENTRY;
2132
2133         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2134                inode->i_generation, inode);
2135         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2136
2137 #ifdef HAVE_FILE_FSYNC_4ARGS
2138         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2139         mutex_lock(&inode->i_mutex);
2140 #else
2141         /* fsync's caller has already called _fdata{sync,write}, we want
2142          * that IO to finish before calling the osc and mdc sync methods */
2143         rc = filemap_fdatawait(inode->i_mapping);
2144 #endif
2145
2146         /* catch async errors that were recorded back when async writeback
2147          * failed for pages in this mapping. */
2148         if (!S_ISDIR(inode->i_mode)) {
2149                 err = lli->lli_async_rc;
2150                 lli->lli_async_rc = 0;
2151                 if (rc == 0)
2152                         rc = err;
2153                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2154                 if (rc == 0)
2155                         rc = err;
2156         }
2157
2158         oc = ll_mdscapa_get(inode);
2159         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2160                       &req);
2161         capa_put(oc);
2162         if (!rc)
2163                 rc = err;
2164         if (!err)
2165                 ptlrpc_req_finished(req);
2166
2167         if (data) {
2168                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2169
2170                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2171                                 CL_FSYNC_ALL);
2172                 if (rc == 0 && err < 0)
2173                         rc = err;
2174                 if (rc < 0)
2175                         fd->fd_write_failed = true;
2176                 else
2177                         fd->fd_write_failed = false;
2178         }
2179
2180 #ifdef HAVE_FILE_FSYNC_4ARGS
2181         mutex_unlock(&inode->i_mutex);
2182 #endif
2183         RETURN(rc);
2184 }
2185
2186 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2187 {
2188         struct inode *inode = file->f_dentry->d_inode;
2189         struct ll_sb_info *sbi = ll_i2sbi(inode);
2190         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2191                                            .ei_cb_cp =ldlm_flock_completion_ast,
2192                                            .ei_cbdata = file_lock };
2193         struct md_op_data *op_data;
2194         struct lustre_handle lockh = {0};
2195         ldlm_policy_data_t flock = {{0}};
2196         int flags = 0;
2197         int rc;
2198         ENTRY;
2199
2200         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2201                inode->i_ino, file_lock);
2202
2203         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2204
2205         if (file_lock->fl_flags & FL_FLOCK) {
2206                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2207                 /* flocks are whole-file locks */
2208                 flock.l_flock.end = OFFSET_MAX;
2209                 /* For flocks owner is determined by the local file desctiptor*/
2210                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2211         } else if (file_lock->fl_flags & FL_POSIX) {
2212                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2213                 flock.l_flock.start = file_lock->fl_start;
2214                 flock.l_flock.end = file_lock->fl_end;
2215         } else {
2216                 RETURN(-EINVAL);
2217         }
2218         flock.l_flock.pid = file_lock->fl_pid;
2219
2220         /* Somewhat ugly workaround for svc lockd.
2221          * lockd installs custom fl_lmops->lm_compare_owner that checks
2222          * for the fl_owner to be the same (which it always is on local node
2223          * I guess between lockd processes) and then compares pid.
2224          * As such we assign pid to the owner field to make it all work,
2225          * conflict with normal locks is unlikely since pid space and
2226          * pointer space for current->files are not intersecting */
2227         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2228                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2229
2230         switch (file_lock->fl_type) {
2231         case F_RDLCK:
2232                 einfo.ei_mode = LCK_PR;
2233                 break;
2234         case F_UNLCK:
2235                 /* An unlock request may or may not have any relation to
2236                  * existing locks so we may not be able to pass a lock handle
2237                  * via a normal ldlm_lock_cancel() request. The request may even
2238                  * unlock a byte range in the middle of an existing lock. In
2239                  * order to process an unlock request we need all of the same
2240                  * information that is given with a normal read or write record
2241                  * lock request. To avoid creating another ldlm unlock (cancel)
2242                  * message we'll treat a LCK_NL flock request as an unlock. */
2243                 einfo.ei_mode = LCK_NL;
2244                 break;
2245         case F_WRLCK:
2246                 einfo.ei_mode = LCK_PW;
2247                 break;
2248         default:
2249                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2250                         file_lock->fl_type);
2251                 RETURN (-ENOTSUPP);
2252         }
2253
2254         switch (cmd) {
2255         case F_SETLKW:
2256 #ifdef F_SETLKW64
2257         case F_SETLKW64:
2258 #endif
2259                 flags = 0;
2260                 break;
2261         case F_SETLK:
2262 #ifdef F_SETLK64
2263         case F_SETLK64:
2264 #endif
2265                 flags = LDLM_FL_BLOCK_NOWAIT;
2266                 break;
2267         case F_GETLK:
2268 #ifdef F_GETLK64
2269         case F_GETLK64:
2270 #endif
2271                 flags = LDLM_FL_TEST_LOCK;
2272                 /* Save the old mode so that if the mode in the lock changes we
2273                  * can decrement the appropriate reader or writer refcount. */
2274                 file_lock->fl_type = einfo.ei_mode;
2275                 break;
2276         default:
2277                 CERROR("unknown fcntl lock command: %d\n", cmd);
2278                 RETURN (-EINVAL);
2279         }
2280
2281         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2282                                      LUSTRE_OPC_ANY, NULL);
2283         if (IS_ERR(op_data))
2284                 RETURN(PTR_ERR(op_data));
2285
2286         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2287                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2288                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2289
2290         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2291                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2292
2293         ll_finish_md_op_data(op_data);
2294
2295         if ((file_lock->fl_flags & FL_FLOCK) &&
2296             (rc == 0 || file_lock->fl_type == F_UNLCK))
2297                 flock_lock_file_wait(file, file_lock);
2298         if ((file_lock->fl_flags & FL_POSIX) &&
2299             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2300             !(flags & LDLM_FL_TEST_LOCK))
2301                 posix_lock_file_wait(file, file_lock);
2302
2303         RETURN(rc);
2304 }
2305
2306 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2307 {
2308         ENTRY;
2309
2310         RETURN(-ENOSYS);
2311 }
2312
2313 /**
2314  * test if some locks matching bits and l_req_mode are acquired
2315  * - bits can be in different locks
2316  * - if found clear the common lock bits in *bits
2317  * - the bits not found, are kept in *bits
2318  * \param inode [IN]
2319  * \param bits [IN] searched lock bits [IN]
2320  * \param l_req_mode [IN] searched lock mode
2321  * \retval boolean, true iff all bits are found
2322  */
2323 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2324 {
2325         struct lustre_handle lockh;
2326         ldlm_policy_data_t policy;
2327         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2328                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2329         struct lu_fid *fid;
2330         __u64 flags;
2331         int i;
2332         ENTRY;
2333
2334         if (!inode)
2335                RETURN(0);
2336
2337         fid = &ll_i2info(inode)->lli_fid;
2338         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2339                ldlm_lockname[mode]);
2340
2341         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2342         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2343                 policy.l_inodebits.bits = *bits & (1 << i);
2344                 if (policy.l_inodebits.bits == 0)
2345                         continue;
2346
2347                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2348                                   &policy, mode, &lockh)) {
2349                         struct ldlm_lock *lock;
2350
2351                         lock = ldlm_handle2lock(&lockh);
2352                         if (lock) {
2353                                 *bits &=
2354                                       ~(lock->l_policy_data.l_inodebits.bits);
2355                                 LDLM_LOCK_PUT(lock);
2356                         } else {
2357                                 *bits &= ~policy.l_inodebits.bits;
2358                         }
2359                 }
2360         }
2361         RETURN(*bits == 0);
2362 }
2363
2364 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2365                             struct lustre_handle *lockh, __u64 flags)
2366 {
2367         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2368         struct lu_fid *fid;
2369         ldlm_mode_t rc;
2370         ENTRY;
2371
2372         fid = &ll_i2info(inode)->lli_fid;
2373         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2374
2375         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2376                            fid, LDLM_IBITS, &policy,
2377                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2378         RETURN(rc);
2379 }
2380
2381 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2382 {
2383         /* Already unlinked. Just update nlink and return success */
2384         if (rc == -ENOENT) {
2385                 clear_nlink(inode);
2386                 /* This path cannot be hit for regular files unless in
2387                  * case of obscure races, so no need to to validate
2388                  * size. */
2389                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2390                         return 0;
2391         } else if (rc != 0) {
2392                 CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2393                        ll_get_fsname(inode->i_sb, NULL, 0),
2394                        PFID(ll_inode2fid(inode)), rc);
2395         }
2396
2397         return rc;
2398 }
2399
2400 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2401                              __u64 ibits)
2402 {
2403         struct inode *inode = dentry->d_inode;
2404         struct ptlrpc_request *req = NULL;
2405         struct obd_export *exp;
2406         int rc = 0;
2407         ENTRY;
2408
2409         LASSERT(inode != NULL);
2410
2411         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2412                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2413
2414         exp = ll_i2mdexp(inode);
2415
2416         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2417          *      But under CMD case, it caused some lock issues, should be fixed
2418          *      with new CMD ibits lock. See bug 12718 */
2419         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2420                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2421                 struct md_op_data *op_data;
2422
2423                 if (ibits == MDS_INODELOCK_LOOKUP)
2424                         oit.it_op = IT_LOOKUP;
2425
2426                 /* Call getattr by fid, so do not provide name at all. */
2427                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2428                                              dentry->d_inode, NULL, 0, 0,
2429                                              LUSTRE_OPC_ANY, NULL);
2430                 if (IS_ERR(op_data))
2431                         RETURN(PTR_ERR(op_data));
2432
2433                 oit.it_create_mode |= M_CHECK_STALE;
2434                 rc = md_intent_lock(exp, op_data, NULL, 0,
2435                                     /* we are not interested in name
2436                                        based lookup */
2437                                     &oit, 0, &req,
2438                                     ll_md_blocking_ast, 0);
2439                 ll_finish_md_op_data(op_data);
2440                 oit.it_create_mode &= ~M_CHECK_STALE;
2441                 if (rc < 0) {
2442                         rc = ll_inode_revalidate_fini(inode, rc);
2443                         GOTO (out, rc);
2444                 }
2445
2446                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2447                 if (rc != 0) {
2448                         ll_intent_release(&oit);
2449                         GOTO(out, rc);
2450                 }
2451
2452                 /* Unlinked? Unhash dentry, so it is not picked up later by
2453                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2454                    here to preserve get_cwd functionality on 2.6.
2455                    Bug 10503 */
2456                 if (!dentry->d_inode->i_nlink)
2457                         d_lustre_invalidate(dentry);
2458
2459                 ll_lookup_finish_locks(&oit, dentry);
2460         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2461                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2462                 obd_valid valid = OBD_MD_FLGETATTR;
2463                 struct md_op_data *op_data;
2464                 int ealen = 0;
2465
2466                 if (S_ISREG(inode->i_mode)) {
2467                         rc = ll_get_max_mdsize(sbi, &ealen);
2468                         if (rc)
2469                                 RETURN(rc);
2470                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2471                 }
2472
2473                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2474                                              0, ealen, LUSTRE_OPC_ANY,
2475                                              NULL);
2476                 if (IS_ERR(op_data))
2477                         RETURN(PTR_ERR(op_data));
2478
2479                 op_data->op_valid = valid;
2480                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2481                  * capa for this inode. Because we only keep capas of dirs
2482                  * fresh. */
2483                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2484                 ll_finish_md_op_data(op_data);
2485                 if (rc) {
2486                         rc = ll_inode_revalidate_fini(inode, rc);
2487                         RETURN(rc);
2488                 }
2489
2490                 rc = ll_prep_inode(&inode, req, NULL);
2491         }
2492 out:
2493         ptlrpc_req_finished(req);
2494         return rc;
2495 }
2496
2497 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2498                            __u64 ibits)
2499 {
2500         struct inode *inode = dentry->d_inode;
2501         int rc;
2502         ENTRY;
2503
2504         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2505         if (rc != 0)
2506                 RETURN(rc);
2507
2508         /* if object isn't regular file, don't validate size */
2509         if (!S_ISREG(inode->i_mode)) {
2510                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2511                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2512                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2513         } else {
2514                 rc = ll_glimpse_size(inode);
2515         }
2516         RETURN(rc);
2517 }
2518
2519 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2520                   struct lookup_intent *it, struct kstat *stat)
2521 {
2522         struct inode *inode = de->d_inode;
2523         struct ll_sb_info *sbi = ll_i2sbi(inode);
2524         struct ll_inode_info *lli = ll_i2info(inode);
2525         int res = 0;
2526
2527         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2528                                              MDS_INODELOCK_LOOKUP);
2529         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2530
2531         if (res)
2532                 return res;
2533
2534         stat->dev = inode->i_sb->s_dev;
2535         if (ll_need_32bit_api(sbi))
2536                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2537         else
2538                 stat->ino = inode->i_ino;
2539         stat->mode = inode->i_mode;
2540         stat->nlink = inode->i_nlink;
2541         stat->uid = inode->i_uid;
2542         stat->gid = inode->i_gid;
2543         stat->rdev = inode->i_rdev;
2544         stat->atime = inode->i_atime;
2545         stat->mtime = inode->i_mtime;
2546         stat->ctime = inode->i_ctime;
2547         stat->blksize = 1 << inode->i_blkbits;
2548
2549         stat->size = i_size_read(inode);
2550         stat->blocks = inode->i_blocks;
2551
2552         return 0;
2553 }
2554 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2555 {
2556         struct lookup_intent it = { .it_op = IT_GETATTR };
2557
2558         return ll_getattr_it(mnt, de, &it, stat);
2559 }
2560
2561 #ifdef HAVE_LINUX_FIEMAP_H
2562 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2563                 __u64 start, __u64 len)
2564 {
2565         int rc;
2566         size_t num_bytes;
2567         struct ll_user_fiemap *fiemap;
2568         unsigned int extent_count = fieinfo->fi_extents_max;
2569
2570         num_bytes = sizeof(*fiemap) + (extent_count *
2571                                        sizeof(struct ll_fiemap_extent));
2572         OBD_ALLOC_LARGE(fiemap, num_bytes);
2573
2574         if (fiemap == NULL)
2575                 RETURN(-ENOMEM);
2576
2577         fiemap->fm_flags = fieinfo->fi_flags;
2578         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2579         fiemap->fm_start = start;
2580         fiemap->fm_length = len;
2581         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2582                sizeof(struct ll_fiemap_extent));
2583
2584         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2585
2586         fieinfo->fi_flags = fiemap->fm_flags;
2587         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2588         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2589                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2590
2591         OBD_FREE_LARGE(fiemap, num_bytes);
2592         return rc;
2593 }
2594 #endif
2595
2596 struct posix_acl * ll_get_acl(struct inode *inode, int type)
2597 {
2598         struct ll_inode_info *lli = ll_i2info(inode);
2599         struct posix_acl *acl = NULL;
2600         ENTRY;
2601
2602         spin_lock(&lli->lli_lock);
2603         /* VFS' acl_permission_check->check_acl will release the refcount */
2604         acl = posix_acl_dup(lli->lli_posix_acl);
2605         spin_unlock(&lli->lli_lock);
2606
2607         RETURN(acl);
2608 }
2609
2610 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2611 static int
2612 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2613 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
2614 # else
2615 ll_check_acl(struct inode *inode, int mask)
2616 # endif
2617 {
2618 # ifdef CONFIG_FS_POSIX_ACL
2619         struct posix_acl *acl;
2620         int rc;
2621         ENTRY;
2622
2623 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2624         if (flags & IPERM_FLAG_RCU)
2625                 return -ECHILD;
2626 #  endif
2627         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
2628
2629         if (!acl)
2630                 RETURN(-EAGAIN);
2631
2632         rc = posix_acl_permission(inode, acl, mask);
2633         posix_acl_release(acl);
2634
2635         RETURN(rc);
2636 # else /* !CONFIG_FS_POSIX_ACL */
2637         return -EAGAIN;
2638 # endif /* CONFIG_FS_POSIX_ACL */
2639 }
2640 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2641
2642 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2643 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2644 #else
2645 # ifdef HAVE_INODE_PERMISION_2ARGS
2646 int ll_inode_permission(struct inode *inode, int mask)
2647 # else
2648 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2649 # endif
2650 #endif
2651 {
2652         int rc = 0;
2653         ENTRY;
2654
2655 #ifdef MAY_NOT_BLOCK
2656         if (mask & MAY_NOT_BLOCK)
2657                 return -ECHILD;
2658 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
2659         if (flags & IPERM_FLAG_RCU)
2660                 return -ECHILD;
2661 #endif
2662
2663        /* as root inode are NOT getting validated in lookup operation,
2664         * need to do it before permission check. */
2665
2666         if (inode == inode->i_sb->s_root->d_inode) {
2667                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2668
2669                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2670                                               MDS_INODELOCK_LOOKUP);
2671                 if (rc)
2672                         RETURN(rc);
2673         }
2674
2675         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2676                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2677
2678         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2679                 return lustre_check_remote_perm(inode, mask);
2680
2681         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2682         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
2683
2684         RETURN(rc);
2685 }
2686
2687 #ifdef HAVE_FILE_READV
2688 #define READ_METHOD readv
2689 #define READ_FUNCTION ll_file_readv
2690 #define WRITE_METHOD writev
2691 #define WRITE_FUNCTION ll_file_writev
2692 #else
2693 #define READ_METHOD aio_read
2694 #define READ_FUNCTION ll_file_aio_read
2695 #define WRITE_METHOD aio_write
2696 #define WRITE_FUNCTION ll_file_aio_write
2697 #endif
2698
2699 /* -o localflock - only provides locally consistent flock locks */
2700 struct file_operations ll_file_operations = {
2701         .read           = ll_file_read,
2702         .READ_METHOD    = READ_FUNCTION,
2703         .write          = ll_file_write,
2704         .WRITE_METHOD   = WRITE_FUNCTION,
2705         .unlocked_ioctl = ll_file_ioctl,
2706         .open           = ll_file_open,
2707         .release        = ll_file_release,
2708         .mmap           = ll_file_mmap,
2709         .llseek         = ll_file_seek,
2710 #ifdef HAVE_KERNEL_SENDFILE
2711         .sendfile       = ll_file_sendfile,
2712 #endif
2713 #ifdef HAVE_KERNEL_SPLICE_READ
2714         .splice_read    = ll_file_splice_read,
2715 #endif
2716         .fsync          = ll_fsync,
2717         .flush          = ll_flush
2718 };
2719
2720 struct file_operations ll_file_operations_flock = {
2721         .read           = ll_file_read,
2722         .READ_METHOD    = READ_FUNCTION,
2723         .write          = ll_file_write,
2724         .WRITE_METHOD   = WRITE_FUNCTION,
2725         .unlocked_ioctl = ll_file_ioctl,
2726         .open           = ll_file_open,
2727         .release        = ll_file_release,
2728         .mmap           = ll_file_mmap,
2729         .llseek         = ll_file_seek,
2730 #ifdef HAVE_KERNEL_SENDFILE
2731         .sendfile       = ll_file_sendfile,
2732 #endif
2733 #ifdef HAVE_KERNEL_SPLICE_READ
2734         .splice_read    = ll_file_splice_read,
2735 #endif
2736         .fsync          = ll_fsync,
2737         .flush          = ll_flush,
2738         .flock          = ll_file_flock,
2739         .lock           = ll_file_flock
2740 };
2741
2742 /* These are for -o noflock - to return ENOSYS on flock calls */
2743 struct file_operations ll_file_operations_noflock = {
2744         .read           = ll_file_read,
2745         .READ_METHOD    = READ_FUNCTION,
2746         .write          = ll_file_write,
2747         .WRITE_METHOD   = WRITE_FUNCTION,
2748         .unlocked_ioctl = ll_file_ioctl,
2749         .open           = ll_file_open,
2750         .release        = ll_file_release,
2751         .mmap           = ll_file_mmap,
2752         .llseek         = ll_file_seek,
2753 #ifdef HAVE_KERNEL_SENDFILE
2754         .sendfile       = ll_file_sendfile,
2755 #endif
2756 #ifdef HAVE_KERNEL_SPLICE_READ
2757         .splice_read    = ll_file_splice_read,
2758 #endif
2759         .fsync          = ll_fsync,
2760         .flush          = ll_flush,
2761         .flock          = ll_file_noflock,
2762         .lock           = ll_file_noflock
2763 };
2764
2765 struct inode_operations ll_file_inode_operations = {
2766         .setattr        = ll_setattr,
2767         .getattr        = ll_getattr,
2768         .permission     = ll_inode_permission,
2769         .setxattr       = ll_setxattr,
2770         .getxattr       = ll_getxattr,
2771         .listxattr      = ll_listxattr,
2772         .removexattr    = ll_removexattr,
2773 #ifdef  HAVE_LINUX_FIEMAP_H
2774         .fiemap         = ll_fiemap,
2775 #endif
2776 #ifdef HAVE_IOP_GET_ACL
2777         .get_acl        = ll_get_acl,
2778 #endif
2779 };
2780
2781 /* dynamic ioctl number support routins */
2782 static struct llioc_ctl_data {
2783         struct rw_semaphore     ioc_sem;
2784         cfs_list_t              ioc_head;
2785 } llioc = {
2786         __RWSEM_INITIALIZER(llioc.ioc_sem),
2787         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2788 };
2789
2790
2791 struct llioc_data {
2792         cfs_list_t              iocd_list;
2793         unsigned int            iocd_size;
2794         llioc_callback_t        iocd_cb;
2795         unsigned int            iocd_count;
2796         unsigned int            iocd_cmd[0];
2797 };
2798
2799 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2800 {
2801         unsigned int size;
2802         struct llioc_data *in_data = NULL;
2803         ENTRY;
2804
2805         if (cb == NULL || cmd == NULL ||
2806             count > LLIOC_MAX_CMD || count < 0)
2807                 RETURN(NULL);
2808
2809         size = sizeof(*in_data) + count * sizeof(unsigned int);
2810         OBD_ALLOC(in_data, size);
2811         if (in_data == NULL)
2812                 RETURN(NULL);
2813
2814         memset(in_data, 0, sizeof(*in_data));
2815         in_data->iocd_size = size;
2816         in_data->iocd_cb = cb;
2817         in_data->iocd_count = count;
2818         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2819
2820         down_write(&llioc.ioc_sem);
2821         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2822         up_write(&llioc.ioc_sem);
2823
2824         RETURN(in_data);
2825 }
2826
2827 void ll_iocontrol_unregister(void *magic)
2828 {
2829         struct llioc_data *tmp;
2830
2831         if (magic == NULL)
2832                 return;
2833
2834         down_write(&llioc.ioc_sem);
2835         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2836                 if (tmp == magic) {
2837                         unsigned int size = tmp->iocd_size;
2838
2839                         cfs_list_del(&tmp->iocd_list);
2840                         up_write(&llioc.ioc_sem);
2841
2842                         OBD_FREE(tmp, size);
2843                         return;
2844                 }
2845         }
2846         up_write(&llioc.ioc_sem);
2847
2848         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2849 }
2850
2851 EXPORT_SYMBOL(ll_iocontrol_register);
2852 EXPORT_SYMBOL(ll_iocontrol_unregister);
2853
2854 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2855                         unsigned int cmd, unsigned long arg, int *rcp)
2856 {
2857         enum llioc_iter ret = LLIOC_CONT;
2858         struct llioc_data *data;
2859         int rc = -EINVAL, i;
2860
2861         down_read(&llioc.ioc_sem);
2862         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2863                 for (i = 0; i < data->iocd_count; i++) {
2864                         if (cmd != data->iocd_cmd[i])
2865                                 continue;
2866
2867                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2868                         break;
2869                 }
2870
2871                 if (ret == LLIOC_STOP)
2872                         break;
2873         }
2874         up_read(&llioc.ioc_sem);
2875
2876         if (rcp)
2877                 *rcp = rc;
2878         return ret;
2879 }
2880
2881 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2882 {
2883         struct ll_inode_info *lli = ll_i2info(inode);
2884         struct cl_env_nest nest;
2885         struct lu_env *env;
2886         int result;
2887         ENTRY;
2888
2889         if (lli->lli_clob == NULL)
2890                 RETURN(0);
2891
2892         env = cl_env_nested_get(&nest);
2893         if (IS_ERR(env))
2894                 RETURN(PTR_ERR(env));
2895
2896         result = cl_conf_set(env, lli->lli_clob, conf);
2897         cl_env_nested_put(&nest, env);
2898         RETURN(result);
2899 }
2900
2901 /**
2902  * This function checks if there exists a LAYOUT lock on the client side,
2903  * or enqueues it if it doesn't have one in cache.
2904  *
2905  * This function will not hold layout lock so it may be revoked any time after
2906  * this function returns. Any operations depend on layout should be redone
2907  * in that case.
2908  *
2909  * This function should be called before lov_io_init() to get an uptodate
2910  * layout version, the caller should save the version number and after IO
2911  * is finished, this function should be called again to verify that layout
2912  * is not changed during IO time.
2913  */
2914 int ll_layout_refresh(struct inode *inode, __u32 *gen)
2915 {
2916         struct ll_inode_info  *lli = ll_i2info(inode);
2917         struct ll_sb_info     *sbi = ll_i2sbi(inode);
2918         struct md_op_data     *op_data = NULL;
2919         struct lookup_intent   it = { .it_op = IT_LAYOUT };
2920         struct lustre_handle   lockh = { 0 };
2921         ldlm_mode_t            mode;
2922         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
2923                                            .ei_mode = LCK_CR,
2924                                            .ei_cb_bl = ll_md_blocking_ast,
2925                                            .ei_cb_cp = ldlm_completion_ast,
2926                                            .ei_cbdata = inode };
2927         int rc;
2928         ENTRY;
2929
2930         *gen = 0;
2931         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
2932                 RETURN(0);
2933
2934         /* sanity checks */
2935         LASSERT(fid_is_sane(ll_inode2fid(inode)));
2936         LASSERT(S_ISREG(inode->i_mode));
2937
2938         /* mostly layout lock is caching on the local side, so try to match
2939          * it before grabbing layout lock mutex. */
2940         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
2941                                 LDLM_FL_LVB_READY);
2942         if (mode != 0) { /* hit cached lock */
2943                 /* lsm_layout_gen is started from 0, plus 1 here to distinguish
2944                  * the cases of no layout and first layout. */
2945                 *gen = lli->lli_layout_gen + 1;
2946
2947                 ldlm_lock_decref(&lockh, mode);
2948                 RETURN(0);
2949         }
2950
2951         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
2952                                      0, 0, LUSTRE_OPC_ANY, NULL);
2953         if (IS_ERR(op_data))
2954                 RETURN(PTR_ERR(op_data));
2955
2956         /* take layout lock mutex to enqueue layout lock exclusively. */
2957         mutex_lock(&lli->lli_layout_mutex);
2958
2959         /* try again inside layout mutex */
2960         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
2961                                 LDLM_FL_LVB_READY);
2962         if (mode != 0) { /* hit cached lock */
2963                 *gen = lli->lli_layout_gen + 1;
2964
2965                 ldlm_lock_decref(&lockh, mode);
2966                 mutex_unlock(&lli->lli_layout_mutex);
2967                 ll_finish_md_op_data(op_data);
2968                 RETURN(0);
2969         }
2970
2971         /* have to enqueue one */
2972         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
2973                         NULL, 0, NULL, 0);
2974         if (it.d.lustre.it_data != NULL)
2975                 ptlrpc_req_finished(it.d.lustre.it_data);
2976         it.d.lustre.it_data = NULL;
2977
2978         if (rc == 0) {
2979                 struct ldlm_lock *lock;
2980                 struct cl_object_conf conf;
2981                 struct lustre_md md = { NULL };
2982                 void *lmm;
2983                 int lmmsize;
2984
2985                 LASSERT(lustre_handle_is_used(&lockh));
2986
2987                 /* set lock data in case this is a new lock */
2988                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
2989
2990                 lock = ldlm_handle2lock(&lockh);
2991                 LASSERT(lock != NULL);
2992
2993                 /* for IT_LAYOUT lock, lmm is returned in lock's lvb
2994                  * data via completion callback */
2995                 lmm = lock->l_lvb_data;
2996                 lmmsize = lock->l_lvb_len;
2997                 if (lmm != NULL) {
2998                         rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
2999                                         lmm, lmmsize);
3000                         if (rc >= 0) {
3001                                 if (md.lsm != NULL)
3002                                         *gen = md.lsm->lsm_layout_gen + 1;
3003                                 rc = 0;
3004                         } else {
3005                                 CERROR("file: "DFID" unpackmd error: %d\n",
3006                                         PFID(&lli->lli_fid), rc);
3007                         }
3008                 }
3009                 LDLM_LOCK_PUT(lock);
3010
3011                 /* set layout to file. This may cause lock expiration as we
3012                  * set layout inside layout ibits lock. */
3013                 memset(&conf, 0, sizeof conf);
3014                 conf.coc_inode = inode;
3015                 conf.u.coc_md = &md;
3016                 ll_layout_conf(inode, &conf);
3017                 /* is this racy? */
3018                 lli->lli_has_smd = md.lsm != NULL;
3019                 if (md.lsm != NULL)
3020                         obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3021         }
3022         ll_intent_drop_lock(&it);
3023
3024         mutex_unlock(&lli->lli_layout_mutex);
3025         ll_finish_md_op_data(op_data);
3026
3027         RETURN(rc);
3028 }