bump version to 1.8.1.58 & changelog cleanup
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 /* also used by llite/special.c:ll_special_open() */
52 struct ll_file_data *ll_file_data_get(void)
53 {
54         struct ll_file_data *fd;
55
56         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
57         return fd;
58 }
59
60 static void ll_file_data_put(struct ll_file_data *fd)
61 {
62         if (fd != NULL)
63                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
64 }
65
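/* Send the final close RPC for an MDS open handle: fill an obdo with the
 * inode attributes (size/blocks for regular files), flag uncommitted writes,
 * and call mdc_close().  On a forced umount the RPC is skipped entirely,
 * since the import is already deactivated. */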
66 static int ll_close_inode_openhandle(struct inode *inode,
67                                      struct obd_client_handle *och)
68 {
69         struct ptlrpc_request *req = NULL;
70         struct obd_device *obd;
71         struct obdo *oa;
72         struct mdc_op_data data = { { 0 } };
73         obd_flag valid;
74         int rc;
75         ENTRY;
76
77         obd = class_exp2obd(ll_i2mdcexp(inode));
78         if (obd == NULL) {
79                 CERROR("Invalid MDC connection handle "LPX64"\n",
80                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
81                 GOTO(out, rc = 0);
82         }
83
84         /*
85          * Here we check whether this is a forced umount. If so, this is called
86          * while canceling the "open lock" and we do not call mdc_close() in this
87          * case, as it would not succeed: the import is already deactivated.
88          */
89         if (obd->obd_force)
90                 GOTO(out, rc = 0);
91
92         OBDO_ALLOC(oa);
93         if (!oa)
94                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
95
96         oa->o_id = inode->i_ino;
97         oa->o_valid = OBD_MD_FLID;
98         valid = OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLATIME |
99                 OBD_MD_FLMTIME | OBD_MD_FLCTIME;
100         if (S_ISREG(inode->i_mode))
101                 valid |=  OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
102         obdo_from_inode(oa, inode, valid);
103         if (ll_is_inode_dirty(inode)) {
104                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
105                 oa->o_valid |= OBD_MD_FLFLAGS;
106         }
107         ll_inode2fid(&data.fid1, inode);
108         rc = mdc_close(ll_i2mdcexp(inode), &data, oa, och, &req);
109         if (rc == EAGAIN) {
110                 /* We are the last writer, so the MDS has instructed us to get
111                  * the file size and any write cookies, then close again. */
112                 ll_queue_done_writing(inode);
113                 rc = 0;
114         } else if (rc) {
115                 CERROR("inode %lu mdc close failed: rc = %d\n",
116                        inode->i_ino, rc);
117         }
118
119         OBDO_FREE(oa);
120
121         if (rc == 0) {
122                 rc = ll_objects_destroy(req, inode);
123                 if (rc)
124                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
125                                inode->i_ino, rc);
126         }
127
128         ptlrpc_req_finished(req); /* This is close request */
129         EXIT;
130 out:
131         mdc_clear_open_replay_data(och);
132
133         return rc;
134 }
135
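/* Drop the cached MDS open handle matching the given open mode
 * (read/write/exec), but only once its per-mode use count has dropped to
 * zero; the actual close RPC is sent by ll_close_inode_openhandle(). */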
136 int ll_mdc_real_close(struct inode *inode, int flags)
137 {
138         struct ll_inode_info *lli = ll_i2info(inode);
139         int rc = 0;
140         struct obd_client_handle **och_p;
141         struct obd_client_handle *och;
142         __u64 *och_usecount;
143
144         ENTRY;
145
146         if (flags & FMODE_WRITE) {
147                 och_p = &lli->lli_mds_write_och;
148                 och_usecount = &lli->lli_open_fd_write_count;
149         } else if (flags & FMODE_EXEC) {
150                 och_p = &lli->lli_mds_exec_och;
151                 och_usecount = &lli->lli_open_fd_exec_count;
152          } else {
153                 LASSERT(flags & FMODE_READ);
154                 och_p = &lli->lli_mds_read_och;
155                 och_usecount = &lli->lli_open_fd_read_count;
156         }
157
158         down(&lli->lli_och_sem);
159         if (*och_usecount) { /* There are still users of this handle, so
160                                 skip freeing it. */
161                 up(&lli->lli_och_sem);
162                 RETURN(0);
163         }
164         och = *och_p;
165         *och_p = NULL;
166         up(&lli->lli_och_sem);
167
168         if (och) { /* There might be a race and somebody may have freed this
169                       och already */
170                 rc = ll_close_inode_openhandle(inode, och);
171                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
172                 OBD_FREE(och, sizeof *och);
173         }
174
175         RETURN(rc);
176 }
177
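/* Per-file-descriptor close path: release a group lock if one is held,
 * decrement the per-mode open count, and only fall through to
 * ll_mdc_real_close() when no cached OPEN DLM lock still covers this mode. */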
178 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
179                         struct file *file)
180 {
181         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
182         struct ll_inode_info *lli = ll_i2info(inode);
183         int rc = 0;
184         ENTRY;
185
186         /* clear group lock, if present */
187         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
188                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
189                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
190                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
191                                       &fd->fd_cwlockh);
192         }
193
194         /* Let's see if we have a good enough OPEN lock on the file and if
195            we can skip talking to the MDS */
196         if (file->f_dentry->d_inode) { /* Can this ever be false? */
197                 int lockmode;
198                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
199                 struct lustre_handle lockh;
200                 struct inode *inode = file->f_dentry->d_inode;
201                 struct ldlm_res_id file_res_id;
202
203                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
204                 fid_build_reg_res_name(ll_inode_lu_fid(inode), &file_res_id);
205
206                 down(&lli->lli_och_sem);
207                 if (fd->fd_omode & FMODE_WRITE) {
208                         lockmode = LCK_CW;
209                         LASSERT(lli->lli_open_fd_write_count);
210                         lli->lli_open_fd_write_count--;
211                 } else if (fd->fd_omode & FMODE_EXEC) {
212                         lockmode = LCK_PR;
213                         LASSERT(lli->lli_open_fd_exec_count);
214                         lli->lli_open_fd_exec_count--;
215                 } else {
216                         lockmode = LCK_CR;
217                         LASSERT(lli->lli_open_fd_read_count);
218                         lli->lli_open_fd_read_count--;
219                 }
220                 up(&lli->lli_och_sem);
221
222                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
223                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
224                                      &lockh)) {
225                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
226                                                 fd->fd_omode);
227                 }
228         } else {
229                 CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
230                        file, file->f_dentry, file->f_dentry->d_name.name);
231         }
232
233         LUSTRE_FPRIVATE(file) = NULL;
234         ll_file_data_put(fd);
235
236         RETURN(rc);
237 }
238
239 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
240
241 /* While this returns an error code, the caller fput() ignores it, so we need
242  * to make every effort to clean up all of our state here.  Also, applications
243  * rarely check close errors and even if an error is returned they will not
244  * re-try the close call.
245  */
246 int ll_file_release(struct inode *inode, struct file *file)
247 {
248         struct ll_file_data *fd;
249         struct ll_sb_info *sbi = ll_i2sbi(inode);
250         struct ll_inode_info *lli = ll_i2info(inode);
251         struct lov_stripe_md *lsm = lli->lli_smd;
252         int rc;
253         ENTRY;
254
255         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
256                inode->i_generation, inode);
257
258         if (inode->i_sb->s_root != file->f_dentry)
259                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
260         fd = LUSTRE_FPRIVATE(file);
261         LASSERT(fd != NULL);
262
263         /* The last ref on @file, maybe not the owner pid of statahead.
264          * Different processes can open the same dir, "ll_opendir_key" means:
265          * it is me that should stop the statahead thread. */
266         if (lli->lli_opendir_key == fd && lli->lli_opendir_pid != 0)
267                 ll_stop_statahead(inode, lli->lli_opendir_key);
268
269         if (inode->i_sb->s_root == file->f_dentry) {
270                 LUSTRE_FPRIVATE(file) = NULL;
271                 ll_file_data_put(fd);
272                 RETURN(0);
273         }
274
275         if (lsm)
276                 lov_test_and_clear_async_rc(lsm);
277         lli->lli_async_rc = 0;
278
279         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
280
281         if (OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, obd_fail_val))
282                 libcfs_debug_dumplog();
283
284         RETURN(rc);
285 }
286
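/* Issue an IT_OPEN intent to the MDS for opens that arrive without a usable
 * lookup intent (NFS export, pre-2.6.15 patchless clients, or a cached open
 * handle that vanished between revalidate and ll_file_open()).  The OPEN
 * lock is requested unless we are only pushing stripe parameters. */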
287 static int ll_intent_file_open(struct file *file, void *lmm,
288                                int lmmsize, struct lookup_intent *itp)
289 {
290         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
291         struct mdc_op_data data = { { 0 } };
292         struct dentry *parent = file->f_dentry->d_parent;
293         const char *name = file->f_dentry->d_name.name;
294         const int len = file->f_dentry->d_name.len;
295         struct inode *inode = file->f_dentry->d_inode;
296         struct ptlrpc_request *req;
297         int rc;
298         ENTRY;
299
300         if (!parent)
301                 RETURN(-ENOENT);
302
303         ll_prepare_mdc_op_data(&data, parent->d_inode, inode,
304                                name, len, O_RDWR, NULL);
305
306         /* Usually we come here only for NFSD, and we want open lock.
307            But we can also get here with pre 2.6.15 patchless kernels, and in
308            that case that lock is also ok */
309         /* We can also get here if there was cached open handle in revalidate_it
310          * but it disappeared while we were getting from there to ll_file_open.
311          * But this means this file was closed and immediately opened, which
312          * makes it a good candidate for using the OPEN lock */
313         /* If lmmsize & lmm are not 0, we are just setting stripe info
314          * parameters. No need for the open lock */
315         if (!lmm && !lmmsize)
316                 itp->it_flags |= MDS_OPEN_LOCK;
317
318         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
319                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
320         if (rc == -ESTALE) {
321                 /* reason to keep our own exit path - don't flood the log
322                  * with -ESTALE error messages.
323                  */
324                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
325                      it_open_error(DISP_OPEN_OPEN, itp))
326                         GOTO(out, rc);
327                 ll_release_openhandle(file->f_dentry, itp);
328                 GOTO(out, rc);
329         }
330
331         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
332                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
333                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
334                 GOTO(out, rc);
335         }
336
337         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
338                            req, DLM_REPLY_REC_OFF, NULL);
339         if (itp->d.lustre.it_lock_mode)
340                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
341                                   inode, NULL);
342
343 out:
344         ptlrpc_req_finished(itp->d.lustre.it_data);
345         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
346         ll_intent_drop_lock(itp);
347
348         RETURN(rc);
349 }
350
351
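/* Copy the MDS file handle from the open reply into the obd_client_handle,
 * record the I/O epoch on the inode, and register the request for open
 * replay in case of MDS recovery. */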
352 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
353                         struct obd_client_handle *och)
354 {
355         struct ptlrpc_request *req = it->d.lustre.it_data;
356         struct mds_body *body;
357
358         LASSERT(och);
359
360         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
361         LASSERT(body != NULL);                  /* reply already checked out */
362         /* and swabbed in mdc_enqueue */
363         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
364
365         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
366         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
367         lli->lli_io_epoch = body->io_epoch;
368
369         mdc_set_open_replay_data(och, it->d.lustre.it_data);
370 }
371
372 int ll_local_open(struct file *file, struct lookup_intent *it,
373                   struct ll_file_data *fd, struct obd_client_handle *och)
374 {
375         ENTRY;
376
377         LASSERT(!LUSTRE_FPRIVATE(file));
378
379         LASSERT(fd != NULL);
380
381         if (och)
382                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
383         LUSTRE_FPRIVATE(file) = fd;
384         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
385         fd->fd_omode = it->it_flags;
386
387         RETURN(0);
388 }
389
390 /* Open a file, and (for the very first open) create objects on the OSTs at
391  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
392  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
393  * lli_open_sem to ensure no other process will create objects, send the
394  * stripe MD to the MDS, or try to destroy the objects if that fails.
395  *
396  * If we already have the stripe MD locally then we don't request it in
397  * mdc_open(), by passing a lmm_size = 0.
398  *
399  * It is up to the application to ensure no other processes open this file
400  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
401  * used.  We might be able to avoid races of that sort by getting lli_open_sem
402  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
403  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
404  */
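/* For illustration, a rough userspace sketch of the O_LOV_DELAY_CREATE
 * pattern described above (not part of this file; it assumes the usual
 * <lustre/lustre_user.h> definitions of O_LOV_DELAY_CREATE,
 * LL_IOC_LOV_SETSTRIPE and struct lov_user_md, and the exact fields and
 * defaults may vary between releases):
 *
 *      int fd = open(path, O_CREAT | O_RDWR | O_LOV_DELAY_CREATE, 0644);
 *      struct lov_user_md lum = {
 *              .lmm_magic         = LOV_USER_MAGIC,
 *              .lmm_stripe_size   = 1 << 20,       // 1 MiB stripes
 *              .lmm_stripe_count  = 4,
 *              .lmm_stripe_offset = (__u16)-1,     // let the MDS choose
 *      };
 *      // object creation on the OSTs happens here, not at open time
 *      ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum);
 */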
405 int ll_file_open(struct inode *inode, struct file *file)
406 {
407         struct ll_inode_info *lli = ll_i2info(inode);
408         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
409                                           .it_flags = file->f_flags };
410         struct lov_stripe_md *lsm;
411         struct ptlrpc_request *req = NULL;
412         struct obd_client_handle **och_p;
413         __u64 *och_usecount;
414         struct ll_file_data *fd;
415         int rc = 0, opendir_set = 0;
416         ENTRY;
417
418         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
419                inode->i_generation, inode, file->f_flags);
420
421 #ifdef HAVE_VFS_INTENT_PATCHES
422         it = file->f_it;
423 #else
424         it = file->private_data; /* XXX: compat macro */
425         file->private_data = NULL; /* prevent ll_local_open assertion */
426 #endif
427
428         fd = ll_file_data_get();
429         if (fd == NULL)
430                 RETURN(-ENOMEM);
431
432         if (S_ISDIR(inode->i_mode)) {
433                 spin_lock(&lli->lli_lock);
434                 if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) {
435                         LASSERT(lli->lli_sai == NULL);
436                         lli->lli_opendir_key = fd;
437                         lli->lli_opendir_pid = cfs_curproc_pid();
438                         opendir_set = 1;
439                 }
440                 spin_unlock(&lli->lli_lock);
441         }
442
443         if (inode->i_sb->s_root == file->f_dentry) {
444                 LUSTRE_FPRIVATE(file) = fd;
445                 RETURN(0);
446         }
447
448         if (!it || !it->d.lustre.it_disposition) {
449                 /* Convert f_flags into access mode. We cannot use file->f_mode,
450                  * because everything but O_ACCMODE mask was stripped from it */
451                 if ((oit.it_flags + 1) & O_ACCMODE)
452                         oit.it_flags++;
453                 if (file->f_flags & O_TRUNC)
454                         oit.it_flags |= FMODE_WRITE;
455
456                 /* The kernel only calls f_op->open in dentry_open.  filp_open
457                  * calls dentry_open after open_namei has checked permissions.
458                  * Only nfsd_open calls dentry_open directly without checking
459                  * permissions, and because of that the code below is safe. */
460                 if (oit.it_flags & FMODE_WRITE)
461                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
462
463                 /* We do not want O_EXCL here, presumably we opened the file
464                  * already? XXX - NFS implications? */
465                 oit.it_flags &= ~O_EXCL;
466
467                 /* bug 20584: if "it_flags" contains O_CREAT, the file will be
468                  * created if necessary, so "IT_CREAT" should be set to stay
469                  * consistent with it */
470                 if (oit.it_flags & O_CREAT)
471                         oit.it_op |= IT_CREAT;
472
473                 it = &oit;
474         }
475
476         if (ll_i2sbi(inode)->ll_direct_io_default &&
477             !S_ISDIR(inode->i_mode) &&
478             !(it->it_flags & FMODE_EXEC))
479                 file->f_flags |= O_DIRECT;
480
481 restart:
482         /* Let's see if we have file open on MDS already. */
483         if (it->it_flags & FMODE_WRITE) {
484                 och_p = &lli->lli_mds_write_och;
485                 och_usecount = &lli->lli_open_fd_write_count;
486         } else if (it->it_flags & FMODE_EXEC) {
487                 och_p = &lli->lli_mds_exec_och;
488                 och_usecount = &lli->lli_open_fd_exec_count;
489          } else {
490                 och_p = &lli->lli_mds_read_och;
491                 och_usecount = &lli->lli_open_fd_read_count;
492         }
493
494         LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
495                  it->d.lustre.it_disposition);
496
497         down(&lli->lli_och_sem);
498         if (*och_p) { /* Open handle is present */
499                 if (it_disposition(it, DISP_OPEN_OPEN)) {
500                         /* Well, there's an extra open request that we do not
501                            need; let's close it somehow. This will decref the request. */
502                         rc = it_open_error(DISP_OPEN_OPEN, it);
503                         if (rc) {
504                                 up(&lli->lli_och_sem);
505                                 ll_file_data_put(fd);
506                                 GOTO(out_openerr, rc);
507                         }
508                         ll_release_openhandle(file->f_dentry, it);
509                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
510                                              LPROC_LL_OPEN);
511                 }
512                 (*och_usecount)++;
513
514                 rc = ll_local_open(file, it, fd, NULL);
515
516                 LASSERTF(rc == 0, "rc = %d\n", rc);
517         } else {
518                 LASSERT(*och_usecount == 0);
519                 if (!it->d.lustre.it_disposition) {
520                         /* We cannot just request lock handle now, new ELC code
521                            means that one of other OPEN locks for this file
522                            could be cancelled, and since blocking ast handler
523                            would attempt to grab och_sem as well, that would
524                            result in a deadlock */
525                         up(&lli->lli_och_sem);
526                         it->it_create_mode |= M_CHECK_STALE;
527                         rc = ll_intent_file_open(file, NULL, 0, it);
528                         it->it_create_mode &= ~M_CHECK_STALE;
529                         if (rc) {
530                                 ll_file_data_put(fd);
531                                 GOTO(out_openerr, rc);
532                         }
533                         goto restart;
534                 }
535
536                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
537                 if (!*och_p) {
538                         ll_file_data_put(fd);
539                         GOTO(out_och_free, rc = -ENOMEM);
540                 }
541                 (*och_usecount)++;
542                req = it->d.lustre.it_data;
543
544                 /* mdc_intent_lock() didn't get a request ref if there was an
545                  * open error, so don't do cleanup on the request here
546                  * (bug 3430) */
547                 /* XXX (green): Should not we bail out on any error here, not
548                  * just open error? */
549                 rc = it_open_error(DISP_OPEN_OPEN, it);
550                 if (rc) {
551                         ll_file_data_put(fd);
552                         GOTO(out_och_free, rc);
553                 }
554
555                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
556                 rc = ll_local_open(file, it, fd, *och_p);
557                 LASSERTF(rc == 0, "rc = %d\n", rc);
558         }
559         up(&lli->lli_och_sem);
560
561         /* Must do this outside lli_och_sem lock to prevent deadlock where
562            different kind of OPEN lock for this same inode gets cancelled
563            by ldlm_cancel_lru */
564         if (!S_ISREG(inode->i_mode))
565                 GOTO(out, rc);
566
567         lsm = lli->lli_smd;
568         if (lsm == NULL) {
569                 if (file->f_flags & O_LOV_DELAY_CREATE ||
570                     !(file->f_mode & FMODE_WRITE)) {
571                         CDEBUG(D_INODE, "object creation was delayed\n");
572                         GOTO(out, rc);
573                 }
574         }
575         file->f_flags &= ~O_LOV_DELAY_CREATE;
576         GOTO(out, rc);
577  out:
578         ptlrpc_req_finished(req);
579         if (req)
580                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
581         if (rc == 0) {
582                 ll_open_complete(inode);
583         } else {
584 out_och_free:
585                 if (*och_p) {
586                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
587                         *och_p = NULL; /* OBD_FREE writes some magic there */
588                         (*och_usecount)--;
589                 }
590                 up(&lli->lli_och_sem);
591 out_openerr:
592                 if (opendir_set != 0)
593                         ll_stop_statahead(inode, lli->lli_opendir_key);
594         }
595
596         return rc;
597 }
598
599 /* Fills the obdo with the attributes for the inode defined by lsm */
600 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
601                    struct obdo *oa)
602 {
603         struct ptlrpc_request_set *set;
604         struct obd_info oinfo = { { { 0 } } };
605         int rc;
606         ENTRY;
607
608         LASSERT(lsm != NULL);
609
610         memset(oa, 0, sizeof *oa);
611         oinfo.oi_md = lsm;
612         oinfo.oi_oa = oa;
613         oa->o_id = lsm->lsm_object_id;
614         oa->o_gr = lsm->lsm_object_gr;
615         oa->o_mode = S_IFREG;
616         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
617                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
618                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
619
620         set = ptlrpc_prep_set();
621         if (set == NULL) {
622                 rc = -ENOMEM;
623         } else {
624                 rc = obd_getattr_async(exp, &oinfo, set);
625                 if (rc == 0)
626                         rc = ptlrpc_set_wait(set);
627                 ptlrpc_set_destroy(set);
628         }
629         if (rc)
630                 RETURN(rc);
631
632         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
633                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
634         RETURN(0);
635 }
636
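/* Map an OST extent lock back to the stripe index it protects, using
 * obd_get_info(KEY_LOCK_TO_STRIPE) on the LOV; returns the stripe number,
 * or a negative error if the lock's resource does not match the object. */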
637 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
638 {
639         struct ll_inode_info *lli = ll_i2info(inode);
640         struct lov_stripe_md *lsm = lli->lli_smd;
641         struct obd_export *exp = ll_i2obdexp(inode);
642         struct {
643                 char name[16];
644                 struct ldlm_lock *lock;
645         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
646         __u32 stripe, vallen = sizeof(stripe);
647         struct lov_oinfo *loinfo;
648         int rc;
649         ENTRY;
650
651         if (lsm->lsm_stripe_count == 1)
652                 GOTO(check, stripe = 0);
653
654         /* get our offset in the lov */
655         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
656         if (rc != 0) {
657                 CERROR("obd_get_info: rc = %d\n", rc);
658                 RETURN(rc);
659         }
660         LASSERT(stripe < lsm->lsm_stripe_count);
661
662 check:
663         loinfo = lsm->lsm_oinfo[stripe];
664         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
665                             &lock->l_resource->lr_name)) {
666                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
667                            loinfo->loi_id, loinfo->loi_gr);
668                 RETURN(-ELDLM_NO_LOCK_DATA);
669         }
670
671         RETURN(stripe);
672 }
673
674 /* Get extra page reference to ensure it is not going away */
675 void ll_pin_extent_cb(void *data)
676 {
677         struct page *page = data;
678
679         page_cache_get(page);
680
681         return;
682 }
683 /* Flush the page from the page cache for an extent as it is canceled.
684  * Page to remove is delivered as @data.
685  *
686  * No one can dirty the extent until we've finished our work and they cannot
687  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
688  * but other kernel actors could have pages locked.
689  *
690  * If @discard is set, there is no need to write the page if it is dirty.
691  *
692  * Called with the DLM lock held. */
693 int ll_page_removal_cb(void *data, int discard)
694 {
695         int rc;
696         struct page *page = data;
697         struct address_space *mapping;
698
699         ENTRY;
700
701         /* We have page reference already from ll_pin_page */
702         lock_page(page);
703
704         /* Already truncated by somebody */
705         if (!page->mapping)
706                 GOTO(out, rc = 0);
707
708         mapping = page->mapping;
709
710         ll_teardown_mmaps(mapping,
711                           (__u64)page->index << PAGE_CACHE_SHIFT,
712                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
713                                                               ~PAGE_CACHE_MASK);
714         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
715         if (!discard && PageWriteback(page))
716                 wait_on_page_writeback(page);
717
718         if (!discard && clear_page_dirty_for_io(page)) {
719                 rc = ll_call_writepage(page->mapping->host, page);
720                 /* either waiting for io to complete or reacquiring
721                  * the lock that the failed writepage released */
722                 lock_page(page);
723                 wait_on_page_writeback(page);
724                 if (rc < 0) {
725                         CERROR("writepage inode %lu(%p) of page %p "
726                                "failed: %d\n", mapping->host->i_ino,
727                                mapping->host, page, rc);
728                         if (rc == -ENOSPC)
729                                 set_bit(AS_ENOSPC, &mapping->flags);
730                         else
731                                 set_bit(AS_EIO, &mapping->flags);
732                 }
733         }
734         if (page->mapping != NULL) {
735                 struct ll_async_page *llap = llap_cast_private(page);
736                 // checking again to account for writeback's lock_page()
737                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
738                 if (llap)
739                         ll_ra_accounting(llap, page->mapping);
740                 ll_truncate_complete_page(page);
741         }
742         EXIT;
743 out:
744         LASSERT(!PageWriteback(page));
745         unlock_page(page);
746         page_cache_release(page);
747
748         return 0;
749 }
750
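/* Cancel callback for OST extent locks: when a lock covering part of the
 * file goes away, shrink the known minimum size (KMS) of the affected
 * stripe and give any pending "done writing" work a kick. */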
751 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
752                              void *data, int flag)
753 {
754         struct inode *inode;
755         struct ll_inode_info *lli;
756         struct lov_stripe_md *lsm;
757         int stripe;
758         __u64 kms;
759
760         ENTRY;
761
762         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
763                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
764                 LBUG();
765         }
766
767         inode = ll_inode_from_lock(lock);
768         if (inode == NULL)
769                 RETURN(0);
770         lli = ll_i2info(inode);
771         if (lli == NULL)
772                 GOTO(iput, 0);
773         if (lli->lli_smd == NULL)
774                 GOTO(iput, 0);
775         lsm = lli->lli_smd;
776
777         stripe = ll_lock_to_stripe_offset(inode, lock);
778         if (stripe < 0)
779                 GOTO(iput, 0);
780
781         lov_stripe_lock(lsm);
782         lock_res_and_lock(lock);
783         kms = ldlm_extent_shift_kms(lock,
784                                     lsm->lsm_oinfo[stripe]->loi_kms);
785
786         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
787                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
788                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
789         lsm->lsm_oinfo[stripe]->loi_kms = kms;
790         unlock_res_and_lock(lock);
791         lov_stripe_unlock(lsm);
792         ll_try_done_writing(inode);
793         EXIT;
794 iput:
795         iput(inode);
796
797         return 0;
798 }
799
800 #if 0
801 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
802 {
803         /* XXX ALLOCATE - 160 bytes */
804         struct inode *inode = ll_inode_from_lock(lock);
805         struct ll_inode_info *lli = ll_i2info(inode);
806         struct lustre_handle lockh = { 0 };
807         struct ost_lvb *lvb;
808         int stripe;
809         ENTRY;
810
811         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
812                      LDLM_FL_BLOCK_CONV)) {
813                 LBUG(); /* not expecting any blocked async locks yet */
814                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
815                            "lock, returning");
816                 ldlm_lock_dump(D_OTHER, lock, 0);
817                 ldlm_reprocess_all(lock->l_resource);
818                 RETURN(0);
819         }
820
821         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
822
823         stripe = ll_lock_to_stripe_offset(inode, lock);
824         if (stripe < 0)
825                 goto iput;
826
827         if (lock->l_lvb_len) {
828                 struct lov_stripe_md *lsm = lli->lli_smd;
829                 __u64 kms;
830                 lvb = lock->l_lvb_data;
831                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
832
833                 lock_res_and_lock(lock);
834                 ll_inode_size_lock(inode, 1);
835                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
836                 kms = ldlm_extent_shift_kms(NULL, kms);
837                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
838                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
839                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
840                 lsm->lsm_oinfo[stripe].loi_kms = kms;
841                 ll_inode_size_unlock(inode, 1);
842                 unlock_res_and_lock(lock);
843         }
844
845 iput:
846         iput(inode);
847         wake_up(&lock->l_waitq);
848
849         ldlm_lock2handle(lock, &lockh);
850         ldlm_lock_decref(&lockh, LCK_PR);
851         RETURN(0);
852 }
853 #endif
854
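/* Glimpse AST: the server asks this client, as a lock holder, for its view
 * of the object.  Reply with an ost_lvb carrying the local KMS of the
 * stripe this lock maps to plus the inode's current timestamps. */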
855 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
856 {
857         struct ptlrpc_request *req = reqp;
858         struct inode *inode = ll_inode_from_lock(lock);
859         struct ll_inode_info *lli;
860         struct lov_stripe_md *lsm;
861         struct ost_lvb *lvb;
862         int rc, stripe;
863         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
864         ENTRY;
865
866         if (inode == NULL)
867                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
868         lli = ll_i2info(inode);
869         if (lli == NULL)
870                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
871         lsm = lli->lli_smd;
872         if (lsm == NULL)
873                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
874
875         /* First, find out which stripe index this lock corresponds to. */
876         stripe = ll_lock_to_stripe_offset(inode, lock);
877         if (stripe < 0)
878                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
879
880         rc = lustre_pack_reply(req, 2, size, NULL);
881         if (rc)
882                 GOTO(iput, rc);
883
884         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
885         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
886         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
887         lvb->lvb_atime = LTIME_S(inode->i_atime);
888         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
889
890         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
891                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
892                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_atime,
893                    lvb->lvb_mtime, lvb->lvb_ctime);
894  iput:
895         iput(inode);
896
897  out:
898         /* These errors are normal races, so we don't want to fill the console
899          * with messages by calling ptlrpc_error() */
900         if (rc == -ELDLM_NO_LOCK_DATA)
901                 lustre_pack_reply(req, 1, NULL, NULL);
902
903         req->rq_status = rc;
904         return rc;
905 }
906
907 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
908                      lstat_t *st)
909 {
910         struct lustre_handle lockh = { 0 };
911         struct ldlm_enqueue_info einfo = { 0 };
912         struct obd_info oinfo = { { { 0 } } };
913         struct ost_lvb lvb;
914         int rc;
915
916         ENTRY;
917
918         einfo.ei_type = LDLM_EXTENT;
919         einfo.ei_mode = LCK_PR;
920         einfo.ei_cb_bl = osc_extent_blocking_cb;
921         einfo.ei_cb_cp = ldlm_completion_ast;
922         einfo.ei_cb_gl = ll_glimpse_callback;
923         einfo.ei_cbdata = NULL;
924
925         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
926         oinfo.oi_lockh = &lockh;
927         oinfo.oi_md = lsm;
928         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
929
930         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
931         if (rc == -ENOENT)
932                 RETURN(rc);
933         if (rc != 0) {
934                 CERROR("obd_enqueue returned rc %d, "
935                        "returning -EIO\n", rc);
936                 RETURN(rc > 0 ? -EIO : rc);
937         }
938
939         lov_stripe_lock(lsm);
940         memset(&lvb, 0, sizeof(lvb));
941         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
942         st->st_size = lvb.lvb_size;
943         st->st_blocks = lvb.lvb_blocks;
944         st->st_mtime = lvb.lvb_mtime;
945         st->st_atime = lvb.lvb_atime;
946         st->st_ctime = lvb.lvb_ctime;
947         lov_stripe_unlock(lsm);
948
949         RETURN(rc);
950 }
951
952 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
953  * file (because it prefers KMS over RSS when larger) */
954 int ll_glimpse_size(struct inode *inode, int ast_flags)
955 {
956         struct ll_inode_info *lli = ll_i2info(inode);
957         struct ll_sb_info *sbi = ll_i2sbi(inode);
958         struct lustre_handle lockh = { 0 };
959         struct ldlm_enqueue_info einfo = { 0 };
960         struct obd_info oinfo = { { { 0 } } };
961         struct ost_lvb lvb;
962         int rc;
963         ENTRY;
964
965         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
966
967         if (!lli->lli_smd) {
968                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
969                 RETURN(0);
970         }
971
972         /* NOTE: this looks like a DLM lock request, but it may not be one.
973          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse request
974          *       that won't revoke any conflicting DLM locks held. Instead,
975          *       ll_glimpse_callback() will be called on each client
976          *       holding a DLM lock against this file, and the resulting size
977          *       will be returned for each stripe. A DLM lock on [0, EOF] is
978          *       acquired only if there were no conflicting locks. */
979         einfo.ei_type = LDLM_EXTENT;
980         einfo.ei_mode = LCK_PR;
981         einfo.ei_cb_bl = osc_extent_blocking_cb;
982         einfo.ei_cb_cp = ldlm_completion_ast;
983         einfo.ei_cb_gl = ll_glimpse_callback;
984         einfo.ei_cbdata = inode;
985
986         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
987         oinfo.oi_lockh = &lockh;
988         oinfo.oi_md = lli->lli_smd;
989         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
990
991         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
992         if (rc == -ENOENT)
993                 RETURN(rc);
994         if (rc != 0) {
995                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
996                 RETURN(rc > 0 ? -EIO : rc);
997         }
998
999         ll_inode_size_lock(inode, 1);
1000         inode_init_lvb(inode, &lvb);
1001         /* merge the timestamps most recently obtained from the MDS with
1002            timestamps obtained from the OSTs */
1003         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
1004         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
1005         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
1006         rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1007         i_size_write(inode, lvb.lvb_size);
1008         inode->i_blocks = lvb.lvb_blocks;
1009         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1010         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1011         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1012         ll_inode_size_unlock(inode, 1);
1013
1014         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1015                i_size_read(inode), (long long)inode->i_blocks);
1016
1017         RETURN(rc);
1018 }
1019
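/* Take an OST extent lock of the given mode on the range described by
 * @policy, then merge the returned LVB into the inode: a full-file lock may
 * update i_size, and a successful enqueue refreshes the timestamps. */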
1020 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1021                    struct lov_stripe_md *lsm, int mode,
1022                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1023                    int ast_flags)
1024 {
1025         struct ll_sb_info *sbi = ll_i2sbi(inode);
1026         struct ost_lvb lvb;
1027         struct ldlm_enqueue_info einfo = { 0 };
1028         struct obd_info oinfo = { { { 0 } } };
1029         int rc;
1030         ENTRY;
1031
1032         LASSERT(!lustre_handle_is_used(lockh));
1033         LASSERT(lsm != NULL);
1034
1035         /* don't drop the mmapped file to LRU */
1036         if (mapping_mapped(inode->i_mapping))
1037                 ast_flags |= LDLM_FL_NO_LRU;
1038
1039         /* XXX phil: can we do this?  won't it screw the file size up? */
1040         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1041             (sbi->ll_flags & LL_SBI_NOLCK))
1042                 RETURN(0);
1043
1044         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1045                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1046
1047         einfo.ei_type = LDLM_EXTENT;
1048         einfo.ei_mode = mode;
1049         einfo.ei_cb_bl = osc_extent_blocking_cb;
1050         einfo.ei_cb_cp = ldlm_completion_ast;
1051         einfo.ei_cb_gl = ll_glimpse_callback;
1052         einfo.ei_cbdata = inode;
1053
1054         oinfo.oi_policy = *policy;
1055         oinfo.oi_lockh = lockh;
1056         oinfo.oi_md = lsm;
1057         oinfo.oi_flags = ast_flags;
1058
1059         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL);
1060         *policy = oinfo.oi_policy;
1061         if (rc > 0)
1062                 rc = -EIO;
1063
1064         ll_inode_size_lock(inode, 1);
1065         inode_init_lvb(inode, &lvb);
1066         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1067
1068         if (policy->l_extent.start == 0 &&
1069             policy->l_extent.end == OBD_OBJECT_EOF) {
1070                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1071                  * the kms under both a DLM lock and the
1072                  * ll_inode_size_lock().  If we don't get the
1073                  * ll_inode_size_lock() here we can match the DLM lock and
1074                  * reset i_size from the kms before the truncating path has
1075                  * updated the kms.  generic_file_write can then trust the
1076                  * stale i_size when doing appending writes and effectively
1077                  * cancel the result of the truncate.  Getting the
1078                  * ll_inode_size_lock() after the enqueue maintains the DLM
1079                  * -> ll_inode_size_lock() acquiring order. */
1080                 i_size_write(inode, lvb.lvb_size);
1081                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1082                        inode->i_ino, i_size_read(inode));
1083         }
1084
1085         if (rc == 0) {
1086                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1087                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1088                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1089         }
1090         ll_inode_size_unlock(inode, 1);
1091
1092         RETURN(rc);
1093 }
1094
1095 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1096                      struct lov_stripe_md *lsm, int mode,
1097                      struct lustre_handle *lockh)
1098 {
1099         struct ll_sb_info *sbi = ll_i2sbi(inode);
1100         int rc;
1101         ENTRY;
1102
1103         /* XXX phil: can we do this?  won't it screw the file size up? */
1104         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1105             (sbi->ll_flags & LL_SBI_NOLCK))
1106                 RETURN(0);
1107
1108         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh, 0, 0);
1109
1110         RETURN(rc);
1111 }
1112
1113 static void ll_set_file_contended(struct inode *inode)
1114 {
1115         struct ll_inode_info *lli = ll_i2info(inode);
1116
1117         lli->lli_contention_time = cfs_time_current();
1118         set_bit(LLI_F_CONTENDED, &lli->lli_flags);
1119 }
1120
1121 void ll_clear_file_contended(struct inode *inode)
1122 {
1123         struct ll_inode_info *lli = ll_i2info(inode);
1124
1125         clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
1126 }
1127
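/* Decide whether this I/O should use server-side (lockless) locking:
 * O_DIRECT I/O, an explicit LL_FILE_LOCKLESS_IO flag, or a recent
 * contention event all steer us away from client-side extent locks,
 * provided the OSTs advertise OBD_CONNECT_SRVLOCK. */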
1128 static int ll_is_file_contended(struct file *file)
1129 {
1130         struct inode *inode = file->f_dentry->d_inode;
1131         struct ll_inode_info *lli = ll_i2info(inode);
1132         struct ll_sb_info *sbi = ll_i2sbi(inode);
1133         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1134         ENTRY;
1135
1136         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1137                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1138                        " osc connect flags = 0x"LPX64"\n",
1139                        sbi->ll_lco.lco_flags);
1140                 RETURN(0);
1141         }
1142
1143         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1144                 RETURN(0);
1145
1146         /* server-side locking for dio unless LL_FILE_LOCKED_DIRECTIO */
1147         if ((file->f_flags & O_DIRECT) &&
1148             !(fd && (fd->fd_flags & LL_FILE_LOCKED_DIRECTIO)))
1149                 RETURN(1);
1150
1151         /* server-side locking for cached I/O with LL_FILE_LOCKLESS_IO */
1152         if (!(file->f_flags & O_DIRECT) &&
1153             fd && fd->fd_flags & LL_FILE_LOCKLESS_IO)
1154                 RETURN(1);
1155
1156         if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
1157                 cfs_time_t cur_time = cfs_time_current();
1158                 cfs_time_t retry_time;
1159
1160                 retry_time = cfs_time_add(
1161                         lli->lli_contention_time,
1162                         cfs_time_seconds(sbi->ll_contention_time));
1163                 if (cfs_time_after(cur_time, retry_time)) {
1164                         ll_clear_file_contended(inode);
1165                         RETURN(0);
1166                 }
1167                 RETURN(1);
1168         }
1169         RETURN(0);
1170 }
1171
1172 static int ll_file_get_tree_lock_iov(struct ll_lock_tree *tree,
1173                                      struct file *file, const struct iovec *iov,
1174                                      unsigned long nr_segs,
1175                                      obd_off start, obd_off end, int rw)
1176 {
1177         int append;
1178         int tree_locked = 0;
1179         int rc;
1180         struct inode * inode = file->f_dentry->d_inode;
1181         ENTRY;
1182
1183         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1184
1185         if (append || !ll_is_file_contended(file)) {
1186                 struct ll_lock_tree_node *node;
1187                 int ast_flags;
1188
1189                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1190                 if (file->f_flags & O_NONBLOCK)
1191                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1192                 node = ll_node_from_inode(inode, start, end,
1193                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1194                 if (IS_ERR(node)) {
1195                         rc = PTR_ERR(node);
1196                         GOTO(out, rc);
1197                 }
1198                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1199                 rc = ll_tree_lock_iov(tree, node, iov, nr_segs, ast_flags);
1200                 if (rc == 0)
1201                         tree_locked = 1;
1202                 else if (rc == -EUSERS)
1203                         ll_set_file_contended(inode);
1204                 else
1205                         GOTO(out, rc);
1206         }
1207         RETURN(tree_locked);
1208 out:
1209         return rc;
1210 }
1211
1212 /* XXX: exact copy from kernel code (__generic_file_aio_write_nolock from rhel4)
1213  */
1214 static size_t ll_file_get_iov_count(const struct iovec *iov,
1215                                      unsigned long *nr_segs)
1216 {
1217         size_t count = 0;
1218         unsigned long seg;
1219
1220         for (seg = 0; seg < *nr_segs; seg++) {
1221                 const struct iovec *iv = &iov[seg];
1222
1223                 /*
1224                  * If any segment has a negative length, or the cumulative
1225                  * length ever wraps negative then return -EINVAL.
1226                  */
1227                 count += iv->iov_len;
1228                 if (unlikely((ssize_t)(count|iv->iov_len) < 0))
1229                         return -EINVAL;
1230                 if (access_ok(VERIFY_WRITE, iv->iov_base, iv->iov_len))
1231                         continue;
1232                 if (seg == 0)
1233                         return -EFAULT;
1234                 *nr_segs = seg;
1235                 count -= iv->iov_len;   /* This segment is no good */
1236                 break;
1237         }
1238         return count;
1239 }
1240
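/* Build a truncated copy of the remaining iovec covering at most @size bytes
 * starting @*offset into the first segment, and advance the caller's iovec
 * pointer and segment count past the fully consumed segments. */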
1241 static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
1242                            unsigned long *nrsegs_copy,
1243                            struct iovec *iov_copy, size_t *offset,
1244                            size_t size)
1245 {
1246         int i;
1247         const struct iovec *iov = *iov_out;
1248         for (i = 0; i < *nr_segs; i++) {
1250                 const struct iovec *iv = &iov[i];
1251                 struct iovec *ivc = &iov_copy[i];
1252                 *ivc = *iv;
1253                 if (i == 0) {
1254                         ivc->iov_len -= *offset;
1255                         ivc->iov_base += *offset;
1256                 }
1257                 if (ivc->iov_len >= size) {
1258                         ivc->iov_len = size;
1259                         if (i == 0)
1260                                 *offset += size;
1261                         else
1262                                 *offset = size;
1263                         break;
1264                 }
1265                 size -= ivc->iov_len;
1266         }
1267         *iov_out += i;
1268         *nr_segs -= i;
1269         *nrsegs_copy = i + 1;
1270
1271         return 0;
1272 }
1273
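/* Try to piggy-back on a lock the OSC already holds for this page (a "fast"
 * lock obtained via obd_get_lock() with OBD_FAST_LOCK) instead of enqueueing
 * a new DLM lock; returns nonzero and fills @lockh on success. */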
1274 static int ll_get_short_lock(struct page *page, int rw, obd_off start,
1275                              obd_off end, struct lustre_handle *lockh)
1276 {
1277         struct ll_async_page *llap;
1278         struct obd_export *exp;
1279         struct inode *inode = page->mapping->host;
1280
1281         ENTRY;
1282
1283         exp = ll_i2obdexp(inode);
1284         if (exp == NULL)
1285                 RETURN(0);
1286
1287         llap = llap_cast_private(page);
1288         if (llap == NULL)
1289                 RETURN(0);
1290
1291         RETURN(obd_get_lock(exp, ll_i2info(inode)->lli_smd,
1292                             &llap->llap_cookie, rw, start, end, lockh,
1293                             OBD_FAST_LOCK));
1294 }
1295
1296 static void ll_release_short_lock(struct inode *inode, obd_off end,
1297                                   struct lustre_handle *lockh, int rw)
1298 {
1299         struct obd_export *exp;
1300         int rc;
1301
1302         exp = ll_i2obdexp(inode);
1303         if (exp == NULL)
1304                 return;
1305
1306         rc = obd_cancel(exp, ll_i2info(inode)->lli_smd,
1307                         rw == OBD_BRW_READ ? LCK_PR : LCK_PW, lockh,
1308                         OBD_FAST_LOCK, end);
1309         if (rc < 0)
1310                 CERROR("unlock failed (%d)\n", rc);
1311 }
1312
1313 static inline int ll_file_get_fast_lock(struct file *file,
1314                                         obd_off ppos, obd_off end,
1315                                         const struct iovec *iov,
1316                                         unsigned long nr_segs,
1317                                         struct lustre_handle *lockh,
1318                                         int rw)
1319 {
1320         int rc = 0, seg;
1321         struct page *page;
1322
1323         ENTRY;
1324
1325         /* we would like this read request to be lockfree */
1326         for (seg = 0; seg < nr_segs; seg++) {
1327                 const struct iovec *iv = &iov[seg];
1328                 if (ll_region_mapped((unsigned long)iv->iov_base, iv->iov_len))
1329                         GOTO(out, rc);
1330         }
1331
1332         page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1333                               ppos >> CFS_PAGE_SHIFT);
1334         if (page) {
1335                 if (ll_get_short_lock(page, rw, ppos, end, lockh))
1336                         rc = 1;
1337
1338                 unlock_page(page);
1339                 page_cache_release(page);
1340         }
1341
1342 out:
1343         RETURN(rc);
1344 }
1345
1346 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1347                                          struct lustre_handle *lockh, int rw)
1348 {
1349         ll_release_short_lock(inode, end, lockh, rw);
1350 }
1351
1352 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1353                                    obd_off end, const struct iovec *iov,
1354                                    unsigned long nr_segs,
1355                                    struct lustre_handle *lockh,
1356                                    struct ll_lock_tree *tree, int rw)
1357 {
1358         int rc;
1359
1360         ENTRY;
1361
1362         if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, lockh, rw))
1363                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1364
1365         rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
1366                                        ppos, end, rw);
1367         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1368         switch (rc) {
1369         case 1:
1370                 RETURN(LL_LOCK_STYLE_TREELOCK);
1371         case 0:
1372                 RETURN(LL_LOCK_STYLE_NOLOCK);
1373         }
1374
1375         /* an error happened if we reached this point, rc = -errno here */
1376         RETURN(rc);
1377 }
1378
1379 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1380                                     enum ll_lock_style lock_style,
1381                                     struct lustre_handle *lockh,
1382                                     struct ll_lock_tree *tree, int rw)
1383
1384 {
1385         switch (lock_style) {
1386         case LL_LOCK_STYLE_TREELOCK:
1387                 ll_tree_unlock(tree);
1388                 break;
1389         case LL_LOCK_STYLE_FASTLOCK:
1390                 ll_file_put_fast_lock(inode, end, lockh, rw);
1391                 break;
1392         default:
1393                 CERROR("invalid locking style (%d)\n", lock_style);
1394         }
1395 }
1396
1397 #ifdef HAVE_FILE_READV
1398 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
1399                               unsigned long nr_segs, loff_t *ppos)
1400 {
1401 #else
1402 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1403                                 unsigned long nr_segs, loff_t pos)
1404 {
1405         struct file *file = iocb->ki_filp;
1406         loff_t *ppos = &iocb->ki_pos;
1407 #endif
1408         struct inode *inode = file->f_dentry->d_inode;
1409         struct ll_inode_info *lli = ll_i2info(inode);
1410         struct lov_stripe_md *lsm = lli->lli_smd;
1411         struct ll_sb_info *sbi = ll_i2sbi(inode);
1412         struct ll_thread_data ltd = { 0 };
1413         struct ost_lvb lvb;
1414         struct ll_ra_read bead;
1415         int ra = 0;
1416         obd_off end;
1417         ssize_t retval, chunk, sum = 0;
1418         struct iovec *iov_copy = NULL;
1419         unsigned long nrsegs_copy, nrsegs_orig = 0;
1420         size_t count, iov_offset = 0;
1421         __u64 kms;
1422         ENTRY;
1423
1424         count = ll_file_get_iov_count(iov, &nr_segs);
1425         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1426                inode->i_ino, inode->i_generation, inode, count, *ppos);
1427         /* "If nbyte is 0, read() will return 0 and have no other results."
1428          *                      -- Single Unix Spec */
1429         if (count == 0)
1430                 RETURN(0);
1431
1432         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1433
1434         if (!lsm) {
1435                 /* A read on a file with no objects should return zero-filled
1436                  * buffers up to the file size (non-zero sizes can occur with
1437                  * mknod + truncate followed by opening the file for read; this
1438                  * seems to be a common pattern over NFS). Bug 6243 */
1439                 int notzeroed;
1440                 /* Since there are no objects on the OSTs, there is nothing to
1441                  * take a lock on, so we are forced to access inode->i_size
1442                  * unguarded */
1443
1444                 /* Read beyond end of file */
1445                 if (*ppos >= i_size_read(inode))
1446                         RETURN(0);
1447
1448                 if (count > i_size_read(inode) - *ppos)
1449                         count = i_size_read(inode) - *ppos;
1450                 /* Make sure to correctly adjust the file pos pointer for
1451                  * EFAULT case */
1452                 for (nrsegs_copy = 0; nrsegs_copy < nr_segs; nrsegs_copy++) {
1453                         const struct iovec *iv = &iov[nrsegs_copy];
1454
1455                         if (count < iv->iov_len)
1456                                 chunk = count;
1457                         else
1458                                 chunk = iv->iov_len;
1459                         notzeroed = clear_user(iv->iov_base, chunk);
1460                         sum += (chunk - notzeroed);
1461                         count -= (chunk - notzeroed);
1462                         if (notzeroed || !count)
1463                                 break;
1464                 }
1465                 *ppos += sum;
1466                 if (!sum)
1467                         RETURN(-EFAULT);
1468                 RETURN(sum);
1469         }
1470
1471         ltd.ltd_magic = LTD_MAGIC;
1472         ll_td_set(&ltd);
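        /* Service the request in chunks: each pass through the loop below
         * re-initializes the per-thread data, takes a lock covering the
         * current chunk and reads it; the loop repeats while full chunks
         * keep completing and data remains. */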
1473 repeat:
1474         memset(&ltd, 0, sizeof(ltd));
1475         ltd.ltd_magic = LTD_MAGIC;
1476         if (sbi->ll_max_rw_chunk != 0 && !(file->f_flags & O_DIRECT)) {
1477                 /* first, find the end of the current stripe */
1478                 end = *ppos;
1479                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,&end);
1480
1481                 /* clamp the end if it extends beyond the request */
1482                 if (end > *ppos + count - 1)
1483                         end = *ppos + count - 1;
1484
1485                 /* and chunk shouldn't be too large even if striping is wide */
1486                 if (end - *ppos > sbi->ll_max_rw_chunk)
1487                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1488
1489                 chunk = end - *ppos + 1;
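                /* If the rest of the request fits in this chunk the caller's
                 * iovec can be used directly; otherwise build a partial copy
                 * covering just this chunk. */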
1490                 if ((count == chunk) && (iov_offset == 0)) {
1491                         if (iov_copy)
1492                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1493
1494                         iov_copy = (struct iovec *)iov;
1495                         nrsegs_copy = nr_segs;
1496                 } else {
1497                         if (!iov_copy) {
1498                                 nrsegs_orig = nr_segs;
1499                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1500                                 if (!iov_copy)
1501                                         GOTO(out, retval = -ENOMEM);
1502                         }
1503
1504                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1505                                         &iov_offset, chunk);
1506                 }
1507         } else {
1508                 end = *ppos + count - 1;
1509                 iov_copy = (struct iovec *)iov;
1510                 nrsegs_copy = nr_segs;
1511         }
1512
1513         down_read(&lli->lli_truncate_rwsem); /* Bug 18233 */
1514
1515         ltd.lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1516                                           iov_copy, nrsegs_copy,
1517                                           &ltd.u.lockh, &ltd.u.tree,
1518                                           OBD_BRW_READ);
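        /* A negative lock style is an error; LL_LOCK_STYLE_NOLOCK selects the
         * lockless direct-I/O path, which does not keep the truncate
         * semaphore held across the read. */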
1519         if (ltd.lock_style < 0 || ltd.lock_style == LL_LOCK_STYLE_NOLOCK)
1520                 up_read(&lli->lli_truncate_rwsem);
1521         if (ltd.lock_style < 0)
1522                 GOTO(out, retval = ltd.lock_style);
1523
1524         ll_inode_size_lock(inode, 1);
1525         /*
1526          * Consistency guarantees: following possibilities exist for the
1527          * relation between region being read and real file size at this
1528          * moment:
1529          *
1530          *  (A): the region is completely inside of the file;
1531          *
1532          *  (B-x): x bytes of region are inside of the file, the rest is
1533          *  outside;
1534          *
1535          *  (C): the region is completely outside of the file.
1536          *
1537          * This classification is stable under DLM lock acquired by
1538          * ll_tree_lock() above, because to change class, other client has to
1539          * take DLM lock conflicting with our lock. Also, any updates to
1540          * ->i_size by other threads on this client are serialized by
1541          * ll_inode_size_lock(). This guarantees that short reads are handled
1542          * correctly in the face of concurrent writes and truncates.
1543          */
1544         inode_init_lvb(inode, &lvb);
1545         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1546         kms = lvb.lvb_size;
1547         if (*ppos + count - 1 > kms) {
1548                 /* A glimpse is necessary to determine whether we return a
1549                  * short read (B) or some zeroes at the end of the buffer (C) */
1550                 ll_inode_size_unlock(inode, 1);
1551                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1552                 if (retval) {
1553                         if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
1554                                 ll_file_put_lock(inode, end, ltd.lock_style,
1555                                                  &ltd.u.lockh, &ltd.u.tree,
1556                                                  OBD_BRW_READ);
1557                                 up_read(&lli->lli_truncate_rwsem);
1558                         }
1559                         goto out;
1560                 } else {
1561                         /* If the target page index exceeds the end-of-file
1562                          * page index, return directly.  Do not expect the
1563                          * kernel to check this case correctly;
1564                          * linux-2.6.18-128.1.1 fails to do so. --bug 17336 */
1565                         loff_t size = i_size_read(inode);
1566                         unsigned long cur_index = *ppos >> CFS_PAGE_SHIFT;
1567
1568                         if ((size == 0 && cur_index != 0) ||
1569                             (((size - 1) >> CFS_PAGE_SHIFT) < cur_index)) {
1570                                 if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
1572                                         ll_file_put_lock(inode, end,
1573                                                          ltd.lock_style,
1574                                                          &ltd.u.lockh,
1575                                                          &ltd.u.tree,
1576                                                          OBD_BRW_READ);
1577                                         up_read(&lli->lli_truncate_rwsem);
1578                                 }
1579                                 goto out;
1580                         }
1581                 }
1582         } else {
1583                 /* region is within kms and, hence, within real file size (A).
1584                  * We need to increase i_size to cover the read region so that
1585                  * generic_file_read() will do its job, but that doesn't mean
1586                  * the kms size is _correct_, it is only the _minimum_ size.
1587                  * If someone does a stat they will get the correct size which
1588                  * will always be >= the kms value here.  b=11081 */
1589                 if (i_size_read(inode) < kms)
1590                         i_size_write(inode, kms);
1591                 ll_inode_size_unlock(inode, 1);
1592         }
1593
1594         chunk = end - *ppos + 1;
1595         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1596                inode->i_ino, chunk, *ppos, i_size_read(inode));
1597
1598         /* turn off the kernel's read-ahead */
1599         if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
1600                 struct ost_lvb *xtimes;
1601                 /* read under locks
1602                  *
1603                  * 1. update the inode's atime, since a concurrent stat
1604                  * (via ll_glimpse_size) might otherwise return a stale one
1605                  *
1606                  * 2. update the lsm so that the next stat (via
1607                  * ll_glimpse_size) gets correct values from it */
1608                 OBD_ALLOC_PTR(xtimes);
1609                 if (NULL == xtimes) {
1610                         ll_file_put_lock(inode, end, ltd.lock_style,
1611                                          &ltd.u.lockh, &ltd.u.tree,
1612                                          OBD_BRW_READ);
1613                         up_read(&lli->lli_truncate_rwsem);
1614                         GOTO(out, retval = -ENOMEM);
1615                 }
1616
1617                 lov_stripe_lock(lsm);
1618                 LTIME_S(inode->i_atime) = LTIME_S(CURRENT_TIME);
1619                 xtimes->lvb_atime = LTIME_S(inode->i_atime);
1620                 obd_update_lvb(sbi->ll_osc_exp, lsm, xtimes,
1621                                OBD_MD_FLATIME);
1622                 lov_stripe_unlock(lsm);
1623                 OBD_FREE_PTR(xtimes);
1624
1625                 file->f_ra.ra_pages = 0;
1626                 /* initialize read-ahead window once per syscall */
1627                 if (ra == 0) {
1628                         ra = 1;
1629                         ll_ra_read_init(file, &bead, *ppos, count);
1630                 }
1631
1632                 /* BUG: 5972 */
1633                 file_accessed(file);
1634 #ifdef HAVE_FILE_READV
1635                 retval = generic_file_readv(file, iov_copy, nrsegs_copy, ppos);
1636 #else
1637                 retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
1638                                                *ppos);
1639 #endif
1640                 ll_file_put_lock(inode, end, ltd.lock_style, &ltd.u.lockh,
1641                                  &ltd.u.tree, OBD_BRW_READ);
1642                 up_read(&lli->lli_truncate_rwsem);
1643         } else {
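                /* Lockless read: no DLM lock is held, so bypass the page
                 * cache with direct I/O and account the bytes separately. */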
1644                 file_accessed(file);
1645                 retval = ll_direct_IO(READ, file, iov_copy, *ppos, nr_segs, 0);
1646                 if (retval > 0) {
1647                         lprocfs_counter_add(sbi->ll_stats,
1648                                             LPROC_LL_LOCKLESS_READ,
1649                                             (long)retval);
1650                         *ppos += retval;
1651                 }
1652         }
1653         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1654         if (retval > 0) {
1655                 count -= retval;
1656                 sum += retval;
1657                 if (retval == chunk && count > 0)
1658                         goto repeat;
1659         }
1660
1661  out:
1662         ll_td_set(NULL);
1663         if (ra != 0)
1664                 ll_ra_read_ex(file, &bead);
1665         retval = (sum > 0) ? sum : retval;
1666
1667         if (iov_copy && iov_copy != iov)
1668                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1669
1670         RETURN(retval);
1671 }
1672
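/* Plain read(2) entry point: wrap the user buffer in a single-segment iovec
 * and hand it to the vectored/AIO read path above. */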
1673 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1674                             loff_t *ppos)
1675 {
1676         struct iovec local_iov = { .iov_base = (void __user *)buf,
1677                                    .iov_len = count };
1678 #ifdef HAVE_FILE_READV
1679         return ll_file_readv(file, &local_iov, 1, ppos);
1680 #else
1681         struct kiocb kiocb;
1682         ssize_t ret;
1683
1684         init_sync_kiocb(&kiocb, file);
1685         kiocb.ki_pos = *ppos;
1686         kiocb.ki_left = count;
1687
1688         ret = ll_file_aio_read(&kiocb, &local_iov, 1, kiocb.ki_pos);
1689         *ppos = kiocb.ki_pos;
1690         return ret;
1691 #endif
1692 }
1693
1694 /* local copy of iov_shorten() from the Linux kernel */
1695 static unsigned long ll_iov_shorten(struct iovec *iov,
1696                                     unsigned long nr_segs,
1697                                     size_t to)
1698 {
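        /* Walk the segments until 'to' bytes are covered, truncating the last
         * segment if necessary, and return the new segment count. */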
1699         unsigned long seg = 0;
1700         size_t len = 0;
1701
1702         while (seg < nr_segs) {
1703                 seg++;
1704                 if (len + iov->iov_len >= to) {
1705                         iov->iov_len = to - len;
1706                         break;
1707                 }
1708                 len += iov->iov_len;
1709                 iov++;
1710         }
1711         return seg;
1712 }
1713
1714 /* 2.6.22 and 2.6.27 export this as generic_segment_checks */
1715 static int ll_generic_segment_checks(const struct iovec *iov,
1716                                      unsigned long *nr_segs,
1717                                      size_t *count,
1718                                      int access_flags)
1719 {
1720         unsigned long   seg;
1721         size_t cnt = 0;
1722         for (seg = 0; seg < *nr_segs; seg++) {
1723                 const struct iovec *iv = &iov[seg];
1724
1725                 /*
1726                  * If any segment has a negative length, or the cumulative
1727                  * length ever wraps negative then return -EINVAL.
1728                  */
1729                 cnt += iv->iov_len;
1730                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
1731                         return -EINVAL;
1732                 if (access_ok(access_flags, iv->iov_base, iv->iov_len))
1733                         continue;
1734                 if (seg == 0)
1735                         return -EFAULT;
1736                 *nr_segs = seg;
1737                 cnt -= iv->iov_len;  /* This segment is no good */
1738                 break;
1739         }
1740         *count = cnt;
1741         return 0;
1742 }
1743
1744 /*
1745  * Write to a file (through the page cache).
1746  */
1747 #ifdef HAVE_FILE_WRITEV
1748 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1749                               unsigned long nr_segs, loff_t *ppos)
1750 {
1751 #else /* AIO stuff */
1752 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1753                                  unsigned long nr_segs, loff_t pos)
1754 {
1755         struct file *file = iocb->ki_filp;
1756         loff_t *ppos = &iocb->ki_pos;
1757 #endif
1758         struct inode *inode = file->f_dentry->d_inode;
1759         struct ll_sb_info *sbi = ll_i2sbi(inode);
1760         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1761         struct ll_thread_data ltd = { 0 };
1762         loff_t maxbytes = ll_file_maxbytes(inode);
1763         loff_t lock_start, lock_end, end;
1764         ssize_t retval, chunk, sum = 0;
1765         int tree_locked;
1766         struct iovec *iov_copy = NULL;
1767         unsigned long nrsegs_copy, nrsegs_orig = 0;
1768         size_t count, iov_offset = 0;
1769         int got_write_sem = 0;
1770         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1771         ENTRY;
1772
1773         count = ll_file_get_iov_count(iov, &nr_segs);
1774
1775         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1776                inode->i_ino, inode->i_generation, inode, count, *ppos);
1777
1778         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1779
1780         /* POSIX, but surprised the VFS doesn't check this already */
1781         if (count == 0)
1782                 RETURN(0);
1783
1784         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1785          * called on the file, don't fail the below assertion (bug 2388). */
1786         if (file->f_flags & O_LOV_DELAY_CREATE &&
1787             ll_i2info(inode)->lli_smd == NULL)
1788                 RETURN(-EBADF);
1789
1790         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1791
1792         /* signal(7) specifies that write(2) and writev(2) should be restarted */
1793         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
1794                 got_write_sem = 1;
1795                 if (down_interruptible(&ll_i2info(inode)->lli_write_sem))
1796                         RETURN(-ERESTARTSYS);
1797         }
1798
1799         ltd.ltd_magic = LTD_MAGIC;
1800         ll_td_set(&ltd);
1801 repeat:
1802         memset(&ltd, 0, sizeof(ltd));
1803         ltd.ltd_magic = LTD_MAGIC;
1804
1805         chunk = 0; /* just to fix gcc's warning */
1806         end = *ppos + count - 1;
1807
1808         if (file->f_flags & O_APPEND) {
1809                 lock_start = 0;
1810                 lock_end = OBD_OBJECT_EOF;
1811                 iov_copy = (struct iovec *)iov;
1812                 nrsegs_copy = nr_segs;
1813         } else if (sbi->ll_max_rw_chunk != 0 && !(file->f_flags & O_DIRECT)) {
1814                 /* first, find the end of the current stripe */
1815                 end = *ppos;
1816                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1817                                 (obd_off *)&end);
1818
1819                 /* clamp the end if it extends beyond the request */
1820                 if (end > *ppos + count - 1)
1821                         end = *ppos + count - 1;
1822
1823                 /* and chunk shouldn't be too large even if striping is wide */
1824                 if (end - *ppos > sbi->ll_max_rw_chunk)
1825                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1826                 lock_start = *ppos;
1827                 lock_end = end;
1828                 chunk = end - *ppos + 1;
1829                 if ((count == chunk) && (iov_offset == 0)) {
1830                         if (iov_copy)
1831                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1832
1833                         iov_copy = (struct iovec *)iov;
1834                         nrsegs_copy = nr_segs;
1835                 } else {
1836                         if (!iov_copy) {
1837                                 nrsegs_orig = nr_segs;
1838                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1839                                 if (!iov_copy)
1840                                         GOTO(out, retval = -ENOMEM);
1841                         }
1842                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1843                                         &iov_offset, chunk);
1844                 }
1845         } else {
1846                 lock_start = *ppos;
1847                 lock_end = end;
1848                 iov_copy = (struct iovec *)iov;
1849                 nrsegs_copy = nr_segs;
1850         }
1851
1852         tree_locked = ll_file_get_tree_lock_iov(&ltd.u.tree, file, iov_copy,
1853                                                 nrsegs_copy,
1854                                                 (obd_off)lock_start,
1855                                                 (obd_off)lock_end,
1856                                                 OBD_BRW_WRITE);
1857         if (tree_locked < 0)
1858                 GOTO(out, retval = tree_locked);
1859
1860         /* This is OK; the generic write path will overwrite this under i_sem
1861          * if it races with a local truncate, it just makes our maxbytes
1862          * checking easier.  The i_size value gets updated in ll_extent_lock()
1863          * as a consequence of the [0,EOF] extent lock we requested above. */
1864         if (file->f_flags & O_APPEND) {
1865                 *ppos = i_size_read(inode);
1866                 end = *ppos + count - 1;
1867         }
1868
1869         if (*ppos >= maxbytes) {
1870                 send_sig(SIGXFSZ, current, 0);
1871                 GOTO(out_unlock, retval = -EFBIG);
1872         }
1873         if (end > maxbytes - 1)
1874                 end = maxbytes - 1;
1875
1876         /* generic_file_write handles O_APPEND after getting i_mutex */
1877         chunk = end - *ppos + 1;
1878         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1879                inode->i_ino, chunk, *ppos);
1880         if (tree_locked) {
1881                 struct ost_lvb *xtimes;
1882                 /* write under locks
1883                  *
1884                  * 1. update the inode's mtime and ctime, since a
1885                  * concurrent stat (via ll_glimpse_size) might otherwise
1886                  * return stale ones
1887                  *
1888                  * 2. update the lsm so that the next stat (via
1889                  * ll_glimpse_size) gets correct values from it */
1890                 OBD_ALLOC_PTR(xtimes);
1891                 if (NULL == xtimes)
1892                         GOTO(out_unlock, retval = -ENOMEM);
1893
1894                 lov_stripe_lock(lsm);
1895                 LTIME_S(inode->i_mtime) = LTIME_S(CURRENT_TIME);
1896                 LTIME_S(inode->i_ctime) = LTIME_S(CURRENT_TIME);
1897                 xtimes->lvb_mtime = LTIME_S(inode->i_mtime);
1898                 xtimes->lvb_ctime = LTIME_S(inode->i_ctime);
1899                 obd_update_lvb(sbi->ll_osc_exp, lsm, xtimes,
1900                                OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1901                 lov_stripe_unlock(lsm);
1902                 OBD_FREE_PTR(xtimes);
1903
1904                 ltd.lock_style = LL_LOCK_STYLE_TREELOCK;
1905
1906 #ifdef HAVE_FILE_WRITEV
1907                 retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
1908 #else
1909                 retval = generic_file_aio_write(iocb, iov_copy, nrsegs_copy,
1910                                                 *ppos);
1911 #endif
1912         } else {
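                /* Lockless write: no DLM lock is held, so perform the segment,
                 * size and suid checks ourselves and push the data out with
                 * direct I/O, accounting the bytes separately. */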
1913                 size_t ocount, ncount;
1914
1915                 retval = ll_generic_segment_checks(iov_copy, &nrsegs_copy,
1916                                                    &ocount, VERIFY_READ);
1917                 if (retval)
1918                         GOTO(out, retval);
1919
1920                 ncount = ocount;
1921
1922                 retval = generic_write_checks(file, ppos, &ncount, 0);
1923                 if (retval)
1924                         GOTO(out, retval);
1925
1926                 if (unlikely(ocount != ncount)) {
1927                         /* we are allowed to modify the original iov too */
1928                         nrsegs_copy = ll_iov_shorten(iov_copy, nrsegs_copy,
1929                                                      ncount);
1930                         chunk = 0; /* no repetition after the short write */
1931                 }
1932
1933                 retval = ll_remove_suid(file, file->f_vfsmnt);
1934                 if (retval)
1935                         GOTO(out, retval);
1936
1937                 ll_update_time(file);
1938                 retval = ll_direct_IO(WRITE, file, iov_copy, *ppos, nr_segs, 0);
1939                 if (retval > 0) {
1940                         lprocfs_counter_add(sbi->ll_stats,
1941                                             LPROC_LL_LOCKLESS_WRITE,
1942                                             (long)retval);
1943                         *ppos += retval;
1944                 }
1945         }
1946         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1947
1948 out_unlock:
1949         if (tree_locked)
1950                 ll_tree_unlock(&ltd.u.tree);
1951
1952 out:
1953         if (retval > 0) {
1954                 count -= retval;
1955                 sum += retval;
1956                 if (retval == chunk && count > 0)
1957                         goto repeat;
1958         }
1959
1960         if (got_write_sem)
1961                 up(&ll_i2info(inode)->lli_write_sem);
1962
1963         ll_td_set(NULL);
1964         if (iov_copy && iov_copy != iov)
1965                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1966
1967         retval = (sum > 0) ? sum : retval;
1968         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1969                            retval > 0 ? retval : 0);
1970         RETURN(retval);
1971 }
1972
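/* Plain write(2) entry point: wrap the user buffer in a single-segment iovec
 * and hand it to the vectored/AIO write path above. */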
1973 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1974                              loff_t *ppos)
1975 {
1976         struct iovec local_iov = { .iov_base = (void __user *)buf,
1977                                    .iov_len = count };
1978
1979 #ifdef HAVE_FILE_WRITEV
1980         return ll_file_writev(file, &local_iov, 1, ppos);
1981 #else
1982         struct kiocb kiocb;
1983         ssize_t ret;
1984
1985         init_sync_kiocb(&kiocb, file);
1986         kiocb.ki_pos = *ppos;
1987         kiocb.ki_left = count;
1988
1989         ret = ll_file_aio_write(&kiocb, &local_iov, 1, kiocb.ki_pos);
1990         *ppos = kiocb.ki_pos;
1991
1992         return ret;
1993 #endif
1994 }
1995
1996 #ifdef HAVE_KERNEL_SENDFILE
1997 /*
1998  * Send file content (through the page cache) using the actor helper
1999  */
2000 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,
2001                                 size_t count, read_actor_t actor, void *target)
2002 {
2003         struct inode *inode = in_file->f_dentry->d_inode;
2004         struct ll_inode_info *lli = ll_i2info(inode);
2005         struct lov_stripe_md *lsm = lli->lli_smd;
2006         struct ll_lock_tree tree;
2007         struct ll_lock_tree_node *node;
2008         struct ost_lvb lvb;
2009         struct ll_ra_read bead;
2010         ssize_t rc;
2011         __u64 kms;
2012         ENTRY;
2013
2014         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
2015                inode->i_ino, inode->i_generation, inode, count, *ppos);
2016
2017         /* "If nbyte is 0, read() will return 0 and have no other results."
2018          *                      -- Single Unix Spec */
2019         if (count == 0)
2020                 RETURN(0);
2021
2022         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
2023         /* turn off the kernel's read-ahead */
2024         in_file->f_ra.ra_pages = 0;
2025
2026         /* File with no objects, nothing to lock */
2027         if (!lsm) {
2028                 rc = generic_file_sendfile(in_file, ppos, count, actor, target);
2029                 RETURN(rc);
2030         }
2031
2032         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
2033         if (IS_ERR(node))
2034                 RETURN(PTR_ERR(node));
2035
2036         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
2037         rc = ll_tree_lock(&tree, node, NULL, count,
2038                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
2039         if (rc != 0)
2040                 RETURN(rc);
2041
2042         ll_clear_file_contended(inode);
2043         ll_inode_size_lock(inode, 1);
2044         /*
2045          * Consistency guarantees: following possibilities exist for the
2046          * relation between region being read and real file size at this
2047          * moment:
2048          *
2049          *  (A): the region is completely inside of the file;
2050          *
2051          *  (B-x): x bytes of region are inside of the file, the rest is
2052          *  outside;
2053          *
2054          *  (C): the region is completely outside of the file.
2055          *
2056          * This classification is stable under DLM lock acquired by
2057          * ll_tree_lock() above, because to change class, other client has to
2058          * take DLM lock conflicting with our lock. Also, any updates to
2059          * ->i_size by other threads on this client are serialized by
2060          * ll_inode_size_lock(). This guarantees that short reads are handled
2061          * correctly in the face of concurrent writes and truncates.
2062          */
2063         inode_init_lvb(inode, &lvb);
2064         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
2065         kms = lvb.lvb_size;
2066         if (*ppos + count - 1 > kms) {
2067                 /* A glimpse is necessary to determine whether we return a
2068                  * short read (B) or some zeroes at the end of the buffer (C) */
2069                 ll_inode_size_unlock(inode, 1);
2070                 rc = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
2071                 if (rc)
2072                         goto out;
2073         } else {
2074                 /* region is within kms and, hence, within real file size (A) */
2075                 i_size_write(inode, kms);
2076                 ll_inode_size_unlock(inode, 1);
2077         }
2078
2079         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
2080                inode->i_ino, count, *ppos, i_size_read(inode));
2081
2082         ll_ra_read_init(in_file, &bead, *ppos, count);
2083         /* BUG: 5972 */
2084         file_accessed(in_file);
2085         rc = generic_file_sendfile(in_file, ppos, count, actor, target);
2086         ll_ra_read_ex(in_file, &bead);
2087
2088  out:
2089         ll_tree_unlock(&tree);
2090         RETURN(rc);
2091 }
2092 #endif
2093
2094 /* change based on
2095  * http://git.kernel.org/?p=linux/kernel/git/torvalds/linux-2.6.git;a=commit;h=f0930fffa99e7fe0a0c4b6c7d9a244dc88288c27
2096  */
2097 #ifdef HAVE_KERNEL_SPLICE_READ
2098 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
2099                                    struct pipe_inode_info *pipe, size_t count,
2100                                    unsigned int flags)
2101 {
2102         struct inode *inode = in_file->f_dentry->d_inode;
2103         struct ll_inode_info *lli = ll_i2info(inode);
2104         struct lov_stripe_md *lsm = lli->lli_smd;
2105         struct ll_lock_tree tree;
2106         struct ll_lock_tree_node *node;
2107         struct ost_lvb lvb;
2108         struct ll_ra_read bead;
2109         ssize_t rc;
2110         __u64 kms;
2111         ENTRY;
2112
2113         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
2114                inode->i_ino, inode->i_generation, inode, count, *ppos);
2115
2116         /* "If nbyte is 0, read() will return 0 and have no other results."
2117          *                      -- Single Unix Spec */
2118         if (count == 0)
2119                 RETURN(0);
2120
2121         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
2122         /* turn off the kernel's read-ahead */
2123         in_file->f_ra.ra_pages = 0;
2124
2125         /* File with no objects, nothing to lock */
2126         if (!lsm) {
2127                 rc = generic_file_splice_read(in_file, ppos, pipe, count, flags);
2128                 RETURN(rc);
2129         }
2130
2131         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
2132         if (IS_ERR(node))
2133                 RETURN(PTR_ERR(node));
2134
2135         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
2136         rc = ll_tree_lock(&tree, node, NULL, count,
2137                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
2138         if (rc != 0)
2139                 RETURN(rc);
2140
2141         ll_clear_file_contended(inode);
2142         ll_inode_size_lock(inode, 1);
2143         /*
2144          * Consistency guarantees: following possibilities exist for the
2145          * relation between region being read and real file size at this
2146          * moment:
2147          *
2148          *  (A): the region is completely inside of the file;
2149          *
2150          *  (B-x): x bytes of region are inside of the file, the rest is
2151          *  outside;
2152          *
2153          *  (C): the region is completely outside of the file.
2154          *
2155          * This classification is stable under DLM lock acquired by
2156          * ll_tree_lock() above, because to change class, other client has to
2157          * take DLM lock conflicting with our lock. Also, any updates to
2158          * ->i_size by other threads on this client are serialized by
2159          * ll_inode_size_lock(). This guarantees that short reads are handled
2160          * correctly in the face of concurrent writes and truncates.
2161          */
2162         inode_init_lvb(inode, &lvb);
2163         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
2164         kms = lvb.lvb_size;
2165         if (*ppos + count - 1 > kms) {
2166                 /* A glimpse is necessary to determine whether we return a
2167                  * short read (B) or some zeroes at the end of the buffer (C) */
2168                 ll_inode_size_unlock(inode, 1);
2169                 rc = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
2170                 if (rc)
2171                         goto out;
2172         } else {
2173                 /* region is within kms and, hence, within real file size (A) */
2174                 i_size_write(inode, kms);
2175                 ll_inode_size_unlock(inode, 1);
2176         }
2177
2178         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
2179                inode->i_ino, count, *ppos, i_size_read(inode));
2180
2181         ll_ra_read_init(in_file, &bead, *ppos, count);
2182         /* BUG: 5972 */
2183         file_accessed(in_file);
2184         rc = generic_file_splice_read(in_file, ppos, pipe, count, flags);
2185         ll_ra_read_ex(in_file, &bead);
2186
2187  out:
2188         ll_tree_unlock(&tree);
2189         RETURN(rc);
2190 }
2191 #endif
2192
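/* Recreate the OST object described by the user's ll_recreate_obj argument on
 * the requested OST index; restricted to CFS_CAP_SYS_ADMIN. */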
2193 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
2194                                unsigned long arg)
2195 {
2196         struct ll_inode_info *lli = ll_i2info(inode);
2197         struct obd_export *exp = ll_i2obdexp(inode);
2198         struct ll_recreate_obj ucreatp;
2199         struct obd_trans_info oti = { 0 };
2200         struct obdo *oa = NULL;
2201         int lsm_size;
2202         int rc = 0;
2203         struct lov_stripe_md *lsm, *lsm2;
2204         ENTRY;
2205
2206         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2207                 RETURN(-EPERM);
2208
2209         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
2210                             sizeof(struct ll_recreate_obj));
2211         if (rc) {
2212                 RETURN(-EFAULT);
2213         }
2214         OBDO_ALLOC(oa);
2215         if (oa == NULL)
2216                 RETURN(-ENOMEM);
2217
2218         down(&lli->lli_size_sem);
2219         lsm = lli->lli_smd;
2220         if (lsm == NULL)
2221                 GOTO(out, rc = -ENOENT);
2222         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
2223                    (lsm->lsm_stripe_count));
2224
2225         OBD_ALLOC(lsm2, lsm_size);
2226         if (lsm2 == NULL)
2227                 GOTO(out, rc = -ENOMEM);
2228
2229         oa->o_id = ucreatp.lrc_id;
2230         oa->o_nlink = ucreatp.lrc_ost_idx;
2231         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2232         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
2233         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2234                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2235
2236         memcpy(lsm2, lsm, lsm_size);
2237         rc = obd_create(exp, oa, &lsm2, &oti);
2238
2239         OBD_FREE(lsm2, lsm_size);
2240         GOTO(out, rc);
2241 out:
2242         up(&lli->lli_size_sem);
2243         OBDO_FREE(oa);
2244         return rc;
2245 }
2246
2247 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2248                                     int flags, struct lov_user_md *lum,
2249                                     int lum_size)
2250 {
2251         struct ll_inode_info *lli = ll_i2info(inode);
2252         struct lov_stripe_md *lsm;
2253         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2254         int rc = 0;
2255         ENTRY;
2256
2257         down(&lli->lli_size_sem);
2258         lsm = lli->lli_smd;
2259         if (lsm) {
2260                 up(&lli->lli_size_sem);
2261                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2262                        inode->i_ino);
2263                 RETURN(-EEXIST);
2264         }
2265
2266         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2267         if (rc)
2268                 GOTO(out, rc);
2269         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2270                 GOTO(out_req_free, rc = -ENOENT);
2271         rc = oit.d.lustre.it_status;
2272         if (rc < 0)
2273                 GOTO(out_req_free, rc);
2274
2275         ll_release_openhandle(file->f_dentry, &oit);
2276
2277  out:
2278         up(&lli->lli_size_sem);
2279         ll_intent_release(&oit);
2280         RETURN(rc);
2281 out_req_free:
2282         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2283         goto out;
2284 }
2285
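/* Fetch the LOV EA of the named file from the MDS via getattr-by-name.  The
 * layout (byte-swapped to host order if necessary) and the request that holds
 * it are returned through *lmmp, *lmm_size and *request. */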
2286 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
2287                              struct lov_mds_md **lmmp, int *lmm_size,
2288                              struct ptlrpc_request **request)
2289 {
2290         struct ll_sb_info *sbi = ll_i2sbi(inode);
2291         struct ll_fid  fid;
2292         struct mds_body  *body;
2293         struct lov_mds_md *lmm = NULL;
2294         struct ptlrpc_request *req = NULL;
2295         int rc, lmmsize;
2296
2297         ll_inode2fid(&fid, inode);
2298
2299         rc = ll_get_max_mdsize(sbi, &lmmsize);
2300         if (rc)
2301                 RETURN(rc);
2302
2303         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
2304                         filename, strlen(filename) + 1,
2305                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
2306                         lmmsize, &req);
2307         if (rc < 0) {
2308                 CDEBUG(D_INFO, "mdc_getattr_name failed "
2309                                 "on %s: rc %d\n", filename, rc);
2310                 GOTO(out, rc);
2311         }
2312
2313         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
2314                         sizeof(*body));
2315         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2316         /* swabbed by mdc_getattr_name */
2317         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
2318
2319         lmmsize = body->eadatasize;
2320
2321         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2322                         lmmsize == 0) {
2323                 GOTO(out, rc = -ENODATA);
2324         }
2325
2326         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
2327                         lmmsize);
2328         LASSERT(lmm != NULL);
2329         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
2330
2331         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
2332             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) &&
2333             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
2334                 GOTO(out, rc = -EPROTO);
2335         }
2336         /*
2337          * This is coming from the MDS, so is probably in
2338          * little endian.  We convert it to host endian before
2339          * passing it to userspace.
2340          */
2341         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
2342                 /* if the function is called for a directory, avoid
2343                  * swabbing non-existent lsm objects */
2344                 if ((lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) ||
2345                     (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3))) {
2346                         lustre_swab_lov_user_md((struct lov_user_md*)lmm);
2347                         if (S_ISREG(body->mode))
2348                                 lustre_swab_lov_user_md_objects(
2349                                                 (struct lov_user_md*)lmm);
2350                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
2351                         lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2352                 }
2353         }
2354
2355         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2356                 struct lov_stripe_md *lsm;
2357                 struct lov_user_md_join *lmj;
2358                 int lmj_size, i, aindex = 0;
2359
2360                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
2361                 if (rc < 0)
2362                         GOTO(out, rc = -ENOMEM);
2363                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
2364                 if (rc)
2365                         GOTO(out_free_memmd, rc);
2366
2367                 lmj_size = sizeof(struct lov_user_md_join) +
2368                         lsm->lsm_stripe_count *
2369                         sizeof(struct lov_user_ost_data_join);
2370                 OBD_ALLOC(lmj, lmj_size);
2371                 if (!lmj)
2372                         GOTO(out_free_memmd, rc = -ENOMEM);
2373
2374                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2375                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2376                         struct lov_extent *lex =
2377                                 &lsm->lsm_array->lai_ext_array[aindex];
2378
2379                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2380                                 aindex++;
2381                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2382                                         LPU64" len %d\n", aindex, i,
2383                                         lex->le_start, (int)lex->le_len);
2384                         lmj->lmm_objects[i].l_extent_start =
2385                                 lex->le_start;
2386
2387                         if ((int)lex->le_len == -1)
2388                                 lmj->lmm_objects[i].l_extent_end = -1;
2389                         else
2390                                 lmj->lmm_objects[i].l_extent_end =
2391                                         lex->le_start + lex->le_len;
2392                         lmj->lmm_objects[i].l_object_id =
2393                                 lsm->lsm_oinfo[i]->loi_id;
2394                         lmj->lmm_objects[i].l_object_gr =
2395                                 lsm->lsm_oinfo[i]->loi_gr;
2396                         lmj->lmm_objects[i].l_ost_gen =
2397                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2398                         lmj->lmm_objects[i].l_ost_idx =
2399                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2400                 }
2401                 lmm = (struct lov_mds_md *)lmj;
2402                 lmmsize = lmj_size;
2403 out_free_memmd:
2404                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
2405         }
2406 out:
2407         *lmmp = lmm;
2408         *lmm_size = lmmsize;
2409         *request = req;
2410         return rc;
2411 }
2412 static int ll_lov_setea(struct inode *inode, struct file *file,
2413                             unsigned long arg)
2414 {
2415         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2416         struct lov_user_md  *lump;
2417         int lum_size = sizeof(struct lov_user_md) +
2418                        sizeof(struct lov_user_ost_data);
2419         int rc;
2420         ENTRY;
2421
2422         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2423                 RETURN(-EPERM);
2424
2425         OBD_ALLOC(lump, lum_size);
2426         if (lump == NULL) {
2427                 RETURN(-ENOMEM);
2428         }
2429         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2430         if (rc) {
2431                 OBD_FREE(lump, lum_size);
2432                 RETURN(-EFAULT);
2433         }
2434
2435         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2436
2437         OBD_FREE(lump, lum_size);
2438         RETURN(rc);
2439 }
2440
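/* Set the file's striping: copy the lov_user_md (v1 or v3) from user space,
 * create the layout with it, then copy the resulting layout back to the user
 * via LL_IOC_LOV_GETSTRIPE. */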
2441 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2442                             unsigned long arg)
2443 {
2444         struct lov_user_md_v3 lumv3;
2445         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
2446         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
2447         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
2448         int lum_size;
2449         int rc;
2450         int flags = FMODE_WRITE;
2451         ENTRY;
2452
2453         /* first try with v1 which is smaller than v3 */
2454         lum_size = sizeof(struct lov_user_md_v1);
2455         rc = copy_from_user(lumv1, lumv1p, lum_size);
2456         if (rc)
2457                 RETURN(-EFAULT);
2458
2459         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
2460                 lum_size = sizeof(struct lov_user_md_v3);
2461                 rc = copy_from_user(&lumv3, lumv3p, lum_size);
2462                 if (rc)
2463                         RETURN(-EFAULT);
2464         }
2465
2466         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
2467         if (rc == 0) {
2468                  put_user(0, &lumv1p->lmm_stripe_count);
2469                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
2470                                     0, ll_i2info(inode)->lli_smd,
2471                                     (void *)arg);
2472         }
2473         RETURN(rc);
2474 }
2475
2476 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2477 {
2478         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2479
2480         if (!lsm)
2481                 RETURN(-ENODATA);
2482
2483         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
2484                             (void *)arg);
2485 }
2486
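/* Take a GROUP-mode extent lock over the whole file for the group id given in
 * arg and remember it in the file descriptor; returns -EINVAL if this
 * descriptor already holds a group lock. */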
2487 static int ll_get_grouplock(struct inode *inode, struct file *file,
2488                             unsigned long arg)
2489 {
2490         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2491         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2492                                                     .end = OBD_OBJECT_EOF}};
2493         struct lustre_handle lockh = { 0 };
2494         struct ll_inode_info *lli = ll_i2info(inode);
2495         struct lov_stripe_md *lsm = lli->lli_smd;
2496         int flags = 0, rc;
2497         ENTRY;
2498
2499         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2500                 RETURN(-EINVAL);
2501         }
2502
2503         policy.l_extent.gid = arg;
2504         if (file->f_flags & O_NONBLOCK)
2505                 flags = LDLM_FL_BLOCK_NOWAIT;
2506
2507         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2508         if (rc)
2509                 RETURN(rc);
2510
2511         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2512         fd->fd_gid = arg;
2513         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2514
2515         RETURN(0);
2516 }
2517
2518 static int ll_put_grouplock(struct inode *inode, struct file *file,
2519                             unsigned long arg)
2520 {
2521         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2522         struct ll_inode_info *lli = ll_i2info(inode);
2523         struct lov_stripe_md *lsm = lli->lli_smd;
2524         int rc;
2525         ENTRY;
2526
2527         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2528                 /* Ugh, it's already unlocked. */
2529                 RETURN(-EINVAL);
2530         }
2531
2532         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2533                 RETURN(-EINVAL);
2534
2535         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2536
2537         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2538         if (rc)
2539                 RETURN(rc);
2540
2541         fd->fd_gid = 0;
2542         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2543
2544         RETURN(0);
2545 }
2546
2547 #if LUSTRE_FIX >= 50
2548 static int join_sanity_check(struct inode *head, struct inode *tail)
2549 {
2550         ENTRY;
2551         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2552                 CERROR("server does not support join\n");
2553                 RETURN(-EINVAL);
2554         }
2555         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2556                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2557                        head->i_ino, tail->i_ino);
2558                 RETURN(-EINVAL);
2559         }
2560         if (head->i_ino == tail->i_ino) {
2561                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2562                 RETURN(-EINVAL);
2563         }
2564         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2565                 CERROR("hsize %llu must be a multiple of 64K\n", i_size_read(head));
2566                 RETURN(-EINVAL);
2567         }
2568         RETURN(0);
2569 }
2570
2571 static int join_file(struct inode *head_inode, struct file *head_filp,
2572                      struct file *tail_filp)
2573 {
2574         struct dentry *tail_dentry = tail_filp->f_dentry;
2575         struct lookup_intent oit = {.it_op = IT_OPEN,
2576                                     .it_flags = head_filp->f_flags,
2577                                     .it_create_mode = M_JOIN_FILE};
2578         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_PW,
2579                 ll_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL };
2580
2581         struct lustre_handle lockh;
2582         struct mdc_op_data *op_data;
2583         int    rc;
2584         loff_t data;
2585         ENTRY;
2586
2587         tail_dentry = tail_filp->f_dentry;
2588
2589         OBD_ALLOC_PTR(op_data);
2590         if (op_data == NULL) {
2591                 RETURN(-ENOMEM);
2592         }
2593
2594         data = i_size_read(head_inode);
2595         ll_prepare_mdc_op_data(op_data, head_inode,
2596                                tail_dentry->d_parent->d_inode,
2597                                tail_dentry->d_name.name,
2598                                tail_dentry->d_name.len, 0, &data);
2599         rc = mdc_enqueue(ll_i2mdcexp(head_inode), &einfo, &oit,
2600                          op_data, &lockh, NULL, 0, 0);
2601
2602         if (rc < 0)
2603                 GOTO(out, rc);
2604
2605         rc = oit.d.lustre.it_status;
2606
2607         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2608                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2609                 ptlrpc_req_finished((struct ptlrpc_request *)
2610                                     oit.d.lustre.it_data);
2611                 GOTO(out, rc);
2612         }
2613
2614         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2615                                            * away */
2616                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2617                 oit.d.lustre.it_lock_mode = 0;
2618         }
2619         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2620         it_clear_disposition(&oit, DISP_ENQ_COMPLETE);
2621         ll_release_openhandle(head_filp->f_dentry, &oit);
2622 out:
2623         if (op_data)
2624                 OBD_FREE_PTR(op_data);
2625         ll_intent_release(&oit);
2626         RETURN(rc);
2627 }
2628
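/* Join the file named by filename_tail onto the file whose inode is head:
 * both inodes are extent-locked over [0, EOF] in a fixed inode-number order,
 * the sanity checks are run, and the MDS is asked to perform the join. */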
2629 static int ll_file_join(struct inode *head, struct file *filp,
2630                         char *filename_tail)
2631 {
2632         struct inode *tail = NULL, *first = NULL, *second = NULL;
2633         struct dentry *tail_dentry;
2634         struct file *tail_filp, *first_filp, *second_filp;
2635         struct ll_lock_tree first_tree, second_tree;
2636         struct ll_lock_tree_node *first_node, *second_node;
2637         struct ll_inode_info *hlli = ll_i2info(head);
2638         int rc = 0, cleanup_phase = 0;
2639         ENTRY;
2640
2641         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2642                head->i_ino, head->i_generation, head, filename_tail);
2643
2644         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2645         if (IS_ERR(tail_filp)) {
2646                 CERROR("Cannot open tail file %s\n", filename_tail);
2647                 rc = PTR_ERR(tail_filp);
2648                 GOTO(cleanup, rc);
2649         }
2650         tail = igrab(tail_filp->f_dentry->d_inode);
2651
2652         tail_dentry = tail_filp->f_dentry;
2653         LASSERT(tail_dentry);
2654         cleanup_phase = 1;
2655
2656         /* reorder the inodes to establish a consistent locking order */
2657         first = head->i_ino > tail->i_ino ? head : tail;
2658         second = head->i_ino > tail->i_ino ? tail : head;
2659         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2660         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2661
2662         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu\n",
2663                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2664         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2665         if (IS_ERR(first_node)){
2666                 rc = PTR_ERR(first_node);
2667                 GOTO(cleanup, rc);
2668         }
2669         first_tree.lt_fd = first_filp->private_data;
2670         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2671         if (rc != 0)
2672                 GOTO(cleanup, rc);
2673         cleanup_phase = 2;
2674
2675         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2676         if (IS_ERR(second_node)){
2677                 rc = PTR_ERR(second_node);
2678                 GOTO(cleanup, rc);
2679         }
2680         second_tree.lt_fd = second_filp->private_data;
2681         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2682         if (rc != 0)
2683                 GOTO(cleanup, rc);
2684         cleanup_phase = 3;
2685
2686         rc = join_sanity_check(head, tail);
2687         if (rc)
2688                 GOTO(cleanup, rc);
2689
2690         rc = join_file(head, filp, tail_filp);
2691         if (rc)
2692                 GOTO(cleanup, rc);
2693 cleanup:
2694         switch (cleanup_phase) {
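        /* Cases fall through intentionally: reaching phase N undoes the work
         * of phases N..1. */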
2695         case 3:
2696                 ll_tree_unlock(&second_tree);
2697                 obd_cancel_unused(ll_i2obdexp(second),
2698                                   ll_i2info(second)->lli_smd, 0, NULL);
2699         case 2:
2700                 ll_tree_unlock(&first_tree);
2701                 obd_cancel_unused(ll_i2obdexp(first),
2702                                   ll_i2info(first)->lli_smd, 0, NULL);
2703         case 1:
2704                 filp_close(tail_filp, 0);
2705                 if (tail)
2706                         iput(tail);
2707                 if (head && rc == 0) {
2708                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2709                                        &hlli->lli_smd);
2710                         hlli->lli_smd = NULL;
2711                 }
2712         case 0:
2713                 break;
2714         default:
2715                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2716                 LBUG();
2717         }
2718         RETURN(rc);
2719 }
2720 #endif  /* LUSTRE_FIX >= 50 */
2721
2722 /**
2723  * Close inode open handle
2724  *
2725  * \param dentry [in]     dentry which contains the inode
2726  * \param it     [in,out] intent which contains open info and result
2727  *
2728  * \retval 0     success
2729  * \retval <0    failure
2730  */
2731 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2732 {
2733         struct inode *inode = dentry->d_inode;
2734         struct obd_client_handle *och;
2735         int rc;
2736         ENTRY;
2737
2738         LASSERT(inode);
2739
2740         /* Root ? Do nothing. */
2741         if (dentry->d_inode->i_sb->s_root == dentry)
2742                 RETURN(0);
2743
2744         /* No open handle to close? Move away */
2745         if (!it_disposition(it, DISP_OPEN_OPEN))
2746                 RETURN(0);
2747
2748         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2749
2750         OBD_ALLOC(och, sizeof(*och));
2751         if (!och)
2752                 GOTO(out, rc = -ENOMEM);
2753
2754         ll_och_fill(ll_i2info(inode), it, och);
2755
2756         rc = ll_close_inode_openhandle(inode, och);
2757
2758         OBD_FREE(och, sizeof(*och));
2759  out:
2760         /* this one is in place of ll_file_open */
2761         if (it_disposition(it, DISP_ENQ_OPEN_REF))
2762                 ptlrpc_req_finished(it->d.lustre.it_data);
2763         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2764         RETURN(rc);
2765 }
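/*
 * Usage sketch (illustrative only): a caller that issued an open intent just
 * to create or fetch metadata, and never instantiated a struct file, is
 * expected to pass the intent to ll_release_openhandle() so the MDS open
 * handle is closed again.  The surrounding intent handling here is an
 * assumption for illustration, not copied from a specific caller:
 *
 *      struct lookup_intent it = { .it_op = IT_OPEN };
 *      int rc;
 *
 *      ... enqueue the open intent, check it_open_error(DISP_OPEN_OPEN, &it) ...
 *      rc = ll_release_openhandle(dentry, &it);
 *      ll_intent_release(&it);
 */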
2766
2767 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2768               int num_bytes)
2769 {
2770         struct obd_export *exp = ll_i2obdexp(inode);
2771         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2772         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2773         int vallen = num_bytes;
2774         int rc;
2775         ENTRY;
2776
2777         /* If the stripe_count > 1 and the application does not understand
2778          * the DEVICE_ORDER flag, it cannot interpret the extents correctly.
2779          */
2780         if (lsm->lsm_stripe_count > 1 &&
2781             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2782                 return -EOPNOTSUPP;
2783
2784         fm_key.oa.o_id = lsm->lsm_object_id;
2785         fm_key.oa.o_valid = OBD_MD_FLID;
2786
2787         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLSIZE);
2788
2789         /* If the file size is 0, there are no objects to map */
2790         if (fm_key.oa.o_size == 0) {
2791                 fiemap->fm_mapped_extents = 0;
2792                 RETURN(0);
2793         }
2794
2795         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2796
2797         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2798         if (rc)
2799                 CERROR("obd_get_info failed: rc = %d\n", rc);
2800
2801         RETURN(rc);
2802 }
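/*
 * Buffer sizing sketch for ll_do_fiemap() (illustrative only; it mirrors the
 * FSFILT_IOC_FIEMAP handler below, with hypothetical local names and extent
 * count, and with error checks omitted):
 *
 *      unsigned int count = 64;          hypothetical number of extent slots
 *      size_t num_bytes = sizeof(struct ll_user_fiemap) +
 *                         count * sizeof(struct ll_fiemap_extent);
 *      struct ll_user_fiemap *fiemap;
 *      int rc;
 *
 *      OBD_VMALLOC(fiemap, num_bytes);
 *      fiemap->fm_start = 0;
 *      fiemap->fm_length = ~0ULL;        map up to EOF
 *      fiemap->fm_extent_count = count;
 *      fiemap->fm_flags = FIEMAP_FLAG_DEVICE_ORDER;   required when the file
 *                                                     is striped over more
 *                                                     than one OST
 *      rc = ll_do_fiemap(inode, fiemap, num_bytes);
 *      OBD_VFREE(fiemap, num_bytes);
 */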
2803
2804 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2805                   unsigned long arg)
2806 {
2807         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2808         int flags;
2809         ENTRY;
2810
2811         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2812                inode->i_generation, inode, cmd);
2813         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2814
2815         /* asm-ppc{,64} declares TCGETS et al. as type 't', not 'T' */
2816         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2817                 RETURN(-ENOTTY);
2818
2819         switch(cmd) {
2820         case LL_IOC_GETFLAGS:
2821                 /* Get the current value of the file flags */
2822                 return put_user(fd->fd_flags, (int *)arg);
2823         case LL_IOC_SETFLAGS:
2824         case LL_IOC_CLRFLAGS:
2825                 /* Set or clear specific file flags */
2826                 /* XXX This probably needs checks to ensure the flags are
2827                  *     not abused, and to handle any flag side effects.
2828                  */
2829                 if (get_user(flags, (int *) arg))
2830                         RETURN(-EFAULT);
2831
2832                 if (cmd == LL_IOC_SETFLAGS) {
2833                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2834                             !(file->f_flags & O_DIRECT)) {
2835                                 CERROR("%s: unable to disable locking on "
2836                                        "non-O_DIRECT file\n", current->comm);
2837                                 RETURN(-EINVAL);
2838                         }
2839
2840                         fd->fd_flags |= flags;
2841                 } else {
2842                         fd->fd_flags &= ~flags;
2843                 }
2844                 RETURN(0);
2845         case LL_IOC_LOV_SETSTRIPE:
2846                 RETURN(ll_lov_setstripe(inode, file, arg));
2847         case LL_IOC_LOV_SETEA:
2848                 RETURN(ll_lov_setea(inode, file, arg));
2849         case LL_IOC_LOV_GETSTRIPE:
2850                 RETURN(ll_lov_getstripe(inode, arg));
2851         case LL_IOC_RECREATE_OBJ:
2852                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2853         case FSFILT_IOC_FIEMAP: {
2854                 struct ll_user_fiemap *fiemap_s;
2855                 size_t num_bytes, ret_bytes;
2856                 unsigned int extent_count;
2857                 int rc = 0;
2858
2859                 /* Get the extent count so we can calculate the size of
2860                  * the required fiemap buffer */
2861                 if (get_user(extent_count,
2862                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2863                         RETURN(-EFAULT);
2864                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2865                                                  sizeof(struct ll_fiemap_extent));
2866                 OBD_VMALLOC(fiemap_s, num_bytes);
2867                 if (fiemap_s == NULL)
2868                         RETURN(-ENOMEM);
2869
2870                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2871                                    sizeof(*fiemap_s)))
2872                         GOTO(error, rc = -EFAULT);
2873
2874                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2875                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2876                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2877                         if (copy_to_user((char *)arg, fiemap_s,
2878                                          sizeof(*fiemap_s)))
2879                                 GOTO(error, rc = -EFAULT);
2880
2881                         GOTO(error, rc = -EBADR);
2882                 }
2883
2884                 /* If fm_extent_count is non-zero, read the first extent since
2885                  * it is used to calculate end_offset and device from previous
2886                  * fiemap call. */
2887                 if (extent_count) {
2888                         if (copy_from_user(&fiemap_s->fm_extents[0],
2889                             (char __user *)arg + sizeof(*fiemap_s),
2890                             sizeof(struct ll_fiemap_extent)))
2891                                 GOTO(error, rc = -EFAULT);
2892                 }
2893
2894                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2895                         int rc;
2896
2897                         rc = filemap_fdatawrite(inode->i_mapping);
2898                         if (rc)
2899                                 GOTO(error, rc);
2900                 }
2901
2902                 rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
2903                 if (rc)
2904                         GOTO(error, rc);
2905
2906                 ret_bytes = sizeof(struct ll_user_fiemap);
2907
2908                 if (extent_count != 0)
2909                         ret_bytes += (fiemap_s->fm_mapped_extents *
2910                                          sizeof(struct ll_fiemap_extent));
2911
2912                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2913                         rc = -EFAULT;
2914
2915 error:
2916                 OBD_VFREE(fiemap_s, num_bytes);
2917                 RETURN(rc);
2918         }
2919         case FSFILT_IOC_GETFLAGS:
2920         case FSFILT_IOC_SETFLAGS:
2921                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2922         case FSFILT_IOC_GETVERSION_OLD:
2923         case FSFILT_IOC_GETVERSION:
2924                 RETURN(put_user(inode->i_generation, (int *)arg));
2925         case LL_IOC_JOIN: {
2926 #if LUSTRE_FIX >= 50
2927                 /* Allow file join in beta builds for debugging */
2928                 char *ftail;
2929                 int rc;
2930
2931                 ftail = getname((const char *)arg);
2932                 if (IS_ERR(ftail))
2933                         RETURN(PTR_ERR(ftail));
2934                 rc = ll_file_join(inode, file, ftail);
2935                 putname(ftail);
2936                 RETURN(rc);
2937 #else
2938                 CWARN("file join is not supported in this version of Lustre\n");
2939                 RETURN(-ENOTTY);
2940 #endif
2941         }
2942         case LL_IOC_GROUP_LOCK:
2943                 RETURN(ll_get_grouplock(inode, file, arg));
2944         case LL_IOC_GROUP_UNLOCK:
2945                 RETURN(ll_put_grouplock(inode, file, arg));
2946         case IOC_OBD_STATFS:
2947                 RETURN(ll_obd_statfs(inode, (void *)arg));
2948         case OBD_IOC_GETNAME_OLD:
2949         case OBD_IOC_GETNAME: {
2950                 struct obd_device *obd =
2951                         class_exp2obd(ll_i2sbi(inode)->ll_osc_exp);
2952                 if (!obd)
2953                         RETURN(-EFAULT);
2954                 if (copy_to_user((void *)arg, obd->obd_name,
2955                                 strlen(obd->obd_name) + 1))
2956                         RETURN (-EFAULT);
2957                 RETURN(0);
2958         }
2959         case LL_IOC_PATH2FID: {
2960                 if (copy_to_user((void *)arg, ll_inode_lu_fid(inode),
2961                                  sizeof(struct lu_fid)))
2962                         RETURN(-EFAULT);
2963
2964                 RETURN(0);
2965         }
2966
2967         /* We need to special case any other ioctls we want to handle,
2968          * to send them to the MDS/OST as appropriate and to properly
2969          * network encode the arg field.
2970         case EXT3_IOC_SETVERSION_OLD:
2971         case EXT3_IOC_SETVERSION:
2972         */
2973         default: {
2974                 int err;
2975
2976                 if (LLIOC_STOP ==
2977                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2978                         RETURN(err);
2979
2980                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2981                                      (void *)arg));
2982         }
2983         }
2984 }
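/*
 * User-space sketch of the LL_IOC_SETFLAGS case above (illustrative only;
 * LL_FILE_IGNORE_LOCK may only be set on a file opened with O_DIRECT, and
 * the ioctl argument is a pointer to the flag word):
 *
 *      int fd = open(path, O_RDWR | O_DIRECT);
 *      int flags = LL_FILE_IGNORE_LOCK;
 *
 *      if (ioctl(fd, LL_IOC_SETFLAGS, &flags) < 0)
 *              perror("LL_IOC_SETFLAGS");
 */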
2985
2986 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2987 {
2988         struct inode *inode = file->f_dentry->d_inode;
2989         struct ll_inode_info *lli = ll_i2info(inode);
2990         struct lov_stripe_md *lsm = lli->lli_smd;
2991         loff_t retval;
2992         ENTRY;
2993         retval = offset + ((origin == 2) ? i_size_read(inode) :
2994                            (origin == 1) ? file->f_pos : 0);
2995         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2996                inode->i_ino, inode->i_generation, inode, retval, retval,
2997                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2998         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2999
3000         if (origin == 2) { /* SEEK_END */
3001                 int nonblock = 0, rc;
3002
3003                 if (file->f_flags & O_NONBLOCK)
3004                         nonblock = LDLM_FL_BLOCK_NOWAIT;
3005
3006                 if (lsm != NULL) {
3007                         rc = ll_glimpse_size(inode, nonblock);
3008                         if (rc != 0)
3009                                 RETURN(rc);
3010                 }
3011
3012                 ll_inode_size_lock(inode, 0);
3013                 offset += i_size_read(inode);
3014                 ll_inode_size_unlock(inode, 0);
3015         } else if (origin == 1) { /* SEEK_CUR */
3016                 offset += file->f_pos;
3017         }
3018
3019         retval = -EINVAL;
3020         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
3021                 if (offset != file->f_pos) {
3022                         file->f_pos = offset;
3023                         file->f_version = 0;
3024                 }
3025                 retval = offset;
3026         }
3027
3028         RETURN(retval);
3029 }
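/*
 * User-space view of the SEEK_END path above (illustrative only): because
 * ll_glimpse_size() refreshes the size under a DLM glimpse before the offset
 * is computed, the result also reflects writes made by other clients:
 *
 *      off_t end = lseek(fd, 0, SEEK_END);
 *      if (end == (off_t)-1)
 *              perror("lseek");
 */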
3030
3031 int ll_fsync(struct file *file, struct dentry *dentry, int data)
3032 {
3033         struct inode *inode = dentry->d_inode;
3034         struct ll_inode_info *lli = ll_i2info(inode);
3035         struct lov_stripe_md *lsm = lli->lli_smd;
3036         struct ll_fid fid;
3037         struct ptlrpc_request *req;
3038         int rc, err;
3039         ENTRY;
3040         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
3041                inode->i_generation, inode);
3042         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
3043
3044         /* fsync's caller has already called _fdata{sync,write}; we want
3045          * that IO to finish before calling the osc and mdc sync methods */
3046         rc = filemap_fdatawait(inode->i_mapping);
3047
3048         /* catch async errors that were recorded back when async writeback
3049          * failed for pages in this mapping. */
3050         err = lli->lli_async_rc;
3051         lli->lli_async_rc = 0;
3052         if (rc == 0)
3053                 rc = err;
3054         if (lsm) {
3055                 err = lov_test_and_clear_async_rc(lsm);
3056                 if (rc == 0)
3057                         rc = err;
3058         }
3059
3060         ll_inode2fid(&fid, inode);
3061         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
3062         if (!rc)
3063                 rc = err;
3064         if (!err)
3065                 ptlrpc_req_finished(req);
3066
3067         if (data && lsm) {
3068                 struct obd_info *oinfo;
3069
3070                 OBD_ALLOC_PTR(oinfo);
3071                 if (!oinfo)
3072                         RETURN(rc ? rc : -ENOMEM);
3073                 OBDO_ALLOC(oinfo->oi_oa);
3074                 if (!oinfo->oi_oa) {
3075                         OBD_FREE_PTR(oinfo);
3076                         RETURN(rc ? rc : -ENOMEM);
3077                 }
3078                 oinfo->oi_oa->o_id = lsm->lsm_object_id;
3079                 oinfo->oi_oa->o_gr = lsm->lsm_object_gr;
3080                 oinfo->oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
3081                 obdo_from_inode(oinfo->oi_oa, inode,
3082                                 OBD_MD_FLTYPE | OBD_MD_FLATIME |
3083                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
3084                 oinfo->oi_md = lsm;
3085                 err = obd_sync_rqset(ll_i2sbi(inode)->ll_osc_exp, oinfo,
3086                                      0, OBD_OBJECT_EOF);
3087                 if (!rc)
3088                         rc = err;
3089                 OBDO_FREE(oinfo->oi_oa);
3090                 OBD_FREE_PTR(oinfo);
3091         }
3092
3093         RETURN(rc);
3094 }
3095
3096 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
3097 {
3098         struct inode *inode = file->f_dentry->d_inode;
3099         struct ll_sb_info *sbi = ll_i2sbi(inode);
3100         struct lu_fid *fid = ll_inode_lu_fid(inode);
3101         struct ldlm_res_id res_id =
3102                     { .name = { fid_seq(fid),
3103                                 fid_oid(fid),
3104                                 fid_ver(fid),
3105                                 LDLM_FLOCK} };
3106         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
3107                 ldlm_flock_completion_ast, NULL, file_lock };
3108         struct lustre_handle lockh = {0};
3109         ldlm_policy_data_t flock;
3110         int flags = 0;
3111         int rc;
3112         ENTRY;
3113
3114         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
3115                inode->i_ino, file_lock);
3116         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
3117
3118         if (fid_is_igif(fid)) {
3119                 /* If this is an IGIF inode, we need to keep the 1.6-style
3120                  * flock mapping for compatibility.  If it is a proper FID
3121                  * then we know any other client accessing it must also be
3122                  * accessing it as a FID and can use the CMD-style flock. */
3123                 res_id.name[2] = LDLM_FLOCK;
3124                 res_id.name[3] = 0;
3125         }
3126
3127         if (file_lock->fl_flags & FL_FLOCK) {
3128                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
3129                 /* set missing params for flock() calls */
3130                 file_lock->fl_end = OFFSET_MAX;
3131                 file_lock->fl_pid = current->tgid;
3132         }
3133         flock.l_flock.pid = file_lock->fl_pid;
3134         flock.l_flock.start = file_lock->fl_start;
3135         flock.l_flock.end = file_lock->fl_end;
3136
3137         switch (file_lock->fl_type) {
3138         case F_RDLCK:
3139                 einfo.ei_mode = LCK_PR;
3140                 break;
3141         case F_UNLCK:
3142                 /* An unlock request may or may not have any relation to
3143                  * existing locks so we may not be able to pass a lock handle
3144                  * via a normal ldlm_lock_cancel() request. The request may even
3145                  * unlock a byte range in the middle of an existing lock. In
3146                  * order to process an unlock request we need all of the same
3147                  * information that is given with a normal read or write record
3148                  * lock request. To avoid creating another ldlm unlock (cancel)
3149                  * message we'll treat a LCK_NL flock request as an unlock. */
3150                 einfo.ei_mode = LCK_NL;
3151                 break;
3152         case F_WRLCK:
3153                 einfo.ei_mode = LCK_PW;
3154                 break;
3155         default:
3156                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
3157                 RETURN (-EINVAL);
3158         }
3159
3160         switch (cmd) {
3161         case F_SETLKW:
3162 #ifdef F_SETLKW64
3163         case F_SETLKW64:
3164 #endif
3165                 flags = 0;
3166                 break;
3167         case F_SETLK:
3168 #ifdef F_SETLK64
3169         case F_SETLK64:
3170 #endif
3171                 flags = LDLM_FL_BLOCK_NOWAIT;
3172                 break;
3173         case F_GETLK:
3174 #ifdef F_GETLK64
3175         case F_GETLK64:
3176 #endif
3177                 flags = LDLM_FL_TEST_LOCK;
3178                 /* Save the old mode so that if the mode in the lock changes we
3179                  * can decrement the appropriate reader or writer refcount. */
3180                 file_lock->fl_type = einfo.ei_mode;
3181                 break;
3182         default:
3183                 CERROR("unknown fcntl lock command: %d\n", cmd);
3184                 RETURN (-EINVAL);
3185         }
3186
3187         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
3188                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
3189                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
3190
3191         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, &einfo, res_id,
3192                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
3193         if ((file_lock->fl_flags & FL_FLOCK) &&
3194             (rc == 0 || file_lock->fl_type == F_UNLCK))
3195                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
3196 #ifdef HAVE_F_OP_FLOCK
3197         if ((file_lock->fl_flags & FL_POSIX) &&
3198             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
3199             !(flags & LDLM_FL_TEST_LOCK))
3200                 posix_lock_file_wait(file, file_lock);
3201 #endif
3202
3203         RETURN(rc);
3204 }
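/*
 * User-space sketch of the fcntl() mapping above (illustrative only): a
 * blocking whole-file write lock becomes an LDLM_FLOCK enqueue with mode
 * LCK_PW and flags = 0, so the client waits for the server to grant it:
 *
 *      struct flock fl = {
 *              .l_type   = F_WRLCK,
 *              .l_whence = SEEK_SET,
 *              .l_start  = 0,
 *              .l_len    = 0,            0 means "to end of file"
 *      };
 *
 *      if (fcntl(fd, F_SETLKW, &fl) < 0)
 *              perror("fcntl");
 */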
3205
3206 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
3207 {
3208         ENTRY;
3209
3210         RETURN(-ENOSYS);
3211 }
3212
3213 int ll_have_md_lock(struct inode *inode, __u64 bits)
3214 {
3215         struct lustre_handle lockh;
3216         struct ldlm_res_id res_id;
3217         struct obd_device *obddev;
3218         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
3219         int flags;
3220         ENTRY;
3221
3222         if (!inode)
3223                RETURN(0);
3224
3225         obddev = ll_i2mdcexp(inode)->exp_obd;
3226         fid_build_reg_res_name(ll_inode_lu_fid(inode), &res_id);
3227
3228         CDEBUG(D_INFO, "trying to match res "LPU64":"LPU64":"LPU64"\n",
3229                 res_id.name[0],
3230                 res_id.name[1],
3231                 res_id.name[2]);
3232
3233         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
3234         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
3235                             &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
3236                 RETURN(1);
3237         }
3238
3239         RETURN(0);
3240 }
3241
3242 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
3243         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
3244                               * and return success */
3245                 inode->i_nlink = 0;
3246                 /* This path cannot be hit for regular files except in
3247                  * the case of obscure races, so there is no need to
3248                  * validate the size. */
3249                 if (!S_ISREG(inode->i_mode) &&
3250                     !S_ISDIR(inode->i_mode))
3251                         return 0;
3252         }
3253
3254         if (rc) {
3255                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
3256                 return -abs(rc);
3257
3258         }
3259
3260         return 0;
3261 }
3262
3263 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
3264                              __u64 ibits)
3265 {
3266         struct inode *inode = dentry->d_inode;
3267         struct ptlrpc_request *req = NULL;
3268         struct obd_export *exp;
3269         int rc = 0;
3270         ENTRY;
3271
3272         if (!inode) {
3273                 CERROR("REPORT THIS LINE TO PETER\n");
3274                 RETURN(0);
3275         }
3276         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3277                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3278
3279         exp = ll_i2mdcexp(inode);
3280
3281         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
3282                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3283                 struct mdc_op_data op_data = { { 0 } };
3284
3285                 /* Call getattr by fid, so do not provide name at all. */
3286                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
3287                                        dentry->d_inode, NULL, 0, 0, NULL);
3288                 oit.it_create_mode |= M_CHECK_STALE;
3289                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
3290                                      /* we are not interested in name
3291                                         based lookup */
3292                                      &oit, 0, &req,
3293                                      ll_mdc_blocking_ast, 0);
3294                 oit.it_create_mode &= ~M_CHECK_STALE;
3295                 if (rc < 0) {
3296                         rc = ll_inode_revalidate_fini(inode, rc);
3297                         GOTO (out, rc);
3298                 }
3299
3300                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
3301                 if (rc != 0) {
3302                         ll_intent_release(&oit);
3303                         GOTO(out, rc);
3304                 }
3305
3306                 /* Unlinked? Unhash dentry, so it is not picked up later by
3307                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3308                    here to preserve get_cwd functionality on 2.6.
3309                    Bug 10503 */
3310                 if (!dentry->d_inode->i_nlink) {
3311                         spin_lock(&ll_lookup_lock);
3312                         spin_lock(&dcache_lock);
3313                         ll_drop_dentry(dentry);
3314                         spin_unlock(&dcache_lock);
3315                         spin_unlock(&ll_lookup_lock);
3316                 }
3317
3318                 ll_lookup_finish_locks(&oit, dentry);
3319         } else if (!ll_have_md_lock(dentry->d_inode, ibits)) {
3320                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3321                 struct ll_fid fid;
3322                 obd_valid valid = OBD_MD_FLGETATTR;
3323                 int ealen = 0;
3324
3325                 if (S_ISREG(inode->i_mode)) {
3326                         rc = ll_get_max_mdsize(sbi, &ealen);
3327                         if (rc)
3328                                 RETURN(rc);
3329                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3330                 }
3331                 ll_inode2fid(&fid, inode);
3332                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
3333                 if (rc) {
3334                         rc = ll_inode_revalidate_fini(inode, rc);
3335                         RETURN(rc);
3336                 }
3337
3338                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
3339                                    NULL);
3340         }
3341
3342 out:
3343         ptlrpc_req_finished(req);
3344         RETURN(rc);
3345 }
3346
3347 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
3348 {
3349         struct inode *inode = dentry->d_inode;
3350         int rc;
3351         ENTRY;
3352
3353         rc = __ll_inode_revalidate_it(dentry, it, MDS_INODELOCK_UPDATE |
3354                                                   MDS_INODELOCK_LOOKUP);
3355
3356         /* if object not yet allocated, don't validate size */
3357         if (rc == 0 && ll_i2info(inode)->lli_smd == NULL) {
3358                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
3359                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
3360                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
3361                 RETURN(0);
3362         }
3363
3364         /* ll_glimpse_size will prefer locally cached writes if they extend
3365          * the file */
3366
3367         if (rc == 0)
3368                 rc = ll_glimpse_size(inode, 0);
3369
3370         RETURN(rc);
3371 }
3372
3373 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3374                   struct lookup_intent *it, struct kstat *stat)
3375 {
3376         struct inode *inode = de->d_inode;
3377         int res = 0;
3378
3379         res = ll_inode_revalidate_it(de, it);
3380         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3381
3382         if (res)
3383                 return res;
3384
3385         stat->dev = inode->i_sb->s_dev;
3386         stat->ino = inode->i_ino;
3387         stat->mode = inode->i_mode;
3388         stat->nlink = inode->i_nlink;
3389         stat->uid = inode->i_uid;
3390         stat->gid = inode->i_gid;
3391         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3392         stat->atime = inode->i_atime;
3393         stat->mtime = inode->i_mtime;
3394         stat->ctime = inode->i_ctime;
3395 #ifdef HAVE_INODE_BLKSIZE
3396         stat->blksize = inode->i_blksize;
3397 #else
3398         stat->blksize = 1<<inode->i_blkbits;
3399 #endif
3400
3401         ll_inode_size_lock(inode, 0);
3402         stat->size = i_size_read(inode);
3403         stat->blocks = inode->i_blocks;
3404         ll_inode_size_unlock(inode, 0);
3405
3406         return 0;
3407 }
3408 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3409 {
3410         struct lookup_intent it = { .it_op = IT_GETATTR };
3411
3412         return ll_getattr_it(mnt, de, &it, stat);
3413 }
3414
3415 #ifdef HAVE_LINUX_FIEMAP_H
3416 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
3417                 __u64 start, __u64 len)
3418 {
3419         int rc;
3420         struct ll_user_fiemap *fiemap = (struct ll_user_fiemap*)(
3421                 fieinfo->fi_extents_start - sizeof(ll_user_fiemap));
3422
3423         rc = ll_do_fiemap(inode, fiemap, sizeof(*fiemap) +
3424                           fiemap->fm_extent_count *
3425                           sizeof(struct ll_fiemap_extent));
3426
3427         fieinfo->fi_flags = fiemap->fm_flags;
3428         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
3429
3430         return rc;
3431 }
3432 #endif
3433
3434
3435 static
3436 int lustre_check_acl(struct inode *inode, int mask)
3437 {
3438 #ifdef CONFIG_FS_POSIX_ACL
3439         struct ll_inode_info *lli = ll_i2info(inode);
3440         struct posix_acl *acl;
3441         int rc;
3442         ENTRY;
3443
3444         spin_lock(&lli->lli_lock);
3445         acl = posix_acl_dup(lli->lli_posix_acl);
3446         spin_unlock(&lli->lli_lock);
3447
3448         if (!acl)
3449                 RETURN(-EAGAIN);
3450
3451         rc = posix_acl_permission(inode, acl, mask);
3452         posix_acl_release(acl);
3453
3454         RETURN(rc);
3455 #else
3456         return -EAGAIN;
3457 #endif
3458 }
3459
3460 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3461 #ifndef HAVE_INODE_PERMISION_2ARGS
3462 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3463 #else
3464 int ll_inode_permission(struct inode *inode, int mask)
3465 #endif
3466 {
3467         int rc = 0;
3468         ENTRY;
3469
3470         /* The root inode is NOT validated in the lookup operation, so we
3471          * need to do it here before the permission check. */
3472
3473         if (inode == inode->i_sb->s_root->d_inode) {
3474                 struct lookup_intent it = { .it_op = IT_LOOKUP };
3475
3476                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
3477                                               MDS_INODELOCK_LOOKUP);
3478                 if (rc)
3479                         RETURN(rc);
3480         }
3481
3482         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
3483                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
3484
3485         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3486         rc = generic_permission(inode, mask, lustre_check_acl);
3487
3488         RETURN(rc);
3489 }
3490 #else
3491 #ifndef HAVE_INODE_PERMISION_2ARGS
3492 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3493 #else
3494 int ll_inode_permission(struct inode *inode, int mask)
3495 #endif
3496 {
3497         int mode = inode->i_mode;
3498         int rc;
3499
3500         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3501                inode->i_ino, inode->i_generation, inode, mask);
3502         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3503
3504         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3505             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3506                 return -EROFS;
3507         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3508                 return -EACCES;
3509         if (current->fsuid == inode->i_uid) {
3510                 mode >>= 6;
3511         } else if (1) {
3512                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3513                         goto check_groups;
3514                 rc = lustre_check_acl(inode, mask);
3515                 if (rc == -EAGAIN)
3516                         goto check_groups;
3517                 if (rc == -EACCES)
3518                         goto check_capabilities;
3519                 return rc;
3520         } else {
3521 check_groups:
3522                 if (in_group_p(inode->i_gid))
3523                         mode >>= 3;
3524         }
3525         if ((mode & mask & S_IRWXO) == mask)
3526                 return 0;
3527
3528 check_capabilities:
3529         if (!(mask & MAY_EXEC) ||
3530             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3531                 if (cfs_capable(CFS_CAP_DAC_OVERRIDE))
3532                         return 0;
3533
3534         if (cfs_capable(CFS_CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3535             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3536                 return 0;
3537
3538         return -EACCES;
3539 }
3540 #endif
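/*
 * Worked example for the open-coded check above (illustrative only): for a
 * file with mode 0640, a MAY_READ request by the owner shifts the mode right
 * by 6, giving 06, so (mode & mask & S_IRWXO) equals the mask and access is
 * granted.  A process that is only in the owning group falls back from the
 * ACL check (which returns -EAGAIN when no ACL is cached) to check_groups,
 * shifts by 3 to get 04, and is also granted MAY_READ.  Any other process
 * keeps the unshifted mode, (0640 & 04 & S_IRWXO) is 0, and the request
 * drops through to the capability checks, where it is denied unless
 * CFS_CAP_DAC_OVERRIDE (or CFS_CAP_DAC_READ_SEARCH for reads) applies.
 */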
3541
3542 /* -o localflock - only provides locally consistent flock locks */
3543 struct file_operations ll_file_operations = {
3544         .read           = ll_file_read,
3545 #ifdef HAVE_FILE_READV
3546         .readv          = ll_file_readv,
3547 #else
3548         .aio_read       = ll_file_aio_read,
3549 #endif
3550         .write          = ll_file_write,
3551 #ifdef HAVE_FILE_WRITEV
3552         .writev         = ll_file_writev,
3553 #else
3554         .aio_write      = ll_file_aio_write,
3555 #endif
3556         .ioctl          = ll_file_ioctl,
3557         .open           = ll_file_open,
3558         .release        = ll_file_release,
3559         .mmap           = ll_file_mmap,
3560         .llseek         = ll_file_seek,
3561 #ifdef HAVE_KERNEL_SPLICE_READ
3562         .splice_read    = ll_file_splice_read,
3563 #endif
3564 #ifdef HAVE_KERNEL_SENDFILE
3565         .sendfile       = ll_file_sendfile,
3566 #endif
3567         .fsync          = ll_fsync,
3568 };
3569
3570 struct file_operations ll_file_operations_flock = {
3571         .read           = ll_file_read,
3572 #ifdef HAVE_FILE_READV
3573         .readv          = ll_file_readv,
3574 #else
3575         .aio_read       = ll_file_aio_read,
3576 #endif
3577         .write          = ll_file_write,
3578 #ifdef HAVE_FILE_WRITEV
3579         .writev         = ll_file_writev,
3580 #else
3581         .aio_write      = ll_file_aio_write,
3582 #endif
3583         .ioctl          = ll_file_ioctl,
3584         .open           = ll_file_open,
3585         .release        = ll_file_release,
3586         .mmap           = ll_file_mmap,
3587         .llseek         = ll_file_seek,
3588 #ifdef HAVE_KERNEL_SPLICE_READ
3589         .splice_read    = ll_file_splice_read,
3590 #endif
3591 #ifdef HAVE_KERNEL_SENDFILE
3592         .sendfile       = ll_file_sendfile,
3593 #endif
3594         .fsync          = ll_fsync,
3595 #ifdef HAVE_F_OP_FLOCK
3596         .flock          = ll_file_flock,
3597 #endif
3598         .lock           = ll_file_flock
3599 };
3600
3601 /* These are for -o noflock - to return ENOSYS on flock calls */
3602 struct file_operations ll_file_operations_noflock = {
3603         .read           = ll_file_read,
3604 #ifdef HAVE_FILE_READV
3605         .readv          = ll_file_readv,
3606 #else
3607         .aio_read       = ll_file_aio_read,
3608 #endif
3609         .write          = ll_file_write,
3610 #ifdef HAVE_FILE_WRITEV
3611         .writev         = ll_file_writev,
3612 #else
3613         .aio_write      = ll_file_aio_write,
3614 #endif
3615         .ioctl          = ll_file_ioctl,
3616         .open           = ll_file_open,
3617         .release        = ll_file_release,
3618         .mmap           = ll_file_mmap,
3619         .llseek         = ll_file_seek,
3620 #ifdef HAVE_KERNEL_SPLICE_READ
3621         .splice_read    = ll_file_splice_read,
3622 #endif
3623 #ifdef HAVE_KERNEL_SENDFILE
3624         .sendfile       = ll_file_sendfile,
3625 #endif
3626         .fsync          = ll_fsync,
3627 #ifdef HAVE_F_OP_FLOCK
3628         .flock          = ll_file_noflock,
3629 #endif
3630         .lock           = ll_file_noflock
3631 };
3632
3633 struct inode_operations ll_file_inode_operations = {
3634 #ifdef HAVE_VFS_INTENT_PATCHES
3635         .setattr_raw    = ll_setattr_raw,
3636 #endif
3637         .setattr        = ll_setattr,
3638         .truncate       = ll_truncate,
3639         .getattr        = ll_getattr,
3640         .permission     = ll_inode_permission,
3641         .setxattr       = ll_setxattr,
3642         .getxattr       = ll_getxattr,
3643         .listxattr      = ll_listxattr,
3644         .removexattr    = ll_removexattr,
3645 #ifdef  HAVE_LINUX_FIEMAP_H
3646         .fiemap         = ll_fiemap,
3647 #endif
3648 };
3649
3650 /* dynamic ioctl number support routines */
3651 static struct llioc_ctl_data {
3652         struct rw_semaphore ioc_sem;
3653         struct list_head    ioc_head;
3654 } llioc = {
3655         __RWSEM_INITIALIZER(llioc.ioc_sem),
3656         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3657 };
3658
3659
3660 struct llioc_data {
3661         struct list_head        iocd_list;
3662         unsigned int            iocd_size;
3663         llioc_callback_t        iocd_cb;
3664         unsigned int            iocd_count;
3665         unsigned int            iocd_cmd[0];
3666 };
3667
3668 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3669 {
3670         unsigned int size;
3671         struct llioc_data *in_data = NULL;
3672         ENTRY;
3673
3674         if (cb == NULL || cmd == NULL ||
3675             count > LLIOC_MAX_CMD || count < 0)
3676                 RETURN(NULL);
3677
3678         size = sizeof(*in_data) + count * sizeof(unsigned int);
3679         OBD_ALLOC(in_data, size);
3680         if (in_data == NULL)
3681                 RETURN(NULL);
3682
3683         memset(in_data, 0, sizeof(*in_data));
3684         in_data->iocd_size = size;
3685         in_data->iocd_cb = cb;
3686         in_data->iocd_count = count;
3687         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3688
3689         down_write(&llioc.ioc_sem);
3690         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3691         up_write(&llioc.ioc_sem);
3692
3693         RETURN(in_data);
3694 }
3695
3696 void ll_iocontrol_unregister(void *magic)
3697 {
3698         struct llioc_data *tmp;
3699
3700         if (magic == NULL)
3701                 return;
3702
3703         down_write(&llioc.ioc_sem);
3704         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3705                 if (tmp == magic) {
3706                         unsigned int size = tmp->iocd_size;
3707
3708                         list_del(&tmp->iocd_list);
3709                         up_write(&llioc.ioc_sem);
3710
3711                         OBD_FREE(tmp, size);
3712                         return;
3713                 }
3714         }
3715         up_write(&llioc.ioc_sem);
3716
3717         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3718 }
3719
3720 EXPORT_SYMBOL(ll_iocontrol_register);
3721 EXPORT_SYMBOL(ll_iocontrol_unregister);
3722
3723 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3724                         unsigned int cmd, unsigned long arg, int *rcp)
3725 {
3726         enum llioc_iter ret = LLIOC_CONT;
3727         struct llioc_data *data;
3728         int rc = -EINVAL, i;
3729
3730         down_read(&llioc.ioc_sem);
3731         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3732                 for (i = 0; i < data->iocd_count; i++) {
3733                         if (cmd != data->iocd_cmd[i])
3734                                 continue;
3735
3736                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3737                         break;
3738                 }
3739
3740                 if (ret == LLIOC_STOP)
3741                         break;
3742         }
3743         up_read(&llioc.ioc_sem);
3744
3745         if (rcp)
3746                 *rcp = rc;
3747         return ret;
3748 }
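/*
 * Registration sketch for the dynamic ioctl interface above (illustrative
 * only; the command number, callback body and calling module are
 * hypothetical, while the callback signature follows the iocd_cb invocation
 * in ll_iocontrol_call()):
 *
 *      static enum llioc_iter my_ioc_cb(struct inode *inode, struct file *file,
 *                                       unsigned int cmd, unsigned long arg,
 *                                       void *magic, int *rcp)
 *      {
 *              *rcp = 0;               handle the command, set the result
 *              return LLIOC_STOP;      tell ll_iocontrol_call() to stop here
 *      }
 *
 *      static unsigned int my_cmds[] = { 0xC008669A };   hypothetical ioctl nr
 *      void *magic = ll_iocontrol_register(my_ioc_cb, 1, my_cmds);
 *      ...
 *      ll_iocontrol_unregister(magic);
 */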