Whamcloud - gitweb
LU-3817 llite: Truncate to restore file
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, __GFP_IO);
58         if (fd == NULL)
59                 return NULL;
60
61         fd->fd_write_failed = false;
62
63         return fd;
64 }
65
66 static void ll_file_data_put(struct ll_file_data *fd)
67 {
68         if (fd != NULL)
69                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
70 }
71
72 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
73                           struct lustre_handle *fh)
74 {
75         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
76         op_data->op_attr.ia_mode = inode->i_mode;
77         op_data->op_attr.ia_atime = inode->i_atime;
78         op_data->op_attr.ia_mtime = inode->i_mtime;
79         op_data->op_attr.ia_ctime = inode->i_ctime;
80         op_data->op_attr.ia_size = i_size_read(inode);
81         op_data->op_attr_blocks = inode->i_blocks;
82         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
83                                         ll_inode_to_ext_flags(inode->i_flags);
84         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
85         if (fh)
86                 op_data->op_handle = *fh;
87         op_data->op_capa1 = ll_mdscapa_get(inode);
88
89         if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
90                 op_data->op_bias |= MDS_DATA_MODIFIED;
91 }
92
93 /**
94  * Closes the IO epoch and packs all the attributes into @op_data for
95  * the CLOSE rpc.
96  */
97 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
98                              struct obd_client_handle *och)
99 {
100         ENTRY;
101
102         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
103                                         ATTR_MTIME | ATTR_MTIME_SET |
104                                         ATTR_CTIME | ATTR_CTIME_SET;
105
106         if (!(och->och_flags & FMODE_WRITE))
107                 goto out;
108
109         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
110                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
111         else
112                 ll_ioepoch_close(inode, op_data, &och, 0);
113
114 out:
115         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
116         ll_prep_md_op_data(op_data, inode, NULL, NULL,
117                            0, 0, LUSTRE_OPC_ANY, NULL);
118         EXIT;
119 }
120
121 static int ll_close_inode_openhandle(struct obd_export *md_exp,
122                                      struct inode *inode,
123                                      struct obd_client_handle *och,
124                                      const __u64 *data_version)
125 {
126         struct obd_export *exp = ll_i2mdexp(inode);
127         struct md_op_data *op_data;
128         struct ptlrpc_request *req = NULL;
129         struct obd_device *obd = class_exp2obd(exp);
130         int epoch_close = 1;
131         int rc;
132         ENTRY;
133
134         if (obd == NULL) {
135                 /*
136                  * XXX: in case of LMV, is this correct to access
137                  * ->exp_handle?
138                  */
139                 CERROR("Invalid MDC connection handle "LPX64"\n",
140                        ll_i2mdexp(inode)->exp_handle.h_cookie);
141                 GOTO(out, rc = 0);
142         }
143
144         OBD_ALLOC_PTR(op_data);
145         if (op_data == NULL)
146                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
147
148         ll_prepare_close(inode, op_data, och);
149         if (data_version != NULL) {
150                 /* Pass in data_version implies release. */
151                 op_data->op_bias |= MDS_HSM_RELEASE;
152                 op_data->op_data_version = *data_version;
153                 op_data->op_lease_handle = och->och_lease_handle;
154                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
155         }
156         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
157         rc = md_close(md_exp, op_data, och->och_mod, &req);
158         if (rc == -EAGAIN) {
159                 /* This close must have the epoch closed. */
160                 LASSERT(epoch_close);
161                 /* MDS has instructed us to obtain Size-on-MDS attribute from
162                  * OSTs and send setattr to back to MDS. */
163                 rc = ll_som_update(inode, op_data);
164                 if (rc) {
165                         CERROR("inode %lu mdc Size-on-MDS update failed: "
166                                "rc = %d\n", inode->i_ino, rc);
167                         rc = 0;
168                 }
169         } else if (rc) {
170                 CERROR("inode %lu mdc close failed: rc = %d\n",
171                        inode->i_ino, rc);
172         }
173
174         /* DATA_MODIFIED flag was successfully sent on close, cancel data
175          * modification flag. */
176         if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
177                 struct ll_inode_info *lli = ll_i2info(inode);
178
179                 spin_lock(&lli->lli_lock);
180                 lli->lli_flags &= ~LLIF_DATA_MODIFIED;
181                 spin_unlock(&lli->lli_lock);
182         }
183
184         if (rc == 0) {
185                 rc = ll_objects_destroy(req, inode);
186                 if (rc)
187                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
188                                inode->i_ino, rc);
189         }
190
191         if (rc == 0 && op_data->op_bias & MDS_HSM_RELEASE) {
192                 struct mdt_body *body;
193                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
194                 if (!(body->valid & OBD_MD_FLRELEASED))
195                         rc = -EBUSY;
196         }
197
198         ll_finish_md_op_data(op_data);
199         EXIT;
200 out:
201
202         if (exp_connect_som(exp) && !epoch_close &&
203             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
204                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
205         } else {
206                 md_clear_open_replay_data(md_exp, och);
207                 /* Free @och if it is not waiting for DONE_WRITING. */
208                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
209                 OBD_FREE_PTR(och);
210         }
211         if (req) /* This is close request */
212                 ptlrpc_req_finished(req);
213         return rc;
214 }
215
216 int ll_md_real_close(struct inode *inode, int flags)
217 {
218         struct ll_inode_info *lli = ll_i2info(inode);
219         struct obd_client_handle **och_p;
220         struct obd_client_handle *och;
221         __u64 *och_usecount;
222         int rc = 0;
223         ENTRY;
224
225         if (flags & FMODE_WRITE) {
226                 och_p = &lli->lli_mds_write_och;
227                 och_usecount = &lli->lli_open_fd_write_count;
228         } else if (flags & FMODE_EXEC) {
229                 och_p = &lli->lli_mds_exec_och;
230                 och_usecount = &lli->lli_open_fd_exec_count;
231         } else {
232                 LASSERT(flags & FMODE_READ);
233                 och_p = &lli->lli_mds_read_och;
234                 och_usecount = &lli->lli_open_fd_read_count;
235         }
236
237         mutex_lock(&lli->lli_och_mutex);
238         if (*och_usecount) { /* There are still users of this handle, so
239                                 skip freeing it. */
240                 mutex_unlock(&lli->lli_och_mutex);
241                 RETURN(0);
242         }
243         och=*och_p;
244         *och_p = NULL;
245         mutex_unlock(&lli->lli_och_mutex);
246
247         if (och) { /* There might be a race and somebody have freed this och
248                       already */
249                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
250                                                inode, och, NULL);
251         }
252
253         RETURN(rc);
254 }
255
256 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
257                 struct file *file)
258 {
259         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
260         struct ll_inode_info *lli = ll_i2info(inode);
261         int rc = 0;
262         ENTRY;
263
264         /* clear group lock, if present */
265         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
266                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
267
268         if (fd->fd_lease_och != NULL) {
269                 bool lease_broken;
270
271                 /* Usually the lease is not released when the
272                  * application crashed, we need to release here. */
273                 rc = ll_lease_close(fd->fd_lease_och, inode, &lease_broken);
274                 CDEBUG(rc ? D_ERROR : D_INODE, "Clean up lease "DFID" %d/%d\n",
275                         PFID(&lli->lli_fid), rc, lease_broken);
276
277                 fd->fd_lease_och = NULL;
278         }
279
280         if (fd->fd_och != NULL) {
281                 rc = ll_close_inode_openhandle(md_exp, inode, fd->fd_och, NULL);
282                 fd->fd_och = NULL;
283                 GOTO(out, rc);
284         }
285
286         /* Let's see if we have good enough OPEN lock on the file and if
287            we can skip talking to MDS */
288         if (file->f_dentry->d_inode) { /* Can this ever be false? */
289                 int lockmode;
290                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
291                 struct lustre_handle lockh;
292                 struct inode *inode = file->f_dentry->d_inode;
293                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
294
295                 mutex_lock(&lli->lli_och_mutex);
296                 if (fd->fd_omode & FMODE_WRITE) {
297                         lockmode = LCK_CW;
298                         LASSERT(lli->lli_open_fd_write_count);
299                         lli->lli_open_fd_write_count--;
300                 } else if (fd->fd_omode & FMODE_EXEC) {
301                         lockmode = LCK_PR;
302                         LASSERT(lli->lli_open_fd_exec_count);
303                         lli->lli_open_fd_exec_count--;
304                 } else {
305                         lockmode = LCK_CR;
306                         LASSERT(lli->lli_open_fd_read_count);
307                         lli->lli_open_fd_read_count--;
308                 }
309                 mutex_unlock(&lli->lli_och_mutex);
310
311                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
312                                    LDLM_IBITS, &policy, lockmode,
313                                    &lockh)) {
314                         rc = ll_md_real_close(file->f_dentry->d_inode,
315                                               fd->fd_omode);
316                 }
317         } else {
318                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
319                        file, file->f_dentry, file->f_dentry->d_name.name);
320         }
321
322 out:
323         LUSTRE_FPRIVATE(file) = NULL;
324         ll_file_data_put(fd);
325         ll_capa_close(inode);
326
327         RETURN(rc);
328 }
329
330 /* While this returns an error code, fput() the caller does not, so we need
331  * to make every effort to clean up all of our state here.  Also, applications
332  * rarely check close errors and even if an error is returned they will not
333  * re-try the close call.
334  */
335 int ll_file_release(struct inode *inode, struct file *file)
336 {
337         struct ll_file_data *fd;
338         struct ll_sb_info *sbi = ll_i2sbi(inode);
339         struct ll_inode_info *lli = ll_i2info(inode);
340         int rc;
341         ENTRY;
342
343         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
344                inode->i_generation, inode);
345
346 #ifdef CONFIG_FS_POSIX_ACL
347         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
348             inode == inode->i_sb->s_root->d_inode) {
349                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
350
351                 LASSERT(fd != NULL);
352                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
353                         fd->fd_flags &= ~LL_FILE_RMTACL;
354                         rct_del(&sbi->ll_rct, current_pid());
355                         et_search_free(&sbi->ll_et, current_pid());
356                 }
357         }
358 #endif
359
360         if (inode->i_sb->s_root != file->f_dentry)
361                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
362         fd = LUSTRE_FPRIVATE(file);
363         LASSERT(fd != NULL);
364
365         /* The last ref on @file, maybe not the the owner pid of statahead.
366          * Different processes can open the same dir, "ll_opendir_key" means:
367          * it is me that should stop the statahead thread. */
368         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
369             lli->lli_opendir_pid != 0)
370                 ll_stop_statahead(inode, lli->lli_opendir_key);
371
372         if (inode->i_sb->s_root == file->f_dentry) {
373                 LUSTRE_FPRIVATE(file) = NULL;
374                 ll_file_data_put(fd);
375                 RETURN(0);
376         }
377
378         if (!S_ISDIR(inode->i_mode)) {
379                 lov_read_and_clear_async_rc(lli->lli_clob);
380                 lli->lli_async_rc = 0;
381         }
382
383         rc = ll_md_close(sbi->ll_md_exp, inode, file);
384
385         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
386                 libcfs_debug_dumplog();
387
388         RETURN(rc);
389 }
390
391 static int ll_intent_file_open(struct file *file, void *lmm,
392                                int lmmsize, struct lookup_intent *itp)
393 {
394         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
395         struct dentry *parent = file->f_dentry->d_parent;
396         struct md_op_data *op_data;
397         struct ptlrpc_request *req;
398         __u32 opc = LUSTRE_OPC_ANY;
399         int rc;
400         ENTRY;
401
402         if (!parent)
403                 RETURN(-ENOENT);
404
405         /* Usually we come here only for NFSD, and we want open lock.
406            But we can also get here with pre 2.6.15 patchless kernels, and in
407            that case that lock is also ok */
408         /* We can also get here if there was cached open handle in revalidate_it
409          * but it disappeared while we were getting from there to ll_file_open.
410          * But this means this file was closed and immediatelly opened which
411          * makes a good candidate for using OPEN lock */
412         /* If lmmsize & lmm are not 0, we are just setting stripe info
413          * parameters. No need for the open lock */
414         if (lmm == NULL && lmmsize == 0) {
415                 itp->it_flags |= MDS_OPEN_LOCK;
416                 if (itp->it_flags & FMODE_WRITE)
417                         opc = LUSTRE_OPC_CREATE;
418         }
419
420         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
421                                       file->f_dentry->d_inode, NULL, 0,
422                                       O_RDWR, opc, NULL);
423
424         if (IS_ERR(op_data))
425                 RETURN(PTR_ERR(op_data));
426
427         itp->it_flags |= MDS_OPEN_BY_FID;
428         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
429                             0 /*unused */, &req, ll_md_blocking_ast, 0);
430         ll_finish_md_op_data(op_data);
431         if (rc == -ESTALE) {
432                 /* reason for keep own exit path - don`t flood log
433                 * with messages with -ESTALE errors.
434                 */
435                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
436                      it_open_error(DISP_OPEN_OPEN, itp))
437                         GOTO(out, rc);
438                 ll_release_openhandle(file->f_dentry, itp);
439                 GOTO(out, rc);
440         }
441
442         if (it_disposition(itp, DISP_LOOKUP_NEG))
443                 GOTO(out, rc = -ENOENT);
444
445         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
446                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
447                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
448                 GOTO(out, rc);
449         }
450
451         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
452         if (!rc && itp->d.lustre.it_lock_mode)
453                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
454                                  itp, NULL);
455
456 out:
457         ptlrpc_req_finished(itp->d.lustre.it_data);
458         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
459         ll_intent_drop_lock(itp);
460
461         RETURN(rc);
462 }
463
464 /**
465  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
466  * not believe attributes if a few ioepoch holders exist. Attributes for
467  * previous ioepoch if new one is opened are also skipped by MDS.
468  */
469 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
470 {
471         if (ioepoch && lli->lli_ioepoch != ioepoch) {
472                 lli->lli_ioepoch = ioepoch;
473                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
474                        ioepoch, PFID(&lli->lli_fid));
475         }
476 }
477
478 static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it,
479                        struct obd_client_handle *och)
480 {
481         struct ptlrpc_request *req = it->d.lustre.it_data;
482         struct mdt_body *body;
483
484         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
485         och->och_fh = body->handle;
486         och->och_fid = body->fid1;
487         och->och_lease_handle.cookie = it->d.lustre.it_lock_handle;
488         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
489         och->och_flags = it->it_flags;
490
491         return md_set_open_replay_data(md_exp, och, req);
492 }
493
494 int ll_local_open(struct file *file, struct lookup_intent *it,
495                   struct ll_file_data *fd, struct obd_client_handle *och)
496 {
497         struct inode *inode = file->f_dentry->d_inode;
498         struct ll_inode_info *lli = ll_i2info(inode);
499         ENTRY;
500
501         LASSERT(!LUSTRE_FPRIVATE(file));
502
503         LASSERT(fd != NULL);
504
505         if (och) {
506                 struct ptlrpc_request *req = it->d.lustre.it_data;
507                 struct mdt_body *body;
508                 int rc;
509
510                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
511                 if (rc != 0)
512                         RETURN(rc);
513
514                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
515                 ll_ioepoch_open(lli, body->ioepoch);
516         }
517
518         LUSTRE_FPRIVATE(file) = fd;
519         ll_readahead_init(inode, &fd->fd_ras);
520         fd->fd_omode = it->it_flags & (FMODE_READ | FMODE_WRITE | FMODE_EXEC);
521
522         RETURN(0);
523 }
524
525 /* Open a file, and (for the very first open) create objects on the OSTs at
526  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
527  * creation or open until ll_lov_setstripe() ioctl is called.
528  *
529  * If we already have the stripe MD locally then we don't request it in
530  * md_open(), by passing a lmm_size = 0.
531  *
532  * It is up to the application to ensure no other processes open this file
533  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
534  * used.  We might be able to avoid races of that sort by getting lli_open_sem
535  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
536  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
537  */
538 int ll_file_open(struct inode *inode, struct file *file)
539 {
540         struct ll_inode_info *lli = ll_i2info(inode);
541         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
542                                           .it_flags = file->f_flags };
543         struct obd_client_handle **och_p = NULL;
544         __u64 *och_usecount = NULL;
545         struct ll_file_data *fd;
546         int rc = 0, opendir_set = 0;
547         ENTRY;
548
549         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
550                inode->i_generation, inode, file->f_flags);
551
552         it = file->private_data; /* XXX: compat macro */
553         file->private_data = NULL; /* prevent ll_local_open assertion */
554
555         fd = ll_file_data_get();
556         if (fd == NULL)
557                 GOTO(out_openerr, rc = -ENOMEM);
558
559         fd->fd_file = file;
560         if (S_ISDIR(inode->i_mode)) {
561                 spin_lock(&lli->lli_sa_lock);
562                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
563                     lli->lli_opendir_pid == 0) {
564                         lli->lli_opendir_key = fd;
565                         lli->lli_opendir_pid = current_pid();
566                         opendir_set = 1;
567                 }
568                 spin_unlock(&lli->lli_sa_lock);
569         }
570
571         if (inode->i_sb->s_root == file->f_dentry) {
572                 LUSTRE_FPRIVATE(file) = fd;
573                 RETURN(0);
574         }
575
576         if (!it || !it->d.lustre.it_disposition) {
577                 /* Convert f_flags into access mode. We cannot use file->f_mode,
578                  * because everything but O_ACCMODE mask was stripped from
579                  * there */
580                 if ((oit.it_flags + 1) & O_ACCMODE)
581                         oit.it_flags++;
582                 if (file->f_flags & O_TRUNC)
583                         oit.it_flags |= FMODE_WRITE;
584
585                 /* kernel only call f_op->open in dentry_open.  filp_open calls
586                  * dentry_open after call to open_namei that checks permissions.
587                  * Only nfsd_open call dentry_open directly without checking
588                  * permissions and because of that this code below is safe. */
589                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
590                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
591
592                 /* We do not want O_EXCL here, presumably we opened the file
593                  * already? XXX - NFS implications? */
594                 oit.it_flags &= ~O_EXCL;
595
596                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
597                  * created if necessary, then "IT_CREAT" should be set to keep
598                  * consistent with it */
599                 if (oit.it_flags & O_CREAT)
600                         oit.it_op |= IT_CREAT;
601
602                 it = &oit;
603         }
604
605 restart:
606         /* Let's see if we have file open on MDS already. */
607         if (it->it_flags & FMODE_WRITE) {
608                 och_p = &lli->lli_mds_write_och;
609                 och_usecount = &lli->lli_open_fd_write_count;
610         } else if (it->it_flags & FMODE_EXEC) {
611                 och_p = &lli->lli_mds_exec_och;
612                 och_usecount = &lli->lli_open_fd_exec_count;
613          } else {
614                 och_p = &lli->lli_mds_read_och;
615                 och_usecount = &lli->lli_open_fd_read_count;
616         }
617
618         mutex_lock(&lli->lli_och_mutex);
619         if (*och_p) { /* Open handle is present */
620                 if (it_disposition(it, DISP_OPEN_OPEN)) {
621                         /* Well, there's extra open request that we do not need,
622                            let's close it somehow. This will decref request. */
623                         rc = it_open_error(DISP_OPEN_OPEN, it);
624                         if (rc) {
625                                 mutex_unlock(&lli->lli_och_mutex);
626                                 GOTO(out_openerr, rc);
627                         }
628
629                         ll_release_openhandle(file->f_dentry, it);
630                 }
631                 (*och_usecount)++;
632
633                 rc = ll_local_open(file, it, fd, NULL);
634                 if (rc) {
635                         (*och_usecount)--;
636                         mutex_unlock(&lli->lli_och_mutex);
637                         GOTO(out_openerr, rc);
638                 }
639         } else {
640                 LASSERT(*och_usecount == 0);
641                 if (!it->d.lustre.it_disposition) {
642                         /* We cannot just request lock handle now, new ELC code
643                            means that one of other OPEN locks for this file
644                            could be cancelled, and since blocking ast handler
645                            would attempt to grab och_mutex as well, that would
646                            result in a deadlock */
647                         mutex_unlock(&lli->lli_och_mutex);
648                         it->it_create_mode |= M_CHECK_STALE;
649                         rc = ll_intent_file_open(file, NULL, 0, it);
650                         it->it_create_mode &= ~M_CHECK_STALE;
651                         if (rc)
652                                 GOTO(out_openerr, rc);
653
654                         goto restart;
655                 }
656                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
657                 if (!*och_p)
658                         GOTO(out_och_free, rc = -ENOMEM);
659
660                 (*och_usecount)++;
661
662                 /* md_intent_lock() didn't get a request ref if there was an
663                  * open error, so don't do cleanup on the request here
664                  * (bug 3430) */
665                 /* XXX (green): Should not we bail out on any error here, not
666                  * just open error? */
667                 rc = it_open_error(DISP_OPEN_OPEN, it);
668                 if (rc)
669                         GOTO(out_och_free, rc);
670
671                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
672
673                 rc = ll_local_open(file, it, fd, *och_p);
674                 if (rc)
675                         GOTO(out_och_free, rc);
676         }
677         mutex_unlock(&lli->lli_och_mutex);
678         fd = NULL;
679
680         /* Must do this outside lli_och_mutex lock to prevent deadlock where
681            different kind of OPEN lock for this same inode gets cancelled
682            by ldlm_cancel_lru */
683         if (!S_ISREG(inode->i_mode))
684                 GOTO(out_och_free, rc);
685
686         ll_capa_open(inode);
687
688         if (!lli->lli_has_smd) {
689                 if (file->f_flags & O_LOV_DELAY_CREATE ||
690                     !(file->f_mode & FMODE_WRITE)) {
691                         CDEBUG(D_INODE, "object creation was delayed\n");
692                         GOTO(out_och_free, rc);
693                 }
694         }
695         file->f_flags &= ~O_LOV_DELAY_CREATE;
696         GOTO(out_och_free, rc);
697
698 out_och_free:
699         if (rc) {
700                 if (och_p && *och_p) {
701                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
702                         *och_p = NULL; /* OBD_FREE writes some magic there */
703                         (*och_usecount)--;
704                 }
705                 mutex_unlock(&lli->lli_och_mutex);
706
707 out_openerr:
708                 if (opendir_set != 0)
709                         ll_stop_statahead(inode, lli->lli_opendir_key);
710                 if (fd != NULL)
711                         ll_file_data_put(fd);
712         } else {
713                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
714         }
715
716         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
717                 ptlrpc_req_finished(it->d.lustre.it_data);
718                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
719         }
720
721         return rc;
722 }
723
724 static int ll_md_blocking_lease_ast(struct ldlm_lock *lock,
725                         struct ldlm_lock_desc *desc, void *data, int flag)
726 {
727         int rc;
728         struct lustre_handle lockh;
729         ENTRY;
730
731         switch (flag) {
732         case LDLM_CB_BLOCKING:
733                 ldlm_lock2handle(lock, &lockh);
734                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
735                 if (rc < 0) {
736                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
737                         RETURN(rc);
738                 }
739                 break;
740         case LDLM_CB_CANCELING:
741                 /* do nothing */
742                 break;
743         }
744         RETURN(0);
745 }
746
747 /**
748  * Acquire a lease and open the file.
749  */
750 struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file,
751                                         fmode_t fmode, __u64 open_flags)
752 {
753         struct lookup_intent it = { .it_op = IT_OPEN };
754         struct ll_sb_info *sbi = ll_i2sbi(inode);
755         struct md_op_data *op_data;
756         struct ptlrpc_request *req;
757         struct lustre_handle old_handle = { 0 };
758         struct obd_client_handle *och = NULL;
759         int rc;
760         int rc2;
761         ENTRY;
762
763         if (fmode != FMODE_WRITE && fmode != FMODE_READ)
764                 RETURN(ERR_PTR(-EINVAL));
765
766         if (file != NULL) {
767                 struct ll_inode_info *lli = ll_i2info(inode);
768                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
769                 struct obd_client_handle **och_p;
770                 __u64 *och_usecount;
771
772                 if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC))
773                         RETURN(ERR_PTR(-EPERM));
774
775                 /* Get the openhandle of the file */
776                 rc = -EBUSY;
777                 mutex_lock(&lli->lli_och_mutex);
778                 if (fd->fd_lease_och != NULL) {
779                         mutex_unlock(&lli->lli_och_mutex);
780                         RETURN(ERR_PTR(rc));
781                 }
782
783                 if (fd->fd_och == NULL) {
784                         if (file->f_mode & FMODE_WRITE) {
785                                 LASSERT(lli->lli_mds_write_och != NULL);
786                                 och_p = &lli->lli_mds_write_och;
787                                 och_usecount = &lli->lli_open_fd_write_count;
788                         } else {
789                                 LASSERT(lli->lli_mds_read_och != NULL);
790                                 och_p = &lli->lli_mds_read_och;
791                                 och_usecount = &lli->lli_open_fd_read_count;
792                         }
793                         if (*och_usecount == 1) {
794                                 fd->fd_och = *och_p;
795                                 *och_p = NULL;
796                                 *och_usecount = 0;
797                                 rc = 0;
798                         }
799                 }
800                 mutex_unlock(&lli->lli_och_mutex);
801                 if (rc < 0) /* more than 1 opener */
802                         RETURN(ERR_PTR(rc));
803
804                 LASSERT(fd->fd_och != NULL);
805                 old_handle = fd->fd_och->och_fh;
806         }
807
808         OBD_ALLOC_PTR(och);
809         if (och == NULL)
810                 RETURN(ERR_PTR(-ENOMEM));
811
812         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
813                                         LUSTRE_OPC_ANY, NULL);
814         if (IS_ERR(op_data))
815                 GOTO(out, rc = PTR_ERR(op_data));
816
817         /* To tell the MDT this openhandle is from the same owner */
818         op_data->op_handle = old_handle;
819
820         it.it_flags = fmode | open_flags;
821         it.it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
822         rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, &req,
823                                 ll_md_blocking_lease_ast,
824         /* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise
825          * it can be cancelled which may mislead applications that the lease is
826          * broken;
827          * LDLM_FL_EXCL: Set this flag so that it won't be matched by normal
828          * open in ll_md_blocking_ast(). Otherwise as ll_md_blocking_lease_ast
829          * doesn't deal with openhandle, so normal openhandle will be leaked. */
830                                 LDLM_FL_NO_LRU | LDLM_FL_EXCL);
831         ll_finish_md_op_data(op_data);
832         if (req != NULL) {
833                 ptlrpc_req_finished(req);
834                 it_clear_disposition(&it, DISP_ENQ_COMPLETE);
835         }
836         if (rc < 0)
837                 GOTO(out_release_it, rc);
838
839         if (it_disposition(&it, DISP_LOOKUP_NEG))
840                 GOTO(out_release_it, rc = -ENOENT);
841
842         rc = it_open_error(DISP_OPEN_OPEN, &it);
843         if (rc)
844                 GOTO(out_release_it, rc);
845
846         LASSERT(it_disposition(&it, DISP_ENQ_OPEN_REF));
847         ll_och_fill(sbi->ll_md_exp, &it, och);
848
849         if (!it_disposition(&it, DISP_OPEN_LEASE)) /* old server? */
850                 GOTO(out_close, rc = -EOPNOTSUPP);
851
852         /* already get lease, handle lease lock */
853         ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
854         if (it.d.lustre.it_lock_mode == 0 ||
855             it.d.lustre.it_lock_bits != MDS_INODELOCK_OPEN) {
856                 /* open lock must return for lease */
857                 CERROR(DFID "lease granted but no open lock, %d/%Lu.\n",
858                         PFID(ll_inode2fid(inode)), it.d.lustre.it_lock_mode,
859                         it.d.lustre.it_lock_bits);
860                 GOTO(out_close, rc = -EPROTO);
861         }
862
863         ll_intent_release(&it);
864         RETURN(och);
865
866 out_close:
867         /* Cancel open lock */
868         if (it.d.lustre.it_lock_mode != 0) {
869                 ldlm_lock_decref_and_cancel(&och->och_lease_handle,
870                                             it.d.lustre.it_lock_mode);
871                 it.d.lustre.it_lock_mode = 0;
872                 och->och_lease_handle.cookie = 0ULL;
873         }
874         rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, inode, och, NULL);
875         if (rc2 < 0)
876                 CERROR("%s: error closing file "DFID": %d\n",
877                        ll_get_fsname(inode->i_sb, NULL, 0),
878                        PFID(&ll_i2info(inode)->lli_fid), rc2);
879         och = NULL; /* och has been freed in ll_close_inode_openhandle() */
880 out_release_it:
881         ll_intent_release(&it);
882 out:
883         if (och != NULL)
884                 OBD_FREE_PTR(och);
885         RETURN(ERR_PTR(rc));
886 }
887 EXPORT_SYMBOL(ll_lease_open);
888
889 /**
890  * Release lease and close the file.
891  * It will check if the lease has ever broken.
892  */
893 int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
894                         bool *lease_broken)
895 {
896         struct ldlm_lock *lock;
897         bool cancelled = true;
898         int rc;
899         ENTRY;
900
901         lock = ldlm_handle2lock(&och->och_lease_handle);
902         if (lock != NULL) {
903                 lock_res_and_lock(lock);
904                 cancelled = ldlm_is_cancel(lock);
905                 unlock_res_and_lock(lock);
906                 LDLM_LOCK_PUT(lock);
907         }
908
909         CDEBUG(D_INODE, "lease for "DFID" broken? %d\n",
910                 PFID(&ll_i2info(inode)->lli_fid), cancelled);
911
912         if (!cancelled)
913                 ldlm_cli_cancel(&och->och_lease_handle, 0);
914         if (lease_broken != NULL)
915                 *lease_broken = cancelled;
916
917         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och,
918                                        NULL);
919         RETURN(rc);
920 }
921 EXPORT_SYMBOL(ll_lease_close);
922
923 /* Fills the obdo with the attributes for the lsm */
924 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
925                           struct obd_capa *capa, struct obdo *obdo,
926                           __u64 ioepoch, int dv_flags)
927 {
928         struct ptlrpc_request_set *set;
929         struct obd_info            oinfo = { { { 0 } } };
930         int                        rc;
931
932         ENTRY;
933
934         LASSERT(lsm != NULL);
935
936         oinfo.oi_md = lsm;
937         oinfo.oi_oa = obdo;
938         oinfo.oi_oa->o_oi = lsm->lsm_oi;
939         oinfo.oi_oa->o_mode = S_IFREG;
940         oinfo.oi_oa->o_ioepoch = ioepoch;
941         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
942                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
943                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
944                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
945                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
946                                OBD_MD_FLDATAVERSION;
947         oinfo.oi_capa = capa;
948         if (dv_flags & (LL_DV_WR_FLUSH | LL_DV_RD_FLUSH)) {
949                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
950                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
951                 if (dv_flags & LL_DV_WR_FLUSH)
952                         oinfo.oi_oa->o_flags |= OBD_FL_FLUSH;
953         }
954
955         set = ptlrpc_prep_set();
956         if (set == NULL) {
957                 CERROR("can't allocate ptlrpc set\n");
958                 rc = -ENOMEM;
959         } else {
960                 rc = obd_getattr_async(exp, &oinfo, set);
961                 if (rc == 0)
962                         rc = ptlrpc_set_wait(set);
963                 ptlrpc_set_destroy(set);
964         }
965         if (rc == 0) {
966                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
967                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
968                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
969                                          OBD_MD_FLDATAVERSION | OBD_MD_FLFLAGS);
970                 if (dv_flags & LL_DV_WR_FLUSH &&
971                     !(oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
972                       oinfo.oi_oa->o_flags & OBD_FL_FLUSH))
973                         RETURN(-ENOTSUPP);
974         }
975         RETURN(rc);
976 }
977
978 /**
979   * Performs the getattr on the inode and updates its fields.
980   * If @sync != 0, perform the getattr under the server-side lock.
981   */
982 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
983                      __u64 ioepoch, int sync)
984 {
985         struct obd_capa      *capa = ll_mdscapa_get(inode);
986         struct lov_stripe_md *lsm;
987         int rc;
988         ENTRY;
989
990         lsm = ccc_inode_lsm_get(inode);
991         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
992                                 capa, obdo, ioepoch, sync ? LL_DV_RD_FLUSH : 0);
993         capa_put(capa);
994         if (rc == 0) {
995                 struct ost_id *oi = lsm ? &lsm->lsm_oi : &obdo->o_oi;
996
997                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
998                 CDEBUG(D_INODE, "objid "DOSTID" size %llu, blocks %llu,"
999                        " blksize %lu\n", POSTID(oi), i_size_read(inode),
1000                        (unsigned long long)inode->i_blocks,
1001                        (unsigned long)ll_inode_blksize(inode));
1002         }
1003         ccc_inode_lsm_put(inode, lsm);
1004         RETURN(rc);
1005 }
1006
1007 int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
1008 {
1009         struct ll_inode_info *lli = ll_i2info(inode);
1010         struct cl_object *obj = lli->lli_clob;
1011         struct cl_attr *attr = ccc_env_thread_attr(env);
1012         struct ost_lvb lvb;
1013         int rc = 0;
1014
1015         ENTRY;
1016
1017         ll_inode_size_lock(inode);
1018         /* merge timestamps the most recently obtained from mds with
1019            timestamps obtained from osts */
1020         LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
1021         LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
1022         LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
1023         inode_init_lvb(inode, &lvb);
1024
1025         cl_object_attr_lock(obj);
1026         rc = cl_object_attr_get(env, obj, attr);
1027         cl_object_attr_unlock(obj);
1028
1029         if (rc == 0) {
1030                 if (lvb.lvb_atime < attr->cat_atime)
1031                         lvb.lvb_atime = attr->cat_atime;
1032                 if (lvb.lvb_ctime < attr->cat_ctime)
1033                         lvb.lvb_ctime = attr->cat_ctime;
1034                 if (lvb.lvb_mtime < attr->cat_mtime)
1035                         lvb.lvb_mtime = attr->cat_mtime;
1036
1037                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
1038                                 PFID(&lli->lli_fid), attr->cat_size);
1039                 cl_isize_write_nolock(inode, attr->cat_size);
1040
1041                 inode->i_blocks = attr->cat_blocks;
1042
1043                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1044                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1045                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1046         }
1047         ll_inode_size_unlock(inode);
1048
1049         RETURN(rc);
1050 }
1051
1052 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1053                      lstat_t *st)
1054 {
1055         struct obdo obdo = { 0 };
1056         int rc;
1057
1058         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
1059         if (rc == 0) {
1060                 st->st_size   = obdo.o_size;
1061                 st->st_blocks = obdo.o_blocks;
1062                 st->st_mtime  = obdo.o_mtime;
1063                 st->st_atime  = obdo.o_atime;
1064                 st->st_ctime  = obdo.o_ctime;
1065         }
1066         return rc;
1067 }
1068
1069 static bool file_is_noatime(const struct file *file)
1070 {
1071         const struct vfsmount *mnt = file->f_path.mnt;
1072         const struct inode *inode = file->f_path.dentry->d_inode;
1073
1074         /* Adapted from file_accessed() and touch_atime().*/
1075         if (file->f_flags & O_NOATIME)
1076                 return true;
1077
1078         if (inode->i_flags & S_NOATIME)
1079                 return true;
1080
1081         if (IS_NOATIME(inode))
1082                 return true;
1083
1084         if (mnt->mnt_flags & (MNT_NOATIME | MNT_READONLY))
1085                 return true;
1086
1087         if ((mnt->mnt_flags & MNT_NODIRATIME) && S_ISDIR(inode->i_mode))
1088                 return true;
1089
1090         if ((inode->i_sb->s_flags & MS_NODIRATIME) && S_ISDIR(inode->i_mode))
1091                 return true;
1092
1093         return false;
1094 }
1095
1096 void ll_io_init(struct cl_io *io, const struct file *file, int write)
1097 {
1098         struct inode *inode = file->f_dentry->d_inode;
1099
1100         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
1101         if (write) {
1102                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
1103                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
1104                                       file->f_flags & O_DIRECT ||
1105                                       IS_SYNC(inode);
1106         }
1107         io->ci_obj     = ll_i2info(inode)->lli_clob;
1108         io->ci_lockreq = CILR_MAYBE;
1109         if (ll_file_nolock(file)) {
1110                 io->ci_lockreq = CILR_NEVER;
1111                 io->ci_no_srvlock = 1;
1112         } else if (file->f_flags & O_APPEND) {
1113                 io->ci_lockreq = CILR_MANDATORY;
1114         }
1115
1116         io->ci_noatime = file_is_noatime(file);
1117 }
1118
1119 static ssize_t
1120 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
1121                    struct file *file, enum cl_io_type iot,
1122                    loff_t *ppos, size_t count)
1123 {
1124         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
1125         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
1126         struct cl_io         *io;
1127         ssize_t               result;
1128         ENTRY;
1129
1130 restart:
1131         io = ccc_env_thread_io(env);
1132         ll_io_init(io, file, iot == CIT_WRITE);
1133
1134         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
1135                 struct vvp_io *vio = vvp_env_io(env);
1136                 struct ccc_io *cio = ccc_env_io(env);
1137                 int write_mutex_locked = 0;
1138
1139                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
1140                 vio->cui_io_subtype = args->via_io_subtype;
1141
1142                 switch (vio->cui_io_subtype) {
1143                 case IO_NORMAL:
1144                         cio->cui_iov = args->u.normal.via_iov;
1145                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
1146                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
1147                         cio->cui_iocb = args->u.normal.via_iocb;
1148                         if ((iot == CIT_WRITE) &&
1149                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1150                                 if (mutex_lock_interruptible(&lli->
1151                                                                lli_write_mutex))
1152                                         GOTO(out, result = -ERESTARTSYS);
1153                                 write_mutex_locked = 1;
1154                         } else if (iot == CIT_READ) {
1155                                 down_read(&lli->lli_trunc_sem);
1156                         }
1157                         break;
1158                 case IO_SENDFILE:
1159                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
1160                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
1161                         break;
1162                 case IO_SPLICE:
1163                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
1164                         vio->u.splice.cui_flags = args->u.splice.via_flags;
1165                         break;
1166                 default:
1167                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
1168                         LBUG();
1169                 }
1170                 result = cl_io_loop(env, io);
1171                 if (write_mutex_locked)
1172                         mutex_unlock(&lli->lli_write_mutex);
1173                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
1174                         up_read(&lli->lli_trunc_sem);
1175         } else {
1176                 /* cl_io_rw_init() handled IO */
1177                 result = io->ci_result;
1178         }
1179
1180         if (io->ci_nob > 0) {
1181                 result = io->ci_nob;
1182                 *ppos = io->u.ci_wr.wr.crw_pos;
1183         }
1184         GOTO(out, result);
1185 out:
1186         cl_io_fini(env, io);
1187         /* If any bit been read/written (result != 0), we just return
1188          * short read/write instead of restart io. */
1189         if ((result == 0 || result == -ENODATA) && io->ci_need_restart) {
1190                 CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n",
1191                        iot == CIT_READ ? "read" : "write",
1192                        file->f_dentry->d_name.name, *ppos, count);
1193                 LASSERTF(io->ci_nob == 0, "%zd", io->ci_nob);
1194                 goto restart;
1195         }
1196
1197         if (iot == CIT_READ) {
1198                 if (result >= 0)
1199                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
1200                                            LPROC_LL_READ_BYTES, result);
1201         } else if (iot == CIT_WRITE) {
1202                 if (result >= 0) {
1203                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
1204                                            LPROC_LL_WRITE_BYTES, result);
1205                         fd->fd_write_failed = false;
1206                 } else if (result != -ERESTARTSYS) {
1207                         fd->fd_write_failed = true;
1208                 }
1209         }
1210
1211         return result;
1212 }
1213
1214
1215 /*
1216  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
1217  */
1218 static int ll_file_get_iov_count(const struct iovec *iov,
1219                                  unsigned long *nr_segs, size_t *count)
1220 {
1221         size_t cnt = 0;
1222         unsigned long seg;
1223
1224         for (seg = 0; seg < *nr_segs; seg++) {
1225                 const struct iovec *iv = &iov[seg];
1226
1227                 /*
1228                  * If any segment has a negative length, or the cumulative
1229                  * length ever wraps negative then return -EINVAL.
1230                  */
1231                 cnt += iv->iov_len;
1232                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
1233                         return -EINVAL;
1234                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
1235                         continue;
1236                 if (seg == 0)
1237                         return -EFAULT;
1238                 *nr_segs = seg;
1239                 cnt -= iv->iov_len;   /* This segment is no good */
1240                 break;
1241         }
1242         *count = cnt;
1243         return 0;
1244 }
1245
1246 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1247                                 unsigned long nr_segs, loff_t pos)
1248 {
1249         struct lu_env      *env;
1250         struct vvp_io_args *args;
1251         size_t              count;
1252         ssize_t             result;
1253         int                 refcheck;
1254         ENTRY;
1255
1256         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1257         if (result)
1258                 RETURN(result);
1259
1260         env = cl_env_get(&refcheck);
1261         if (IS_ERR(env))
1262                 RETURN(PTR_ERR(env));
1263
1264         args = vvp_env_args(env, IO_NORMAL);
1265         args->u.normal.via_iov = (struct iovec *)iov;
1266         args->u.normal.via_nrsegs = nr_segs;
1267         args->u.normal.via_iocb = iocb;
1268
1269         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1270                                     &iocb->ki_pos, count);
1271         cl_env_put(env, &refcheck);
1272         RETURN(result);
1273 }
1274
1275 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1276                             loff_t *ppos)
1277 {
1278         struct lu_env *env;
1279         struct iovec  *local_iov;
1280         struct kiocb  *kiocb;
1281         ssize_t        result;
1282         int            refcheck;
1283         ENTRY;
1284
1285         env = cl_env_get(&refcheck);
1286         if (IS_ERR(env))
1287                 RETURN(PTR_ERR(env));
1288
1289         local_iov = &vvp_env_info(env)->vti_local_iov;
1290         kiocb = &vvp_env_info(env)->vti_kiocb;
1291         local_iov->iov_base = (void __user *)buf;
1292         local_iov->iov_len = count;
1293         init_sync_kiocb(kiocb, file);
1294         kiocb->ki_pos = *ppos;
1295         kiocb->ki_left = count;
1296
1297         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1298         *ppos = kiocb->ki_pos;
1299
1300         cl_env_put(env, &refcheck);
1301         RETURN(result);
1302 }
1303
1304 /*
1305  * Write to a file (through the page cache).
1306  * AIO stuff
1307  */
1308 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1309                                  unsigned long nr_segs, loff_t pos)
1310 {
1311         struct lu_env      *env;
1312         struct vvp_io_args *args;
1313         size_t              count;
1314         ssize_t             result;
1315         int                 refcheck;
1316         ENTRY;
1317
1318         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1319         if (result)
1320                 RETURN(result);
1321
1322         env = cl_env_get(&refcheck);
1323         if (IS_ERR(env))
1324                 RETURN(PTR_ERR(env));
1325
1326         args = vvp_env_args(env, IO_NORMAL);
1327         args->u.normal.via_iov = (struct iovec *)iov;
1328         args->u.normal.via_nrsegs = nr_segs;
1329         args->u.normal.via_iocb = iocb;
1330
1331         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1332                                   &iocb->ki_pos, count);
1333         cl_env_put(env, &refcheck);
1334         RETURN(result);
1335 }
1336
1337 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1338                              loff_t *ppos)
1339 {
1340         struct lu_env *env;
1341         struct iovec  *local_iov;
1342         struct kiocb  *kiocb;
1343         ssize_t        result;
1344         int            refcheck;
1345         ENTRY;
1346
1347         env = cl_env_get(&refcheck);
1348         if (IS_ERR(env))
1349                 RETURN(PTR_ERR(env));
1350
1351         local_iov = &vvp_env_info(env)->vti_local_iov;
1352         kiocb = &vvp_env_info(env)->vti_kiocb;
1353         local_iov->iov_base = (void __user *)buf;
1354         local_iov->iov_len = count;
1355         init_sync_kiocb(kiocb, file);
1356         kiocb->ki_pos = *ppos;
1357         kiocb->ki_left = count;
1358
1359         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1360         *ppos = kiocb->ki_pos;
1361
1362         cl_env_put(env, &refcheck);
1363         RETURN(result);
1364 }
1365
1366 /*
1367  * Send file content (through pagecache) somewhere with helper
1368  */
1369 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1370                                    struct pipe_inode_info *pipe, size_t count,
1371                                    unsigned int flags)
1372 {
1373         struct lu_env      *env;
1374         struct vvp_io_args *args;
1375         ssize_t             result;
1376         int                 refcheck;
1377         ENTRY;
1378
1379         env = cl_env_get(&refcheck);
1380         if (IS_ERR(env))
1381                 RETURN(PTR_ERR(env));
1382
1383         args = vvp_env_args(env, IO_SPLICE);
1384         args->u.splice.via_pipe = pipe;
1385         args->u.splice.via_flags = flags;
1386
1387         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1388         cl_env_put(env, &refcheck);
1389         RETURN(result);
1390 }
1391
1392 static int ll_lov_recreate(struct inode *inode, struct ost_id *oi,
1393                            obd_count ost_idx)
1394 {
1395         struct obd_export *exp = ll_i2dtexp(inode);
1396         struct obd_trans_info oti = { 0 };
1397         struct obdo *oa = NULL;
1398         int lsm_size;
1399         int rc = 0;
1400         struct lov_stripe_md *lsm = NULL, *lsm2;
1401         ENTRY;
1402
1403         OBDO_ALLOC(oa);
1404         if (oa == NULL)
1405                 RETURN(-ENOMEM);
1406
1407         lsm = ccc_inode_lsm_get(inode);
1408         if (!lsm_has_objects(lsm))
1409                 GOTO(out, rc = -ENOENT);
1410
1411         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1412                    (lsm->lsm_stripe_count));
1413
1414         OBD_ALLOC_LARGE(lsm2, lsm_size);
1415         if (lsm2 == NULL)
1416                 GOTO(out, rc = -ENOMEM);
1417
1418         oa->o_oi = *oi;
1419         oa->o_nlink = ost_idx;
1420         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1421         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1422         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1423                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1424         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1425         memcpy(lsm2, lsm, lsm_size);
1426         ll_inode_size_lock(inode);
1427         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1428         ll_inode_size_unlock(inode);
1429
1430         OBD_FREE_LARGE(lsm2, lsm_size);
1431         GOTO(out, rc);
1432 out:
1433         ccc_inode_lsm_put(inode, lsm);
1434         OBDO_FREE(oa);
1435         return rc;
1436 }
1437
1438 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1439 {
1440         struct ll_recreate_obj ucreat;
1441         struct ost_id           oi;
1442         ENTRY;
1443
1444         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1445                 RETURN(-EPERM);
1446
1447         if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1448                            sizeof(ucreat)))
1449                 RETURN(-EFAULT);
1450
1451         ostid_set_seq_mdt0(&oi);
1452         ostid_set_id(&oi, ucreat.lrc_id);
1453         RETURN(ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx));
1454 }
1455
1456 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1457 {
1458         struct lu_fid   fid;
1459         struct ost_id   oi;
1460         obd_count       ost_idx;
1461         ENTRY;
1462
1463         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1464                 RETURN(-EPERM);
1465
1466         if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1467                 RETURN(-EFAULT);
1468
1469         fid_to_ostid(&fid, &oi);
1470         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1471         RETURN(ll_lov_recreate(inode, &oi, ost_idx));
1472 }
1473
1474 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1475                              __u64  flags, struct lov_user_md *lum,
1476                              int lum_size)
1477 {
1478         struct lov_stripe_md *lsm = NULL;
1479         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1480         int rc = 0;
1481         ENTRY;
1482
1483         lsm = ccc_inode_lsm_get(inode);
1484         if (lsm != NULL) {
1485                 ccc_inode_lsm_put(inode, lsm);
1486                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1487                        inode->i_ino);
1488                 RETURN(-EEXIST);
1489         }
1490
1491         ll_inode_size_lock(inode);
1492         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1493         if (rc)
1494                 GOTO(out, rc);
1495         rc = oit.d.lustre.it_status;
1496         if (rc < 0)
1497                 GOTO(out_req_free, rc);
1498
1499         ll_release_openhandle(file->f_dentry, &oit);
1500
1501  out:
1502         ll_inode_size_unlock(inode);
1503         ll_intent_release(&oit);
1504         ccc_inode_lsm_put(inode, lsm);
1505         RETURN(rc);
1506 out_req_free:
1507         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1508         goto out;
1509 }
1510
1511 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1512                              struct lov_mds_md **lmmp, int *lmm_size,
1513                              struct ptlrpc_request **request)
1514 {
1515         struct ll_sb_info *sbi = ll_i2sbi(inode);
1516         struct mdt_body  *body;
1517         struct lov_mds_md *lmm = NULL;
1518         struct ptlrpc_request *req = NULL;
1519         struct md_op_data *op_data;
1520         int rc, lmmsize;
1521
1522         rc = ll_get_max_mdsize(sbi, &lmmsize);
1523         if (rc)
1524                 RETURN(rc);
1525
1526         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1527                                      strlen(filename), lmmsize,
1528                                      LUSTRE_OPC_ANY, NULL);
1529         if (IS_ERR(op_data))
1530                 RETURN(PTR_ERR(op_data));
1531
1532         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1533         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1534         ll_finish_md_op_data(op_data);
1535         if (rc < 0) {
1536                 CDEBUG(D_INFO, "md_getattr_name failed "
1537                        "on %s: rc %d\n", filename, rc);
1538                 GOTO(out, rc);
1539         }
1540
1541         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1542         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1543
1544         lmmsize = body->eadatasize;
1545
1546         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1547                         lmmsize == 0) {
1548                 GOTO(out, rc = -ENODATA);
1549         }
1550
1551         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1552         LASSERT(lmm != NULL);
1553
1554         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1555             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1556                 GOTO(out, rc = -EPROTO);
1557         }
1558
1559         /*
1560          * This is coming from the MDS, so is probably in
1561          * little endian.  We convert it to host endian before
1562          * passing it to userspace.
1563          */
1564         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1565                 int stripe_count;
1566
1567                 stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
1568                 if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_RELEASED)
1569                         stripe_count = 0;
1570
1571                 /* if function called for directory - we should
1572                  * avoid swab not existent lsm objects */
1573                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1574                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1575                         if (S_ISREG(body->mode))
1576                                 lustre_swab_lov_user_md_objects(
1577                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1578                                  stripe_count);
1579                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1580                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1581                         if (S_ISREG(body->mode))
1582                                 lustre_swab_lov_user_md_objects(
1583                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1584                                  stripe_count);
1585                 }
1586         }
1587
1588 out:
1589         *lmmp = lmm;
1590         *lmm_size = lmmsize;
1591         *request = req;
1592         return rc;
1593 }
1594
1595 static int ll_lov_setea(struct inode *inode, struct file *file,
1596                             unsigned long arg)
1597 {
1598         __u64                    flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1599         struct lov_user_md      *lump;
1600         int                      lum_size = sizeof(struct lov_user_md) +
1601                                             sizeof(struct lov_user_ost_data);
1602         int                      rc;
1603         ENTRY;
1604
1605         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1606                 RETURN(-EPERM);
1607
1608         OBD_ALLOC_LARGE(lump, lum_size);
1609         if (lump == NULL)
1610                 RETURN(-ENOMEM);
1611
1612         if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1613                 OBD_FREE_LARGE(lump, lum_size);
1614                 RETURN(-EFAULT);
1615         }
1616
1617         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1618
1619         OBD_FREE_LARGE(lump, lum_size);
1620         RETURN(rc);
1621 }
1622
1623 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1624                             unsigned long arg)
1625 {
1626         struct lov_user_md_v3    lumv3;
1627         struct lov_user_md_v1   *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1628         struct lov_user_md_v1   *lumv1p = (struct lov_user_md_v1 *)arg;
1629         struct lov_user_md_v3   *lumv3p = (struct lov_user_md_v3 *)arg;
1630         int                      lum_size, rc;
1631         __u64                    flags = FMODE_WRITE;
1632         ENTRY;
1633
1634         /* first try with v1 which is smaller than v3 */
1635         lum_size = sizeof(struct lov_user_md_v1);
1636         if (copy_from_user(lumv1, lumv1p, lum_size))
1637                 RETURN(-EFAULT);
1638
1639         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1640                 lum_size = sizeof(struct lov_user_md_v3);
1641                 if (copy_from_user(&lumv3, lumv3p, lum_size))
1642                         RETURN(-EFAULT);
1643         }
1644
1645         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1646         if (rc == 0) {
1647                 struct lov_stripe_md *lsm;
1648                 __u32 gen;
1649
1650                 put_user(0, &lumv1p->lmm_stripe_count);
1651
1652                 ll_layout_refresh(inode, &gen);
1653                 lsm = ccc_inode_lsm_get(inode);
1654                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1655                                    0, lsm, (void *)arg);
1656                 ccc_inode_lsm_put(inode, lsm);
1657         }
1658         RETURN(rc);
1659 }
1660
1661 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1662 {
1663         struct lov_stripe_md *lsm;
1664         int rc = -ENODATA;
1665         ENTRY;
1666
1667         lsm = ccc_inode_lsm_get(inode);
1668         if (lsm != NULL)
1669                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1670                                    lsm, (void *)arg);
1671         ccc_inode_lsm_put(inode, lsm);
1672         RETURN(rc);
1673 }
1674
1675 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1676 {
1677         struct ll_inode_info   *lli = ll_i2info(inode);
1678         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1679         struct ccc_grouplock    grouplock;
1680         int                     rc;
1681         ENTRY;
1682
1683         if (ll_file_nolock(file))
1684                 RETURN(-EOPNOTSUPP);
1685
1686         spin_lock(&lli->lli_lock);
1687         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1688                 CWARN("group lock already existed with gid %lu\n",
1689                       fd->fd_grouplock.cg_gid);
1690                 spin_unlock(&lli->lli_lock);
1691                 RETURN(-EINVAL);
1692         }
1693         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1694         spin_unlock(&lli->lli_lock);
1695
1696         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1697                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1698         if (rc)
1699                 RETURN(rc);
1700
1701         spin_lock(&lli->lli_lock);
1702         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1703                 spin_unlock(&lli->lli_lock);
1704                 CERROR("another thread just won the race\n");
1705                 cl_put_grouplock(&grouplock);
1706                 RETURN(-EINVAL);
1707         }
1708
1709         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1710         fd->fd_grouplock = grouplock;
1711         spin_unlock(&lli->lli_lock);
1712
1713         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1714         RETURN(0);
1715 }
1716
1717 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1718 {
1719         struct ll_inode_info   *lli = ll_i2info(inode);
1720         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1721         struct ccc_grouplock    grouplock;
1722         ENTRY;
1723
1724         spin_lock(&lli->lli_lock);
1725         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1726                 spin_unlock(&lli->lli_lock);
1727                 CWARN("no group lock held\n");
1728                 RETURN(-EINVAL);
1729         }
1730         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1731
1732         if (fd->fd_grouplock.cg_gid != arg) {
1733                 CWARN("group lock %lu doesn't match current id %lu\n",
1734                        arg, fd->fd_grouplock.cg_gid);
1735                 spin_unlock(&lli->lli_lock);
1736                 RETURN(-EINVAL);
1737         }
1738
1739         grouplock = fd->fd_grouplock;
1740         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1741         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1742         spin_unlock(&lli->lli_lock);
1743
1744         cl_put_grouplock(&grouplock);
1745         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1746         RETURN(0);
1747 }
1748
1749 /**
1750  * Close inode open handle
1751  *
1752  * \param dentry [in]     dentry which contains the inode
1753  * \param it     [in,out] intent which contains open info and result
1754  *
1755  * \retval 0     success
1756  * \retval <0    failure
1757  */
1758 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1759 {
1760         struct inode *inode = dentry->d_inode;
1761         struct obd_client_handle *och;
1762         int rc;
1763         ENTRY;
1764
1765         LASSERT(inode);
1766
1767         /* Root ? Do nothing. */
1768         if (dentry->d_inode->i_sb->s_root == dentry)
1769                 RETURN(0);
1770
1771         /* No open handle to close? Move away */
1772         if (!it_disposition(it, DISP_OPEN_OPEN))
1773                 RETURN(0);
1774
1775         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1776
1777         OBD_ALLOC(och, sizeof(*och));
1778         if (!och)
1779                 GOTO(out, rc = -ENOMEM);
1780
1781         ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
1782
1783         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1784                                        inode, och, NULL);
1785 out:
1786         /* this one is in place of ll_file_open */
1787         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1788                 ptlrpc_req_finished(it->d.lustre.it_data);
1789                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1790         }
1791         RETURN(rc);
1792 }
1793
1794 /**
1795  * Get size for inode for which FIEMAP mapping is requested.
1796  * Make the FIEMAP get_info call and returns the result.
1797  */
1798 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1799               int num_bytes)
1800 {
1801         struct obd_export *exp = ll_i2dtexp(inode);
1802         struct lov_stripe_md *lsm = NULL;
1803         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1804         int vallen = num_bytes;
1805         int rc;
1806         ENTRY;
1807
1808         /* Checks for fiemap flags */
1809         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1810                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1811                 return -EBADR;
1812         }
1813
1814         /* Check for FIEMAP_FLAG_SYNC */
1815         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1816                 rc = filemap_fdatawrite(inode->i_mapping);
1817                 if (rc)
1818                         return rc;
1819         }
1820
1821         lsm = ccc_inode_lsm_get(inode);
1822         if (lsm == NULL)
1823                 return -ENOENT;
1824
1825         /* If the stripe_count > 1 and the application does not understand
1826          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1827          */
1828         if (lsm->lsm_stripe_count > 1 &&
1829             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1830                 GOTO(out, rc = -EOPNOTSUPP);
1831
1832         fm_key.oa.o_oi = lsm->lsm_oi;
1833         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1834
1835         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1836         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1837         /* If filesize is 0, then there would be no objects for mapping */
1838         if (fm_key.oa.o_size == 0) {
1839                 fiemap->fm_mapped_extents = 0;
1840                 GOTO(out, rc = 0);
1841         }
1842
1843         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1844
1845         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1846                           fiemap, lsm);
1847         if (rc)
1848                 CERROR("obd_get_info failed: rc = %d\n", rc);
1849
1850 out:
1851         ccc_inode_lsm_put(inode, lsm);
1852         RETURN(rc);
1853 }
1854
1855 int ll_fid2path(struct inode *inode, void *arg)
1856 {
1857         struct obd_export       *exp = ll_i2mdexp(inode);
1858         struct getinfo_fid2path *gfout, *gfin;
1859         int                      outsize, rc;
1860         ENTRY;
1861
1862         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1863             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1864                 RETURN(-EPERM);
1865
1866         /* Need to get the buflen */
1867         OBD_ALLOC_PTR(gfin);
1868         if (gfin == NULL)
1869                 RETURN(-ENOMEM);
1870         if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1871                 OBD_FREE_PTR(gfin);
1872                 RETURN(-EFAULT);
1873         }
1874
1875         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1876         OBD_ALLOC(gfout, outsize);
1877         if (gfout == NULL) {
1878                 OBD_FREE_PTR(gfin);
1879                 RETURN(-ENOMEM);
1880         }
1881         memcpy(gfout, gfin, sizeof(*gfout));
1882         OBD_FREE_PTR(gfin);
1883
1884         /* Call mdc_iocontrol */
1885         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1886         if (rc)
1887                 GOTO(gf_free, rc);
1888
1889         if (copy_to_user(arg, gfout, outsize))
1890                 rc = -EFAULT;
1891
1892 gf_free:
1893         OBD_FREE(gfout, outsize);
1894         RETURN(rc);
1895 }
1896
1897 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1898 {
1899         struct ll_user_fiemap *fiemap_s;
1900         size_t num_bytes, ret_bytes;
1901         unsigned int extent_count;
1902         int rc = 0;
1903
1904         /* Get the extent count so we can calculate the size of
1905          * required fiemap buffer */
1906         if (get_user(extent_count,
1907             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1908                 RETURN(-EFAULT);
1909         num_bytes = sizeof(*fiemap_s) + (extent_count *
1910                                          sizeof(struct ll_fiemap_extent));
1911
1912         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1913         if (fiemap_s == NULL)
1914                 RETURN(-ENOMEM);
1915
1916         /* get the fiemap value */
1917         if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1918                            sizeof(*fiemap_s)))
1919                 GOTO(error, rc = -EFAULT);
1920
1921         /* If fm_extent_count is non-zero, read the first extent since
1922          * it is used to calculate end_offset and device from previous
1923          * fiemap call. */
1924         if (extent_count) {
1925                 if (copy_from_user(&fiemap_s->fm_extents[0],
1926                     (char __user *)arg + sizeof(*fiemap_s),
1927                     sizeof(struct ll_fiemap_extent)))
1928                         GOTO(error, rc = -EFAULT);
1929         }
1930
1931         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1932         if (rc)
1933                 GOTO(error, rc);
1934
1935         ret_bytes = sizeof(struct ll_user_fiemap);
1936
1937         if (extent_count != 0)
1938                 ret_bytes += (fiemap_s->fm_mapped_extents *
1939                                  sizeof(struct ll_fiemap_extent));
1940
1941         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1942                 rc = -EFAULT;
1943
1944 error:
1945         OBD_FREE_LARGE(fiemap_s, num_bytes);
1946         RETURN(rc);
1947 }
1948
1949 /*
1950  * Read the data_version for inode.
1951  *
1952  * This value is computed using stripe object version on OST.
1953  * Version is computed using server side locking.
1954  *
1955  * @param sync if do sync on the OST side;
1956  *              0: no sync
1957  *              LL_DV_RD_FLUSH: flush dirty pages, LCK_PR on OSTs
1958  *              LL_DV_WR_FLUSH: drop all caching pages, LCK_PW on OSTs
1959  */
1960 int ll_data_version(struct inode *inode, __u64 *data_version, int flags)
1961 {
1962         struct lov_stripe_md    *lsm = NULL;
1963         struct ll_sb_info       *sbi = ll_i2sbi(inode);
1964         struct obdo             *obdo = NULL;
1965         int                      rc;
1966         ENTRY;
1967
1968         /* If no stripe, we consider version is 0. */
1969         lsm = ccc_inode_lsm_get(inode);
1970         if (!lsm_has_objects(lsm)) {
1971                 *data_version = 0;
1972                 CDEBUG(D_INODE, "No object for inode\n");
1973                 GOTO(out, rc = 0);
1974         }
1975
1976         OBD_ALLOC_PTR(obdo);
1977         if (obdo == NULL)
1978                 GOTO(out, rc = -ENOMEM);
1979
1980         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, flags);
1981         if (rc == 0) {
1982                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1983                         rc = -EOPNOTSUPP;
1984                 else
1985                         *data_version = obdo->o_data_version;
1986         }
1987
1988         OBD_FREE_PTR(obdo);
1989         EXIT;
1990 out:
1991         ccc_inode_lsm_put(inode, lsm);
1992         RETURN(rc);
1993 }
1994
1995 /*
1996  * Trigger a HSM release request for the provided inode.
1997  */
1998 int ll_hsm_release(struct inode *inode)
1999 {
2000         struct cl_env_nest nest;
2001         struct lu_env *env;
2002         struct obd_client_handle *och = NULL;
2003         __u64 data_version = 0;
2004         int rc;
2005         ENTRY;
2006
2007         CDEBUG(D_INODE, "%s: Releasing file "DFID".\n",
2008                ll_get_fsname(inode->i_sb, NULL, 0),
2009                PFID(&ll_i2info(inode)->lli_fid));
2010
2011         och = ll_lease_open(inode, NULL, FMODE_WRITE, MDS_OPEN_RELEASE);
2012         if (IS_ERR(och))
2013                 GOTO(out, rc = PTR_ERR(och));
2014
2015         /* Grab latest data_version and [am]time values */
2016         rc = ll_data_version(inode, &data_version, LL_DV_WR_FLUSH);
2017         if (rc != 0)
2018                 GOTO(out, rc);
2019
2020         env = cl_env_nested_get(&nest);
2021         if (IS_ERR(env))
2022                 GOTO(out, rc = PTR_ERR(env));
2023
2024         ll_merge_lvb(env, inode);
2025         cl_env_nested_put(&nest, env);
2026
2027         /* Release the file.
2028          * NB: lease lock handle is released in mdc_hsm_release_pack() because
2029          * we still need it to pack l_remote_handle to MDT. */
2030         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och,
2031                                        &data_version);
2032         och = NULL;
2033
2034         EXIT;
2035 out:
2036         if (och != NULL && !IS_ERR(och)) /* close the file */
2037                 ll_lease_close(och, inode, NULL);
2038
2039         return rc;
2040 }
2041
2042 struct ll_swap_stack {
2043         struct iattr             ia1, ia2;
2044         __u64                    dv1, dv2;
2045         struct inode            *inode1, *inode2;
2046         bool                     check_dv1, check_dv2;
2047 };
2048
2049 static int ll_swap_layouts(struct file *file1, struct file *file2,
2050                            struct lustre_swap_layouts *lsl)
2051 {
2052         struct mdc_swap_layouts  msl;
2053         struct md_op_data       *op_data;
2054         __u32                    gid;
2055         __u64                    dv;
2056         struct ll_swap_stack    *llss = NULL;
2057         int                      rc;
2058
2059         OBD_ALLOC_PTR(llss);
2060         if (llss == NULL)
2061                 RETURN(-ENOMEM);
2062
2063         llss->inode1 = file1->f_dentry->d_inode;
2064         llss->inode2 = file2->f_dentry->d_inode;
2065
2066         if (!S_ISREG(llss->inode2->i_mode))
2067                 GOTO(free, rc = -EINVAL);
2068
2069         if (inode_permission(llss->inode1, MAY_WRITE) ||
2070             inode_permission(llss->inode2, MAY_WRITE))
2071                 GOTO(free, rc = -EPERM);
2072
2073         if (llss->inode2->i_sb != llss->inode1->i_sb)
2074                 GOTO(free, rc = -EXDEV);
2075
2076         /* we use 2 bool because it is easier to swap than 2 bits */
2077         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
2078                 llss->check_dv1 = true;
2079
2080         if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
2081                 llss->check_dv2 = true;
2082
2083         /* we cannot use lsl->sl_dvX directly because we may swap them */
2084         llss->dv1 = lsl->sl_dv1;
2085         llss->dv2 = lsl->sl_dv2;
2086
2087         rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
2088         if (rc == 0) /* same file, done! */
2089                 GOTO(free, rc = 0);
2090
2091         if (rc < 0) { /* sequentialize it */
2092                 swap(llss->inode1, llss->inode2);
2093                 swap(file1, file2);
2094                 swap(llss->dv1, llss->dv2);
2095                 swap(llss->check_dv1, llss->check_dv2);
2096         }
2097
2098         gid = lsl->sl_gid;
2099         if (gid != 0) { /* application asks to flush dirty cache */
2100                 rc = ll_get_grouplock(llss->inode1, file1, gid);
2101                 if (rc < 0)
2102                         GOTO(free, rc);
2103
2104                 rc = ll_get_grouplock(llss->inode2, file2, gid);
2105                 if (rc < 0) {
2106                         ll_put_grouplock(llss->inode1, file1, gid);
2107                         GOTO(free, rc);
2108                 }
2109         }
2110
2111         /* to be able to restore mtime and atime after swap
2112          * we need to first save them */
2113         if (lsl->sl_flags &
2114             (SWAP_LAYOUTS_KEEP_MTIME | SWAP_LAYOUTS_KEEP_ATIME)) {
2115                 llss->ia1.ia_mtime = llss->inode1->i_mtime;
2116                 llss->ia1.ia_atime = llss->inode1->i_atime;
2117                 llss->ia1.ia_valid = ATTR_MTIME | ATTR_ATIME;
2118                 llss->ia2.ia_mtime = llss->inode2->i_mtime;
2119                 llss->ia2.ia_atime = llss->inode2->i_atime;
2120                 llss->ia2.ia_valid = ATTR_MTIME | ATTR_ATIME;
2121         }
2122
2123         /* ultimate check, before swaping the layouts we check if
2124          * dataversion has changed (if requested) */
2125         if (llss->check_dv1) {
2126                 rc = ll_data_version(llss->inode1, &dv, 0);
2127                 if (rc)
2128                         GOTO(putgl, rc);
2129                 if (dv != llss->dv1)
2130                         GOTO(putgl, rc = -EAGAIN);
2131         }
2132
2133         if (llss->check_dv2) {
2134                 rc = ll_data_version(llss->inode2, &dv, 0);
2135                 if (rc)
2136                         GOTO(putgl, rc);
2137                 if (dv != llss->dv2)
2138                         GOTO(putgl, rc = -EAGAIN);
2139         }
2140
2141         /* struct md_op_data is used to send the swap args to the mdt
2142          * only flags is missing, so we use struct mdc_swap_layouts
2143          * through the md_op_data->op_data */
2144         /* flags from user space have to be converted before they are send to
2145          * server, no flag is sent today, they are only used on the client */
2146         msl.msl_flags = 0;
2147         rc = -ENOMEM;
2148         op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
2149                                      0, LUSTRE_OPC_ANY, &msl);
2150         if (IS_ERR(op_data))
2151                 GOTO(free, rc = PTR_ERR(op_data));
2152
2153         rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
2154                            sizeof(*op_data), op_data, NULL);
2155         ll_finish_md_op_data(op_data);
2156
2157 putgl:
2158         if (gid != 0) {
2159                 ll_put_grouplock(llss->inode2, file2, gid);
2160                 ll_put_grouplock(llss->inode1, file1, gid);
2161         }
2162
2163         /* rc can be set from obd_iocontrol() or from a GOTO(putgl, ...) */
2164         if (rc != 0)
2165                 GOTO(free, rc);
2166
2167         /* clear useless flags */
2168         if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_MTIME)) {
2169                 llss->ia1.ia_valid &= ~ATTR_MTIME;
2170                 llss->ia2.ia_valid &= ~ATTR_MTIME;
2171         }
2172
2173         if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_ATIME)) {
2174                 llss->ia1.ia_valid &= ~ATTR_ATIME;
2175                 llss->ia2.ia_valid &= ~ATTR_ATIME;
2176         }
2177
2178         /* update time if requested */
2179         rc = 0;
2180         if (llss->ia2.ia_valid != 0) {
2181                 mutex_lock(&llss->inode1->i_mutex);
2182                 rc = ll_setattr(file1->f_dentry, &llss->ia2);
2183                 mutex_unlock(&llss->inode1->i_mutex);
2184         }
2185
2186         if (llss->ia1.ia_valid != 0) {
2187                 int rc1;
2188
2189                 mutex_lock(&llss->inode2->i_mutex);
2190                 rc1 = ll_setattr(file2->f_dentry, &llss->ia1);
2191                 mutex_unlock(&llss->inode2->i_mutex);
2192                 if (rc == 0)
2193                         rc = rc1;
2194         }
2195
2196 free:
2197         if (llss != NULL)
2198                 OBD_FREE_PTR(llss);
2199
2200         RETURN(rc);
2201 }
2202
2203 static int ll_hsm_state_set(struct inode *inode, struct hsm_state_set *hss)
2204 {
2205         struct md_op_data       *op_data;
2206         int                      rc;
2207
2208         /* Non-root users are forbidden to set or clear flags which are
2209          * NOT defined in HSM_USER_MASK. */
2210         if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK) &&
2211             !cfs_capable(CFS_CAP_SYS_ADMIN))
2212                 RETURN(-EPERM);
2213
2214         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2215                                      LUSTRE_OPC_ANY, hss);
2216         if (IS_ERR(op_data))
2217                 RETURN(PTR_ERR(op_data));
2218
2219         rc = obd_iocontrol(LL_IOC_HSM_STATE_SET, ll_i2mdexp(inode),
2220                            sizeof(*op_data), op_data, NULL);
2221
2222         ll_finish_md_op_data(op_data);
2223
2224         RETURN(rc);
2225 }
2226
2227 static int ll_hsm_import(struct inode *inode, struct file *file,
2228                          struct hsm_user_import *hui)
2229 {
2230         struct hsm_state_set    *hss = NULL;
2231         struct iattr            *attr = NULL;
2232         int                      rc;
2233         ENTRY;
2234
2235         if (!S_ISREG(inode->i_mode))
2236                 RETURN(-EINVAL);
2237
2238         /* set HSM flags */
2239         OBD_ALLOC_PTR(hss);
2240         if (hss == NULL)
2241                 GOTO(out, rc = -ENOMEM);
2242
2243         hss->hss_valid = HSS_SETMASK | HSS_ARCHIVE_ID;
2244         hss->hss_archive_id = hui->hui_archive_id;
2245         hss->hss_setmask = HS_ARCHIVED | HS_EXISTS | HS_RELEASED;
2246         rc = ll_hsm_state_set(inode, hss);
2247         if (rc != 0)
2248                 GOTO(out, rc);
2249
2250         OBD_ALLOC_PTR(attr);
2251         if (attr == NULL)
2252                 GOTO(out, rc = -ENOMEM);
2253
2254         attr->ia_mode = hui->hui_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
2255         attr->ia_mode |= S_IFREG;
2256         attr->ia_uid = hui->hui_uid;
2257         attr->ia_gid = hui->hui_gid;
2258         attr->ia_size = hui->hui_size;
2259         attr->ia_mtime.tv_sec = hui->hui_mtime;
2260         attr->ia_mtime.tv_nsec = hui->hui_mtime_ns;
2261         attr->ia_atime.tv_sec = hui->hui_atime;
2262         attr->ia_atime.tv_nsec = hui->hui_atime_ns;
2263
2264         attr->ia_valid = ATTR_SIZE | ATTR_MODE | ATTR_FORCE |
2265                          ATTR_UID | ATTR_GID |
2266                          ATTR_MTIME | ATTR_MTIME_SET |
2267                          ATTR_ATIME | ATTR_ATIME_SET;
2268
2269         rc = ll_setattr_raw(file->f_dentry, attr, true);
2270         if (rc == -ENODATA)
2271                 rc = 0;
2272
2273 out:
2274         if (hss != NULL)
2275                 OBD_FREE_PTR(hss);
2276
2277         if (attr != NULL)
2278                 OBD_FREE_PTR(attr);
2279
2280         RETURN(rc);
2281 }
2282
2283 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
2284 {
2285         struct inode            *inode = file->f_dentry->d_inode;
2286         struct ll_file_data     *fd = LUSTRE_FPRIVATE(file);
2287         int                      flags, rc;
2288         ENTRY;
2289
2290         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2291                inode->i_generation, inode, cmd);
2292         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2293
2294         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2295         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2296                 RETURN(-ENOTTY);
2297
2298         switch(cmd) {
2299         case LL_IOC_GETFLAGS:
2300                 /* Get the current value of the file flags */
2301                 return put_user(fd->fd_flags, (int *)arg);
2302         case LL_IOC_SETFLAGS:
2303         case LL_IOC_CLRFLAGS:
2304                 /* Set or clear specific file flags */
2305                 /* XXX This probably needs checks to ensure the flags are
2306                  *     not abused, and to handle any flag side effects.
2307                  */
2308                 if (get_user(flags, (int *) arg))
2309                         RETURN(-EFAULT);
2310
2311                 if (cmd == LL_IOC_SETFLAGS) {
2312                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2313                             !(file->f_flags & O_DIRECT)) {
2314                                 CERROR("%s: unable to disable locking on "
2315                                        "non-O_DIRECT file\n", current->comm);
2316                                 RETURN(-EINVAL);
2317                         }
2318
2319                         fd->fd_flags |= flags;
2320                 } else {
2321                         fd->fd_flags &= ~flags;
2322                 }
2323                 RETURN(0);
2324         case LL_IOC_LOV_SETSTRIPE:
2325                 RETURN(ll_lov_setstripe(inode, file, arg));
2326         case LL_IOC_LOV_SETEA:
2327                 RETURN(ll_lov_setea(inode, file, arg));
2328         case LL_IOC_LOV_SWAP_LAYOUTS: {
2329                 struct file *file2;
2330                 struct lustre_swap_layouts lsl;
2331
2332                 if (copy_from_user(&lsl, (char *)arg,
2333                                        sizeof(struct lustre_swap_layouts)))
2334                         RETURN(-EFAULT);
2335
2336                 if ((file->f_flags & O_ACCMODE) == 0) /* O_RDONLY */
2337                         RETURN(-EPERM);
2338
2339                 file2 = fget(lsl.sl_fd);
2340                 if (file2 == NULL)
2341                         RETURN(-EBADF);
2342
2343                 rc = -EPERM;
2344                 if ((file2->f_flags & O_ACCMODE) != 0) /* O_WRONLY or O_RDWR */
2345                         rc = ll_swap_layouts(file, file2, &lsl);
2346                 fput(file2);
2347                 RETURN(rc);
2348         }
2349         case LL_IOC_LOV_GETSTRIPE:
2350                 RETURN(ll_lov_getstripe(inode, arg));
2351         case LL_IOC_RECREATE_OBJ:
2352                 RETURN(ll_lov_recreate_obj(inode, arg));
2353         case LL_IOC_RECREATE_FID:
2354                 RETURN(ll_lov_recreate_fid(inode, arg));
2355         case FSFILT_IOC_FIEMAP:
2356                 RETURN(ll_ioctl_fiemap(inode, arg));
2357         case FSFILT_IOC_GETFLAGS:
2358         case FSFILT_IOC_SETFLAGS:
2359                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2360         case FSFILT_IOC_GETVERSION_OLD:
2361         case FSFILT_IOC_GETVERSION:
2362                 RETURN(put_user(inode->i_generation, (int *)arg));
2363         case LL_IOC_GROUP_LOCK:
2364                 RETURN(ll_get_grouplock(inode, file, arg));
2365         case LL_IOC_GROUP_UNLOCK:
2366                 RETURN(ll_put_grouplock(inode, file, arg));
2367         case IOC_OBD_STATFS:
2368                 RETURN(ll_obd_statfs(inode, (void *)arg));
2369
2370         /* We need to special case any other ioctls we want to handle,
2371          * to send them to the MDS/OST as appropriate and to properly
2372          * network encode the arg field.
2373         case FSFILT_IOC_SETVERSION_OLD:
2374         case FSFILT_IOC_SETVERSION:
2375         */
2376         case LL_IOC_FLUSHCTX:
2377                 RETURN(ll_flush_ctx(inode));
2378         case LL_IOC_PATH2FID: {
2379                 if (copy_to_user((void *)arg, ll_inode2fid(inode),
2380                                  sizeof(struct lu_fid)))
2381                         RETURN(-EFAULT);
2382
2383                 RETURN(0);
2384         }
2385         case OBD_IOC_FID2PATH:
2386                 RETURN(ll_fid2path(inode, (void *)arg));
2387         case LL_IOC_DATA_VERSION: {
2388                 struct ioc_data_version idv;
2389                 int rc;
2390
2391                 if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
2392                         RETURN(-EFAULT);
2393
2394                 idv.idv_flags &= LL_DV_RD_FLUSH | LL_DV_WR_FLUSH;
2395                 rc = ll_data_version(inode, &idv.idv_version, idv.idv_flags);
2396
2397                 if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
2398                         RETURN(-EFAULT);
2399
2400                 RETURN(rc);
2401         }
2402
2403         case LL_IOC_GET_MDTIDX: {
2404                 int mdtidx;
2405
2406                 mdtidx = ll_get_mdt_idx(inode);
2407                 if (mdtidx < 0)
2408                         RETURN(mdtidx);
2409
2410                 if (put_user((int)mdtidx, (int*)arg))
2411                         RETURN(-EFAULT);
2412
2413                 RETURN(0);
2414         }
2415         case OBD_IOC_GETDTNAME:
2416         case OBD_IOC_GETMDNAME:
2417                 RETURN(ll_get_obd_name(inode, cmd, arg));
2418         case LL_IOC_HSM_STATE_GET: {
2419                 struct md_op_data       *op_data;
2420                 struct hsm_user_state   *hus;
2421                 int                      rc;
2422
2423                 OBD_ALLOC_PTR(hus);
2424                 if (hus == NULL)
2425                         RETURN(-ENOMEM);
2426
2427                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2428                                              LUSTRE_OPC_ANY, hus);
2429                 if (IS_ERR(op_data)) {
2430                         OBD_FREE_PTR(hus);
2431                         RETURN(PTR_ERR(op_data));
2432                 }
2433
2434                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2435                                    op_data, NULL);
2436
2437                 if (copy_to_user((void *)arg, hus, sizeof(*hus)))
2438                         rc = -EFAULT;
2439
2440                 ll_finish_md_op_data(op_data);
2441                 OBD_FREE_PTR(hus);
2442                 RETURN(rc);
2443         }
2444         case LL_IOC_HSM_STATE_SET: {
2445                 struct hsm_state_set    *hss;
2446                 int                      rc;
2447
2448                 OBD_ALLOC_PTR(hss);
2449                 if (hss == NULL)
2450                         RETURN(-ENOMEM);
2451
2452                 if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
2453                         OBD_FREE_PTR(hss);
2454                         RETURN(-EFAULT);
2455                 }
2456
2457                 rc = ll_hsm_state_set(inode, hss);
2458
2459                 OBD_FREE_PTR(hss);
2460                 RETURN(rc);
2461         }
2462         case LL_IOC_HSM_ACTION: {
2463                 struct md_op_data               *op_data;
2464                 struct hsm_current_action       *hca;
2465                 int                              rc;
2466
2467                 OBD_ALLOC_PTR(hca);
2468                 if (hca == NULL)
2469                         RETURN(-ENOMEM);
2470
2471                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2472                                              LUSTRE_OPC_ANY, hca);
2473                 if (IS_ERR(op_data)) {
2474                         OBD_FREE_PTR(hca);
2475                         RETURN(PTR_ERR(op_data));
2476                 }
2477
2478                 rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2479                                    op_data, NULL);
2480
2481                 if (copy_to_user((char *)arg, hca, sizeof(*hca)))
2482                         rc = -EFAULT;
2483
2484                 ll_finish_md_op_data(op_data);
2485                 OBD_FREE_PTR(hca);
2486                 RETURN(rc);
2487         }
2488         case LL_IOC_SET_LEASE: {
2489                 struct ll_inode_info *lli = ll_i2info(inode);
2490                 struct obd_client_handle *och = NULL;
2491                 bool lease_broken;
2492                 fmode_t mode = 0;
2493
2494                 switch (arg) {
2495                 case F_WRLCK:
2496                         if (!(file->f_mode & FMODE_WRITE))
2497                                 RETURN(-EPERM);
2498                         mode = FMODE_WRITE;
2499                         break;
2500                 case F_RDLCK:
2501                         if (!(file->f_mode & FMODE_READ))
2502                                 RETURN(-EPERM);
2503                         mode = FMODE_READ;
2504                         break;
2505                 case F_UNLCK:
2506                         mutex_lock(&lli->lli_och_mutex);
2507                         if (fd->fd_lease_och != NULL) {
2508                                 och = fd->fd_lease_och;
2509                                 fd->fd_lease_och = NULL;
2510                         }
2511                         mutex_unlock(&lli->lli_och_mutex);
2512
2513                         if (och != NULL) {
2514                                 mode = och->och_flags &(FMODE_READ|FMODE_WRITE);
2515                                 rc = ll_lease_close(och, inode, &lease_broken);
2516                                 if (rc == 0 && lease_broken)
2517                                         mode = 0;
2518                         } else {
2519                                 rc = -ENOLCK;
2520                         }
2521
2522                         /* return the type of lease or error */
2523                         RETURN(rc < 0 ? rc : (int)mode);
2524                 default:
2525                         RETURN(-EINVAL);
2526                 }
2527
2528                 CDEBUG(D_INODE, "Set lease with mode %d\n", mode);
2529
2530                 /* apply for lease */
2531                 och = ll_lease_open(inode, file, mode, 0);
2532                 if (IS_ERR(och))
2533                         RETURN(PTR_ERR(och));
2534
2535                 rc = 0;
2536                 mutex_lock(&lli->lli_och_mutex);
2537                 if (fd->fd_lease_och == NULL) {
2538                         fd->fd_lease_och = och;
2539                         och = NULL;
2540                 }
2541                 mutex_unlock(&lli->lli_och_mutex);
2542                 if (och != NULL) {
2543                         /* impossible now that only excl is supported for now */
2544                         ll_lease_close(och, inode, &lease_broken);
2545                         rc = -EBUSY;
2546                 }
2547                 RETURN(rc);
2548         }
2549         case LL_IOC_GET_LEASE: {
2550                 struct ll_inode_info *lli = ll_i2info(inode);
2551                 struct ldlm_lock *lock = NULL;
2552
2553                 rc = 0;
2554                 mutex_lock(&lli->lli_och_mutex);
2555                 if (fd->fd_lease_och != NULL) {
2556                         struct obd_client_handle *och = fd->fd_lease_och;
2557
2558                         lock = ldlm_handle2lock(&och->och_lease_handle);
2559                         if (lock != NULL) {
2560                                 lock_res_and_lock(lock);
2561                                 if (!ldlm_is_cancel(lock))
2562                                         rc = och->och_flags &
2563                                                 (FMODE_READ | FMODE_WRITE);
2564                                 unlock_res_and_lock(lock);
2565                                 LDLM_LOCK_PUT(lock);
2566                         }
2567                 }
2568                 mutex_unlock(&lli->lli_och_mutex);
2569                 RETURN(rc);
2570         }
2571         case LL_IOC_HSM_IMPORT: {
2572                 struct hsm_user_import *hui;
2573
2574                 OBD_ALLOC_PTR(hui);
2575                 if (hui == NULL)
2576                         RETURN(-ENOMEM);
2577
2578                 if (copy_from_user(hui, (void *)arg, sizeof(*hui))) {
2579                         OBD_FREE_PTR(hui);
2580                         RETURN(-EFAULT);
2581                 }
2582
2583                 rc = ll_hsm_import(inode, file, hui);
2584
2585                 OBD_FREE_PTR(hui);
2586                 RETURN(rc);
2587         }
2588         default: {
2589                 int err;
2590
2591                 if (LLIOC_STOP ==
2592                      ll_iocontrol_call(inode, file, cmd, arg, &err))
2593                         RETURN(err);
2594
2595                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2596                                      (void *)arg));
2597         }
2598         }
2599 }
2600
2601 #ifndef HAVE_FILE_LLSEEK_SIZE
2602 static inline loff_t
2603 llseek_execute(struct file *file, loff_t offset, loff_t maxsize)
2604 {
2605         if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
2606                 return -EINVAL;
2607         if (offset > maxsize)
2608                 return -EINVAL;
2609
2610         if (offset != file->f_pos) {
2611                 file->f_pos = offset;
2612                 file->f_version = 0;
2613         }
2614         return offset;
2615 }
2616
2617 static loff_t
2618 generic_file_llseek_size(struct file *file, loff_t offset, int origin,
2619                 loff_t maxsize, loff_t eof)
2620 {
2621         struct inode *inode = file->f_dentry->d_inode;
2622
2623         switch (origin) {
2624         case SEEK_END:
2625                 offset += eof;
2626                 break;
2627         case SEEK_CUR:
2628                 /*
2629                  * Here we special-case the lseek(fd, 0, SEEK_CUR)
2630                  * position-querying operation.  Avoid rewriting the "same"
2631                  * f_pos value back to the file because a concurrent read(),
2632                  * write() or lseek() might have altered it
2633                  */
2634                 if (offset == 0)
2635                         return file->f_pos;
2636                 /*
2637                  * f_lock protects against read/modify/write race with other
2638                  * SEEK_CURs. Note that parallel writes and reads behave
2639                  * like SEEK_SET.
2640                  */
2641                 mutex_lock(&inode->i_mutex);
2642                 offset = llseek_execute(file, file->f_pos + offset, maxsize);
2643                 mutex_unlock(&inode->i_mutex);
2644                 return offset;
2645         case SEEK_DATA:
2646                 /*
2647                  * In the generic case the entire file is data, so as long as
2648                  * offset isn't at the end of the file then the offset is data.
2649                  */
2650                 if (offset >= eof)
2651                         return -ENXIO;
2652                 break;
2653         case SEEK_HOLE:
2654                 /*
2655                  * There is a virtual hole at the end of the file, so as long as
2656                  * offset isn't i_size or larger, return i_size.
2657                  */
2658                 if (offset >= eof)
2659                         return -ENXIO;
2660                 offset = eof;
2661                 break;
2662         }
2663
2664         return llseek_execute(file, offset, maxsize);
2665 }
2666 #endif
2667
2668 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2669 {
2670         struct inode *inode = file->f_dentry->d_inode;
2671         loff_t retval, eof = 0;
2672
2673         ENTRY;
2674         retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2675                            (origin == SEEK_CUR) ? file->f_pos : 0);
2676         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2677                inode->i_ino, inode->i_generation, inode, retval, retval,
2678                origin);
2679         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2680
2681         if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2682                 retval = ll_glimpse_size(inode);
2683                 if (retval != 0)
2684                         RETURN(retval);
2685                 eof = i_size_read(inode);
2686         }
2687
2688         retval = ll_generic_file_llseek_size(file, offset, origin,
2689                                           ll_file_maxbytes(inode), eof);
2690         RETURN(retval);
2691 }
2692
2693 int ll_flush(struct file *file, fl_owner_t id)
2694 {
2695         struct inode *inode = file->f_dentry->d_inode;
2696         struct ll_inode_info *lli = ll_i2info(inode);
2697         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2698         int rc, err;
2699
2700         LASSERT(!S_ISDIR(inode->i_mode));
2701
2702         /* catch async errors that were recorded back when async writeback
2703          * failed for pages in this mapping. */
2704         rc = lli->lli_async_rc;
2705         lli->lli_async_rc = 0;
2706         err = lov_read_and_clear_async_rc(lli->lli_clob);
2707         if (rc == 0)
2708                 rc = err;
2709
2710         /* The application has been told write failure already.
2711          * Do not report failure again. */
2712         if (fd->fd_write_failed)
2713                 return 0;
2714         return rc ? -EIO : 0;
2715 }
2716
2717 /**
2718  * Called to make sure a portion of file has been written out.
2719  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2720  *
2721  * Return how many pages have been written.
2722  */
2723 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2724                        enum cl_fsync_mode mode, int ignore_layout)
2725 {
2726         struct cl_env_nest nest;
2727         struct lu_env *env;
2728         struct cl_io *io;
2729         struct obd_capa *capa = NULL;
2730         struct cl_fsync_io *fio;
2731         int result;
2732         ENTRY;
2733
2734         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2735             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2736                 RETURN(-EINVAL);
2737
2738         env = cl_env_nested_get(&nest);
2739         if (IS_ERR(env))
2740                 RETURN(PTR_ERR(env));
2741
2742         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2743
2744         io = ccc_env_thread_io(env);
2745         io->ci_obj = cl_i2info(inode)->lli_clob;
2746         io->ci_ignore_layout = ignore_layout;
2747
2748         /* initialize parameters for sync */
2749         fio = &io->u.ci_fsync;
2750         fio->fi_capa = capa;
2751         fio->fi_start = start;
2752         fio->fi_end = end;
2753         fio->fi_fid = ll_inode2fid(inode);
2754         fio->fi_mode = mode;
2755         fio->fi_nr_written = 0;
2756
2757         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2758                 result = cl_io_loop(env, io);
2759         else
2760                 result = io->ci_result;
2761         if (result == 0)
2762                 result = fio->fi_nr_written;
2763         cl_io_fini(env, io);
2764         cl_env_nested_put(&nest, env);
2765
2766         capa_put(capa);
2767
2768         RETURN(result);
2769 }
2770
2771 /*
2772  * When dentry is provided (the 'else' case), *file->f_dentry may be
2773  * null and dentry must be used directly rather than pulled from
2774  * *file->f_dentry as is done otherwise.
2775  */
2776
2777 #ifdef HAVE_FILE_FSYNC_4ARGS
2778 int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2779 {
2780         struct dentry *dentry = file->f_dentry;
2781 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2782 int ll_fsync(struct file *file, int datasync)
2783 {
2784         struct dentry *dentry = file->f_dentry;
2785 #else
2786 int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
2787 {
2788 #endif
2789         struct inode *inode = dentry->d_inode;
2790         struct ll_inode_info *lli = ll_i2info(inode);
2791         struct ptlrpc_request *req;
2792         struct obd_capa *oc;
2793         int rc, err;
2794         ENTRY;
2795
2796         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2797                inode->i_generation, inode);
2798         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2799
2800 #ifdef HAVE_FILE_FSYNC_4ARGS
2801         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2802         mutex_lock(&inode->i_mutex);
2803 #else
2804         /* fsync's caller has already called _fdata{sync,write}, we want
2805          * that IO to finish before calling the osc and mdc sync methods */
2806         rc = filemap_fdatawait(inode->i_mapping);
2807 #endif
2808
2809         /* catch async errors that were recorded back when async writeback
2810          * failed for pages in this mapping. */
2811         if (!S_ISDIR(inode->i_mode)) {
2812                 err = lli->lli_async_rc;
2813                 lli->lli_async_rc = 0;
2814                 if (rc == 0)
2815                         rc = err;
2816                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2817                 if (rc == 0)
2818                         rc = err;
2819         }
2820
2821         oc = ll_mdscapa_get(inode);
2822         err = md_fsync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2823                        &req);
2824         capa_put(oc);
2825         if (!rc)
2826                 rc = err;
2827         if (!err)
2828                 ptlrpc_req_finished(req);
2829
2830         if (datasync && S_ISREG(inode->i_mode)) {
2831                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2832
2833                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2834                                 CL_FSYNC_ALL, 0);
2835                 if (rc == 0 && err < 0)
2836                         rc = err;
2837                 if (rc < 0)
2838                         fd->fd_write_failed = true;
2839                 else
2840                         fd->fd_write_failed = false;
2841         }
2842
2843 #ifdef HAVE_FILE_FSYNC_4ARGS
2844         mutex_unlock(&inode->i_mutex);
2845 #endif
2846         RETURN(rc);
2847 }
2848
2849 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2850 {
2851         struct inode *inode = file->f_dentry->d_inode;
2852         struct ll_sb_info *sbi = ll_i2sbi(inode);
2853         struct ldlm_enqueue_info einfo = {
2854                 .ei_type        = LDLM_FLOCK,
2855                 .ei_cb_cp       = ldlm_flock_completion_ast,
2856                 .ei_cbdata      = file_lock,
2857         };
2858         struct md_op_data *op_data;
2859         struct lustre_handle lockh = {0};
2860         ldlm_policy_data_t flock = {{0}};
2861         int flags = 0;
2862         int rc;
2863         int rc2 = 0;
2864         ENTRY;
2865
2866         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2867                inode->i_ino, file_lock);
2868
2869         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2870
2871         if (file_lock->fl_flags & FL_FLOCK) {
2872                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2873                 /* flocks are whole-file locks */
2874                 flock.l_flock.end = OFFSET_MAX;
2875                 /* For flocks owner is determined by the local file desctiptor*/
2876                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2877         } else if (file_lock->fl_flags & FL_POSIX) {
2878                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2879                 flock.l_flock.start = file_lock->fl_start;
2880                 flock.l_flock.end = file_lock->fl_end;
2881         } else {
2882                 RETURN(-EINVAL);
2883         }
2884         flock.l_flock.pid = file_lock->fl_pid;
2885
2886         /* Somewhat ugly workaround for svc lockd.
2887          * lockd installs custom fl_lmops->lm_compare_owner that checks
2888          * for the fl_owner to be the same (which it always is on local node
2889          * I guess between lockd processes) and then compares pid.
2890          * As such we assign pid to the owner field to make it all work,
2891          * conflict with normal locks is unlikely since pid space and
2892          * pointer space for current->files are not intersecting */
2893         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2894                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2895
2896         switch (file_lock->fl_type) {
2897         case F_RDLCK:
2898                 einfo.ei_mode = LCK_PR;
2899                 break;
2900         case F_UNLCK:
2901                 /* An unlock request may or may not have any relation to
2902                  * existing locks so we may not be able to pass a lock handle
2903                  * via a normal ldlm_lock_cancel() request. The request may even
2904                  * unlock a byte range in the middle of an existing lock. In
2905                  * order to process an unlock request we need all of the same
2906                  * information that is given with a normal read or write record
2907                  * lock request. To avoid creating another ldlm unlock (cancel)
2908                  * message we'll treat a LCK_NL flock request as an unlock. */
2909                 einfo.ei_mode = LCK_NL;
2910                 break;
2911         case F_WRLCK:
2912                 einfo.ei_mode = LCK_PW;
2913                 break;
2914         default:
2915                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2916                         file_lock->fl_type);
2917                 RETURN (-ENOTSUPP);
2918         }
2919
2920         switch (cmd) {
2921         case F_SETLKW:
2922 #ifdef F_SETLKW64
2923         case F_SETLKW64:
2924 #endif
2925                 flags = 0;
2926                 break;
2927         case F_SETLK:
2928 #ifdef F_SETLK64
2929         case F_SETLK64:
2930 #endif
2931                 flags = LDLM_FL_BLOCK_NOWAIT;
2932                 break;
2933         case F_GETLK:
2934 #ifdef F_GETLK64
2935         case F_GETLK64:
2936 #endif
2937                 flags = LDLM_FL_TEST_LOCK;
2938                 /* Save the old mode so that if the mode in the lock changes we
2939                  * can decrement the appropriate reader or writer refcount. */
2940                 file_lock->fl_type = einfo.ei_mode;
2941                 break;
2942         default:
2943                 CERROR("unknown fcntl lock command: %d\n", cmd);
2944                 RETURN (-EINVAL);
2945         }
2946
2947         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2948                                      LUSTRE_OPC_ANY, NULL);
2949         if (IS_ERR(op_data))
2950                 RETURN(PTR_ERR(op_data));
2951
2952         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2953                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2954                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2955
2956         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2957                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2958
2959         if ((file_lock->fl_flags & FL_FLOCK) &&
2960             (rc == 0 || file_lock->fl_type == F_UNLCK))
2961                 rc2  = flock_lock_file_wait(file, file_lock);
2962         if ((file_lock->fl_flags & FL_POSIX) &&
2963             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2964             !(flags & LDLM_FL_TEST_LOCK))
2965                 rc2  = posix_lock_file_wait(file, file_lock);
2966
2967         if (rc2 && file_lock->fl_type != F_UNLCK) {
2968                 einfo.ei_mode = LCK_NL;
2969                 md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2970                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2971                 rc = rc2;
2972         }
2973
2974         ll_finish_md_op_data(op_data);
2975
2976         RETURN(rc);
2977 }
2978
2979 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2980 {
2981         ENTRY;
2982
2983         RETURN(-ENOSYS);
2984 }
2985
2986 /**
2987  * test if some locks matching bits and l_req_mode are acquired
2988  * - bits can be in different locks
2989  * - if found clear the common lock bits in *bits
2990  * - the bits not found, are kept in *bits
2991  * \param inode [IN]
2992  * \param bits [IN] searched lock bits [IN]
2993  * \param l_req_mode [IN] searched lock mode
2994  * \retval boolean, true iff all bits are found
2995  */
2996 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2997 {
2998         struct lustre_handle lockh;
2999         ldlm_policy_data_t policy;
3000         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
3001                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
3002         struct lu_fid *fid;
3003         __u64 flags;
3004         int i;
3005         ENTRY;
3006
3007         if (!inode)
3008                RETURN(0);
3009
3010         fid = &ll_i2info(inode)->lli_fid;
3011         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
3012                ldlm_lockname[mode]);
3013
3014         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
3015         for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
3016                 policy.l_inodebits.bits = *bits & (1 << i);
3017                 if (policy.l_inodebits.bits == 0)
3018                         continue;
3019
3020                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
3021                                   &policy, mode, &lockh)) {
3022                         struct ldlm_lock *lock;
3023
3024                         lock = ldlm_handle2lock(&lockh);
3025                         if (lock) {
3026                                 *bits &=
3027                                       ~(lock->l_policy_data.l_inodebits.bits);
3028                                 LDLM_LOCK_PUT(lock);
3029                         } else {
3030                                 *bits &= ~policy.l_inodebits.bits;
3031                         }
3032                 }
3033         }
3034         RETURN(*bits == 0);
3035 }
3036
3037 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
3038                             struct lustre_handle *lockh, __u64 flags,
3039                             ldlm_mode_t mode)
3040 {
3041         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
3042         struct lu_fid *fid;
3043         ldlm_mode_t rc;
3044         ENTRY;
3045
3046         fid = &ll_i2info(inode)->lli_fid;
3047         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
3048
3049         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
3050                            fid, LDLM_IBITS, &policy, mode, lockh);
3051
3052         RETURN(rc);
3053 }
3054
3055 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
3056 {
3057         /* Already unlinked. Just update nlink and return success */
3058         if (rc == -ENOENT) {
3059                 clear_nlink(inode);
3060                 /* This path cannot be hit for regular files unless in
3061                  * case of obscure races, so no need to to validate
3062                  * size. */
3063                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
3064                         return 0;
3065         } else if (rc != 0) {
3066                 CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
3067                        ll_get_fsname(inode->i_sb, NULL, 0),
3068                        PFID(ll_inode2fid(inode)), rc);
3069         }
3070
3071         return rc;
3072 }
3073
3074 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
3075                              __u64 ibits)
3076 {
3077         struct inode *inode = dentry->d_inode;
3078         struct ptlrpc_request *req = NULL;
3079         struct obd_export *exp;
3080         int rc = 0;
3081         ENTRY;
3082
3083         LASSERT(inode != NULL);
3084
3085         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3086                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3087
3088         exp = ll_i2mdexp(inode);
3089
3090         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
3091          *      But under CMD case, it caused some lock issues, should be fixed
3092          *      with new CMD ibits lock. See bug 12718 */
3093         if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
3094                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3095                 struct md_op_data *op_data;
3096
3097                 if (ibits == MDS_INODELOCK_LOOKUP)
3098                         oit.it_op = IT_LOOKUP;
3099
3100                 /* Call getattr by fid, so do not provide name at all. */
3101                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
3102                                              dentry->d_inode, NULL, 0, 0,
3103                                              LUSTRE_OPC_ANY, NULL);
3104                 if (IS_ERR(op_data))
3105                         RETURN(PTR_ERR(op_data));
3106
3107                 oit.it_create_mode |= M_CHECK_STALE;
3108                 rc = md_intent_lock(exp, op_data, NULL, 0,
3109                                     /* we are not interested in name
3110                                        based lookup */
3111                                     &oit, 0, &req,
3112                                     ll_md_blocking_ast, 0);
3113                 ll_finish_md_op_data(op_data);
3114                 oit.it_create_mode &= ~M_CHECK_STALE;
3115                 if (rc < 0) {
3116                         rc = ll_inode_revalidate_fini(inode, rc);
3117                         GOTO (out, rc);
3118                 }
3119
3120                 rc = ll_revalidate_it_finish(req, &oit, dentry);
3121                 if (rc != 0) {
3122                         ll_intent_release(&oit);
3123                         GOTO(out, rc);
3124                 }
3125
3126                 /* Unlinked? Unhash dentry, so it is not picked up later by
3127                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3128                    here to preserve get_cwd functionality on 2.6.
3129                    Bug 10503 */
3130                 if (!dentry->d_inode->i_nlink)
3131                         d_lustre_invalidate(dentry, 0);
3132
3133                 ll_lookup_finish_locks(&oit, dentry);
3134         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
3135                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3136                 obd_valid valid = OBD_MD_FLGETATTR;
3137                 struct md_op_data *op_data;
3138                 int ealen = 0;
3139
3140                 if (S_ISREG(inode->i_mode)) {
3141                         rc = ll_get_max_mdsize(sbi, &ealen);
3142                         if (rc)
3143                                 RETURN(rc);
3144                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3145                 }
3146
3147                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
3148                                              0, ealen, LUSTRE_OPC_ANY,
3149                                              NULL);
3150                 if (IS_ERR(op_data))
3151                         RETURN(PTR_ERR(op_data));
3152
3153                 op_data->op_valid = valid;
3154                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
3155                  * capa for this inode. Because we only keep capas of dirs
3156                  * fresh. */
3157                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
3158                 ll_finish_md_op_data(op_data);
3159                 if (rc) {
3160                         rc = ll_inode_revalidate_fini(inode, rc);
3161                         RETURN(rc);
3162                 }
3163
3164                 rc = ll_prep_inode(&inode, req, NULL, NULL);
3165         }
3166 out:
3167         ptlrpc_req_finished(req);
3168         return rc;
3169 }
3170
3171 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
3172                            __u64 ibits)
3173 {
3174         struct inode    *inode = dentry->d_inode;
3175         int              rc;
3176         ENTRY;
3177
3178         rc = __ll_inode_revalidate_it(dentry, it, ibits);
3179         if (rc != 0)
3180                 RETURN(rc);
3181
3182         /* if object isn't regular file, don't validate size */
3183         if (!S_ISREG(inode->i_mode)) {
3184                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
3185                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
3186                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
3187         } else {
3188                 /* In case of restore, the MDT has the right size and has
3189                  * already send it back without granting the layout lock,
3190                  * inode is up-to-date so glimpse is useless.
3191                  * Also to glimpse we need the layout, in case of a running
3192                  * restore the MDT holds the layout lock so the glimpse will
3193                  * block up to the end of restore (getattr will block)
3194                  */
3195                 if (!(ll_i2info(inode)->lli_flags & LLIF_FILE_RESTORING))
3196                         rc = ll_glimpse_size(inode);
3197         }
3198         RETURN(rc);
3199 }
3200
3201 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3202                   struct lookup_intent *it, struct kstat *stat)
3203 {
3204         struct inode *inode = de->d_inode;
3205         struct ll_sb_info *sbi = ll_i2sbi(inode);
3206         struct ll_inode_info *lli = ll_i2info(inode);
3207         int res = 0;
3208
3209         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
3210                                              MDS_INODELOCK_LOOKUP);
3211         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
3212
3213         if (res)
3214                 return res;
3215
3216         stat->dev = inode->i_sb->s_dev;
3217         if (ll_need_32bit_api(sbi))
3218                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
3219         else
3220                 stat->ino = inode->i_ino;
3221         stat->mode = inode->i_mode;
3222         stat->nlink = inode->i_nlink;
3223         stat->uid = inode->i_uid;
3224         stat->gid = inode->i_gid;
3225         stat->rdev = inode->i_rdev;
3226         stat->atime = inode->i_atime;
3227         stat->mtime = inode->i_mtime;
3228         stat->ctime = inode->i_ctime;
3229         stat->blksize = 1 << inode->i_blkbits;
3230
3231         stat->size = i_size_read(inode);
3232         stat->blocks = inode->i_blocks;
3233
3234         return 0;
3235 }
3236 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3237 {
3238         struct lookup_intent it = { .it_op = IT_GETATTR };
3239
3240         return ll_getattr_it(mnt, de, &it, stat);
3241 }
3242
3243 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
3244                 __u64 start, __u64 len)
3245 {
3246         int rc;
3247         size_t num_bytes;
3248         struct ll_user_fiemap *fiemap;
3249         unsigned int extent_count = fieinfo->fi_extents_max;
3250
3251         num_bytes = sizeof(*fiemap) + (extent_count *
3252                                        sizeof(struct ll_fiemap_extent));
3253         OBD_ALLOC_LARGE(fiemap, num_bytes);
3254
3255         if (fiemap == NULL)
3256                 RETURN(-ENOMEM);
3257
3258         fiemap->fm_flags = fieinfo->fi_flags;
3259         fiemap->fm_extent_count = fieinfo->fi_extents_max;
3260         fiemap->fm_start = start;
3261         fiemap->fm_length = len;
3262         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
3263                sizeof(struct ll_fiemap_extent));
3264
3265         rc = ll_do_fiemap(inode, fiemap, num_bytes);
3266
3267         fieinfo->fi_flags = fiemap->fm_flags;
3268         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
3269         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
3270                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
3271
3272         OBD_FREE_LARGE(fiemap, num_bytes);
3273         return rc;
3274 }
3275
3276 struct posix_acl * ll_get_acl(struct inode *inode, int type)
3277 {
3278         struct ll_inode_info *lli = ll_i2info(inode);
3279         struct posix_acl *acl = NULL;
3280         ENTRY;
3281
3282         spin_lock(&lli->lli_lock);
3283         /* VFS' acl_permission_check->check_acl will release the refcount */
3284         acl = posix_acl_dup(lli->lli_posix_acl);
3285         spin_unlock(&lli->lli_lock);
3286
3287         RETURN(acl);
3288 }
3289
3290 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
3291 static int
3292 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
3293 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
3294 # else
3295 ll_check_acl(struct inode *inode, int mask)
3296 # endif
3297 {
3298 # ifdef CONFIG_FS_POSIX_ACL
3299         struct posix_acl *acl;
3300         int rc;
3301         ENTRY;
3302
3303 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
3304         if (flags & IPERM_FLAG_RCU)
3305                 return -ECHILD;
3306 #  endif
3307         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
3308
3309         if (!acl)
3310                 RETURN(-EAGAIN);
3311
3312         rc = posix_acl_permission(inode, acl, mask);
3313         posix_acl_release(acl);
3314
3315         RETURN(rc);
3316 # else /* !CONFIG_FS_POSIX_ACL */
3317         return -EAGAIN;
3318 # endif /* CONFIG_FS_POSIX_ACL */
3319 }
3320 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
3321
3322 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
3323 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
3324 #else
3325 # ifdef HAVE_INODE_PERMISION_2ARGS
3326 int ll_inode_permission(struct inode *inode, int mask)
3327 # else
3328 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3329 # endif
3330 #endif
3331 {
3332         int rc = 0;
3333         ENTRY;
3334
3335 #ifdef MAY_NOT_BLOCK
3336         if (mask & MAY_NOT_BLOCK)
3337                 return -ECHILD;
3338 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
3339         if (flags & IPERM_FLAG_RCU)
3340                 return -ECHILD;
3341 #endif
3342
3343        /* as root inode are NOT getting validated in lookup operation,
3344         * need to do it before permission check. */
3345
3346         if (inode == inode->i_sb->s_root->d_inode) {
3347                 struct lookup_intent it = { .it_op = IT_LOOKUP };
3348
3349                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
3350                                               MDS_INODELOCK_LOOKUP);
3351                 if (rc)
3352                         RETURN(rc);
3353         }
3354
3355         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
3356                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
3357
3358         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3359                 return lustre_check_remote_perm(inode, mask);
3360
3361         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3362         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
3363
3364         RETURN(rc);
3365 }
3366
3367 /* -o localflock - only provides locally consistent flock locks */
3368 struct file_operations ll_file_operations = {
3369         .read           = ll_file_read,
3370         .aio_read    = ll_file_aio_read,
3371         .write          = ll_file_write,
3372         .aio_write   = ll_file_aio_write,
3373         .unlocked_ioctl = ll_file_ioctl,
3374         .open           = ll_file_open,
3375         .release        = ll_file_release,
3376         .mmap           = ll_file_mmap,
3377         .llseek         = ll_file_seek,
3378         .splice_read    = ll_file_splice_read,
3379         .fsync          = ll_fsync,
3380         .flush          = ll_flush
3381 };
3382
3383 struct file_operations ll_file_operations_flock = {
3384         .read           = ll_file_read,
3385         .aio_read    = ll_file_aio_read,
3386         .write          = ll_file_write,
3387         .aio_write   = ll_file_aio_write,
3388         .unlocked_ioctl = ll_file_ioctl,
3389         .open           = ll_file_open,
3390         .release        = ll_file_release,
3391         .mmap           = ll_file_mmap,
3392         .llseek         = ll_file_seek,
3393         .splice_read    = ll_file_splice_read,
3394         .fsync          = ll_fsync,
3395         .flush          = ll_flush,
3396         .flock          = ll_file_flock,
3397         .lock           = ll_file_flock
3398 };
3399
3400 /* These are for -o noflock - to return ENOSYS on flock calls */
3401 struct file_operations ll_file_operations_noflock = {
3402         .read           = ll_file_read,
3403         .aio_read    = ll_file_aio_read,
3404         .write          = ll_file_write,
3405         .aio_write   = ll_file_aio_write,
3406         .unlocked_ioctl = ll_file_ioctl,
3407         .open           = ll_file_open,
3408         .release        = ll_file_release,
3409         .mmap           = ll_file_mmap,
3410         .llseek         = ll_file_seek,
3411         .splice_read    = ll_file_splice_read,
3412         .fsync          = ll_fsync,
3413         .flush          = ll_flush,
3414         .flock          = ll_file_noflock,
3415         .lock           = ll_file_noflock
3416 };
3417
3418 struct inode_operations ll_file_inode_operations = {
3419         .setattr        = ll_setattr,
3420         .getattr        = ll_getattr,
3421         .permission     = ll_inode_permission,
3422         .setxattr       = ll_setxattr,
3423         .getxattr       = ll_getxattr,
3424         .listxattr      = ll_listxattr,
3425         .removexattr    = ll_removexattr,
3426         .fiemap         = ll_fiemap,
3427 #ifdef HAVE_IOP_GET_ACL
3428         .get_acl        = ll_get_acl,
3429 #endif
3430 };
3431
3432 /* dynamic ioctl number support routins */
3433 static struct llioc_ctl_data {
3434         struct rw_semaphore     ioc_sem;
3435         cfs_list_t              ioc_head;
3436 } llioc = {
3437         __RWSEM_INITIALIZER(llioc.ioc_sem),
3438         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3439 };
3440
3441
3442 struct llioc_data {
3443         cfs_list_t              iocd_list;
3444         unsigned int            iocd_size;
3445         llioc_callback_t        iocd_cb;
3446         unsigned int            iocd_count;
3447         unsigned int            iocd_cmd[0];
3448 };
3449
3450 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3451 {
3452         unsigned int size;
3453         struct llioc_data *in_data = NULL;
3454         ENTRY;
3455
3456         if (cb == NULL || cmd == NULL ||
3457             count > LLIOC_MAX_CMD || count < 0)
3458                 RETURN(NULL);
3459
3460         size = sizeof(*in_data) + count * sizeof(unsigned int);
3461         OBD_ALLOC(in_data, size);
3462         if (in_data == NULL)
3463                 RETURN(NULL);
3464
3465         memset(in_data, 0, sizeof(*in_data));
3466         in_data->iocd_size = size;
3467         in_data->iocd_cb = cb;
3468         in_data->iocd_count = count;
3469         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3470
3471         down_write(&llioc.ioc_sem);
3472         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3473         up_write(&llioc.ioc_sem);
3474
3475         RETURN(in_data);
3476 }
3477
3478 void ll_iocontrol_unregister(void *magic)
3479 {
3480         struct llioc_data *tmp;
3481
3482         if (magic == NULL)
3483                 return;
3484
3485         down_write(&llioc.ioc_sem);
3486         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3487                 if (tmp == magic) {
3488                         unsigned int size = tmp->iocd_size;
3489
3490                         cfs_list_del(&tmp->iocd_list);
3491                         up_write(&llioc.ioc_sem);
3492
3493                         OBD_FREE(tmp, size);
3494                         return;
3495                 }
3496         }
3497         up_write(&llioc.ioc_sem);
3498
3499         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3500 }
3501
3502 EXPORT_SYMBOL(ll_iocontrol_register);
3503 EXPORT_SYMBOL(ll_iocontrol_unregister);
3504
3505 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3506                         unsigned int cmd, unsigned long arg, int *rcp)
3507 {
3508         enum llioc_iter ret = LLIOC_CONT;
3509         struct llioc_data *data;
3510         int rc = -EINVAL, i;
3511
3512         down_read(&llioc.ioc_sem);
3513         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3514                 for (i = 0; i < data->iocd_count; i++) {
3515                         if (cmd != data->iocd_cmd[i])
3516                                 continue;
3517
3518                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3519                         break;
3520                 }
3521
3522                 if (ret == LLIOC_STOP)
3523                         break;
3524         }
3525         up_read(&llioc.ioc_sem);
3526
3527         if (rcp)
3528                 *rcp = rc;
3529         return ret;
3530 }
3531
3532 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
3533 {
3534         struct ll_inode_info *lli = ll_i2info(inode);
3535         struct cl_env_nest nest;
3536         struct lu_env *env;
3537         int result;
3538         ENTRY;
3539
3540         if (lli->lli_clob == NULL)
3541                 RETURN(0);
3542
3543         env = cl_env_nested_get(&nest);
3544         if (IS_ERR(env))
3545                 RETURN(PTR_ERR(env));
3546
3547         result = cl_conf_set(env, lli->lli_clob, conf);
3548         cl_env_nested_put(&nest, env);
3549
3550         if (conf->coc_opc == OBJECT_CONF_SET) {
3551                 struct ldlm_lock *lock = conf->coc_lock;
3552
3553                 LASSERT(lock != NULL);
3554                 LASSERT(ldlm_has_layout(lock));
3555                 if (result == 0) {
3556                         /* it can only be allowed to match after layout is
3557                          * applied to inode otherwise false layout would be
3558                          * seen. Applying layout shoud happen before dropping
3559                          * the intent lock. */
3560                         ldlm_lock_allow_match(lock);
3561                 }
3562         }
3563         RETURN(result);
3564 }
3565
3566 /* Fetch layout from MDT with getxattr request, if it's not ready yet */
3567 static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)
3568
3569 {
3570         struct ll_sb_info *sbi = ll_i2sbi(inode);
3571         struct obd_capa *oc;
3572         struct ptlrpc_request *req;
3573         struct mdt_body *body;
3574         void *lvbdata;
3575         void *lmm;
3576         int lmmsize;
3577         int rc;
3578         ENTRY;
3579
3580         CDEBUG(D_INODE, DFID" LVB_READY=%d l_lvb_data=%p l_lvb_len=%d\n",
3581                PFID(ll_inode2fid(inode)), !!(lock->l_flags & LDLM_FL_LVB_READY),
3582                lock->l_lvb_data, lock->l_lvb_len);
3583
3584         if ((lock->l_lvb_data != NULL) && (lock->l_flags & LDLM_FL_LVB_READY))
3585                 RETURN(0);
3586
3587         /* if layout lock was granted right away, the layout is returned
3588          * within DLM_LVB of dlm reply; otherwise if the lock was ever
3589          * blocked and then granted via completion ast, we have to fetch
3590          * layout here. Please note that we can't use the LVB buffer in
3591          * completion AST because it doesn't have a large enough buffer */
3592         oc = ll_mdscapa_get(inode);
3593         rc = ll_get_max_mdsize(sbi, &lmmsize);
3594         if (rc == 0)
3595                 rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
3596                                 OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
3597                                 lmmsize, 0, &req);
3598         capa_put(oc);
3599         if (rc < 0)
3600                 RETURN(rc);
3601
3602         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
3603         if (body == NULL || body->eadatasize > lmmsize)
3604                 GOTO(out, rc = -EPROTO);
3605
3606         lmmsize = body->eadatasize;
3607         if (lmmsize == 0) /* empty layout */
3608                 GOTO(out, rc = 0);
3609
3610         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
3611         if (lmm == NULL)
3612                 GOTO(out, rc = -EFAULT);
3613
3614         OBD_ALLOC_LARGE(lvbdata, lmmsize);
3615         if (lvbdata == NULL)
3616                 GOTO(out, rc = -ENOMEM);
3617
3618         memcpy(lvbdata, lmm, lmmsize);
3619         lock_res_and_lock(lock);
3620         if (lock->l_lvb_data != NULL)
3621                 OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
3622
3623         lock->l_lvb_data = lvbdata;
3624         lock->l_lvb_len = lmmsize;
3625         unlock_res_and_lock(lock);
3626
3627         EXIT;
3628
3629 out:
3630         ptlrpc_req_finished(req);
3631         return rc;
3632 }
3633
3634 /**
3635  * Apply the layout to the inode. Layout lock is held and will be released
3636  * in this function.
3637  */
3638 static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
3639                                 struct inode *inode, __u32 *gen, bool reconf)
3640 {
3641         struct ll_inode_info *lli = ll_i2info(inode);
3642         struct ll_sb_info    *sbi = ll_i2sbi(inode);
3643         struct ldlm_lock *lock;
3644         struct lustre_md md = { NULL };
3645         struct cl_object_conf conf;
3646         int rc = 0;
3647         bool lvb_ready;
3648         bool wait_layout = false;
3649         ENTRY;
3650
3651         LASSERT(lustre_handle_is_used(lockh));
3652
3653         lock = ldlm_handle2lock(lockh);
3654         LASSERT(lock != NULL);
3655         LASSERT(ldlm_has_layout(lock));
3656
3657         LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
3658                    inode, PFID(&lli->lli_fid), reconf);
3659
3660         /* in case this is a caching lock and reinstate with new inode */
3661         md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
3662
3663         lock_res_and_lock(lock);
3664         lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
3665         unlock_res_and_lock(lock);
3666         /* checking lvb_ready is racy but this is okay. The worst case is
3667          * that multi processes may configure the file on the same time. */
3668
3669         if (lvb_ready || !reconf) {
3670                 rc = -ENODATA;
3671                 if (lvb_ready) {
3672                         /* layout_gen must be valid if layout lock is not
3673                          * cancelled and stripe has already set */
3674                         *gen = lli->lli_layout_gen;
3675                         rc = 0;
3676                 }
3677                 GOTO(out, rc);
3678         }
3679
3680         rc = ll_layout_fetch(inode, lock);
3681         if (rc < 0)
3682                 GOTO(out, rc);
3683
3684         /* for layout lock, lmm is returned in lock's lvb.
3685          * lvb_data is immutable if the lock is held so it's safe to access it
3686          * without res lock. See the description in ldlm_lock_decref_internal()
3687          * for the condition to free lvb_data of layout lock */
3688         if (lock->l_lvb_data != NULL) {
3689                 rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3690                                   lock->l_lvb_data, lock->l_lvb_len);
3691                 if (rc >= 0) {
3692                         *gen = LL_LAYOUT_GEN_EMPTY;
3693                         if (md.lsm != NULL)
3694                                 *gen = md.lsm->lsm_layout_gen;
3695                         rc = 0;
3696                 } else {
3697                         CERROR("%s: file "DFID" unpackmd error: %d\n",
3698                                 ll_get_fsname(inode->i_sb, NULL, 0),
3699                                 PFID(&lli->lli_fid), rc);
3700                 }
3701         }
3702         if (rc < 0)
3703                 GOTO(out, rc);
3704
3705         /* set layout to file. Unlikely this will fail as old layout was
3706          * surely eliminated */
3707         memset(&conf, 0, sizeof conf);
3708         conf.coc_opc = OBJECT_CONF_SET;
3709         conf.coc_inode = inode;
3710         conf.coc_lock = lock;
3711         conf.u.coc_md = &md;
3712         rc = ll_layout_conf(inode, &conf);
3713
3714         if (md.lsm != NULL)
3715                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3716
3717         /* refresh layout failed, need to wait */
3718         wait_layout = rc == -EBUSY;
3719         EXIT;
3720
3721 out:
3722         LDLM_LOCK_PUT(lock);
3723         ldlm_lock_decref(lockh, mode);
3724
3725         /* wait for IO to complete if it's still being used. */
3726         if (wait_layout) {
3727                 CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3728                         ll_get_fsname(inode->i_sb, NULL, 0),
3729                         inode, PFID(&lli->lli_fid));
3730
3731                 memset(&conf, 0, sizeof conf);
3732                 conf.coc_opc = OBJECT_CONF_WAIT;
3733                 conf.coc_inode = inode;
3734                 rc = ll_layout_conf(inode, &conf);
3735                 if (rc == 0)
3736                         rc = -EAGAIN;
3737
3738                 CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3739                         PFID(&lli->lli_fid), rc);
3740         }
3741         RETURN(rc);
3742 }
3743
3744 /**
3745  * This function checks if there exists a LAYOUT lock on the client side,
3746  * or enqueues it if it doesn't have one in cache.
3747  *
3748  * This function will not hold layout lock so it may be revoked any time after
3749  * this function returns. Any operations depend on layout should be redone
3750  * in that case.
3751  *
3752  * This function should be called before lov_io_init() to get an uptodate
3753  * layout version, the caller should save the version number and after IO
3754  * is finished, this function should be called again to verify that layout
3755  * is not changed during IO time.
3756  */
3757 int ll_layout_refresh(struct inode *inode, __u32 *gen)
3758 {
3759         struct ll_inode_info  *lli = ll_i2info(inode);
3760         struct ll_sb_info     *sbi = ll_i2sbi(inode);
3761         struct md_op_data     *op_data;
3762         struct lookup_intent   it;
3763         struct lustre_handle   lockh;
3764         ldlm_mode_t            mode;
3765         struct ldlm_enqueue_info einfo = {
3766                 .ei_type = LDLM_IBITS,
3767                 .ei_mode = LCK_CR,
3768                 .ei_cb_bl = ll_md_blocking_ast,
3769                 .ei_cb_cp = ldlm_completion_ast,
3770         };
3771         int rc;
3772         ENTRY;
3773
3774         *gen = lli->lli_layout_gen;
3775         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
3776                 RETURN(0);
3777
3778         /* sanity checks */
3779         LASSERT(fid_is_sane(ll_inode2fid(inode)));
3780         LASSERT(S_ISREG(inode->i_mode));
3781
3782         /* mostly layout lock is caching on the local side, so try to match
3783          * it before grabbing layout lock mutex. */
3784         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
3785                                LCK_CR | LCK_CW | LCK_PR | LCK_PW);
3786         if (mode != 0) { /* hit cached lock */
3787                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
3788                 if (rc == 0)
3789                         RETURN(0);
3790
3791                 /* better hold lli_layout_mutex to try again otherwise
3792                  * it will have starvation problem. */
3793         }
3794
3795         /* take layout lock mutex to enqueue layout lock exclusively. */
3796         mutex_lock(&lli->lli_layout_mutex);
3797
3798 again:
3799         /* try again. Maybe somebody else has done this. */
3800         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
3801                                LCK_CR | LCK_CW | LCK_PR | LCK_PW);
3802         if (mode != 0) { /* hit cached lock */
3803                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3804                 if (rc == -EAGAIN)
3805                         goto again;
3806
3807                 mutex_unlock(&lli->lli_layout_mutex);
3808                 RETURN(rc);
3809         }
3810
3811         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3812                         0, 0, LUSTRE_OPC_ANY, NULL);
3813         if (IS_ERR(op_data)) {
3814                 mutex_unlock(&lli->lli_layout_mutex);
3815                 RETURN(PTR_ERR(op_data));
3816         }
3817
3818         /* have to enqueue one */
3819         memset(&it, 0, sizeof(it));
3820         it.it_op = IT_LAYOUT;
3821         lockh.cookie = 0ULL;
3822
3823         LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3824                         ll_get_fsname(inode->i_sb, NULL, 0), inode,
3825                         PFID(&lli->lli_fid));
3826
3827         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3828                         NULL, 0, NULL, 0);
3829         if (it.d.lustre.it_data != NULL)
3830                 ptlrpc_req_finished(it.d.lustre.it_data);
3831         it.d.lustre.it_data = NULL;
3832
3833         ll_finish_md_op_data(op_data);
3834
3835         mode = it.d.lustre.it_lock_mode;
3836         it.d.lustre.it_lock_mode = 0;
3837         ll_intent_drop_lock(&it);
3838
3839         if (rc == 0) {
3840                 /* set lock data in case this is a new lock */
3841                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3842                 rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3843                 if (rc == -EAGAIN)
3844                         goto again;
3845         }
3846         mutex_unlock(&lli->lli_layout_mutex);
3847
3848         RETURN(rc);
3849 }
3850
3851 /**
3852  *  This function send a restore request to the MDT
3853  */
3854 int ll_layout_restore(struct inode *inode, loff_t offset, __u64 length)
3855 {
3856         struct hsm_user_request *hur;
3857         int                      len, rc;
3858         ENTRY;
3859
3860         len = sizeof(struct hsm_user_request) +
3861               sizeof(struct hsm_user_item);
3862         OBD_ALLOC(hur, len);
3863         if (hur == NULL)
3864                 RETURN(-ENOMEM);
3865
3866         hur->hur_request.hr_action = HUA_RESTORE;
3867         hur->hur_request.hr_archive_id = 0;
3868         hur->hur_request.hr_flags = 0;
3869         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
3870                sizeof(hur->hur_user_item[0].hui_fid));
3871         hur->hur_user_item[0].hui_extent.offset = offset;
3872         hur->hur_user_item[0].hui_extent.length = length;
3873         hur->hur_request.hr_itemcount = 1;
3874         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, cl_i2sbi(inode)->ll_md_exp,
3875                            len, hur, NULL);
3876         OBD_FREE(hur, len);
3877         RETURN(rc);
3878 }
3879