Whamcloud - gitweb
b=16774
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 /* also used by llite/special.c:ll_special_open() */
52 struct ll_file_data *ll_file_data_get(void)
53 {
54         struct ll_file_data *fd;
55
56         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
57         return fd;
58 }
59
60 static void ll_file_data_put(struct ll_file_data *fd)
61 {
62         if (fd != NULL)
63                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
64 }
65
66 static int ll_close_inode_openhandle(struct inode *inode,
67                                      struct obd_client_handle *och)
68 {
69         struct ptlrpc_request *req = NULL;
70         struct obd_device *obd;
71         struct obdo *oa;
72         struct mdc_op_data data = { { 0 } };
73         obd_flag valid;
74         int rc;
75         ENTRY;
76
77         obd = class_exp2obd(ll_i2mdcexp(inode));
78         if (obd == NULL) {
79                 CERROR("Invalid MDC connection handle "LPX64"\n",
80                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
81                 GOTO(out, rc = 0);
82         }
83
84         /*
85          * here we check if this is forced umount. If so this is called on
86          * canceling "open lock" and we do not call mdc_close() in this case, as
87          * it will not be successful, as import is already deactivated.
88          */
89         if (obd->obd_force)
90                 GOTO(out, rc = 0);
91
92         OBDO_ALLOC(oa);
93         if (!oa)
94                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
95
96         oa->o_id = inode->i_ino;
97         oa->o_valid = OBD_MD_FLID;
98         valid = OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLATIME |
99                 OBD_MD_FLMTIME | OBD_MD_FLCTIME;
100         if (S_ISREG(inode->i_mode))
101                 valid |=  OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
102         obdo_from_inode(oa, inode, valid);
103         if (ll_is_inode_dirty(inode)) {
104                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
105                 oa->o_valid |= OBD_MD_FLFLAGS;
106         }
107         ll_inode2fid(&data.fid1, inode);
108         rc = mdc_close(ll_i2mdcexp(inode), &data, oa, och, &req);
109         if (rc == EAGAIN) {
110                 /* We are the last writer, so the MDS has instructed us to get
111                  * the file size and any write cookies, then close again. */
112                 ll_queue_done_writing(inode);
113                 rc = 0;
114         } else if (rc) {
115                 CERROR("inode %lu mdc close failed: rc = %d\n",
116                        inode->i_ino, rc);
117         }
118
119         OBDO_FREE(oa);
120
121         if (rc == 0) {
122                 rc = ll_objects_destroy(req, inode);
123                 if (rc)
124                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
125                                inode->i_ino, rc);
126         }
127
128         ptlrpc_req_finished(req); /* This is close request */
129         EXIT;
130 out:
131         mdc_clear_open_replay_data(och);
132
133         return rc;
134 }
135
136 int ll_mdc_real_close(struct inode *inode, int flags)
137 {
138         struct ll_inode_info *lli = ll_i2info(inode);
139         int rc = 0;
140         struct obd_client_handle **och_p;
141         struct obd_client_handle *och;
142         __u64 *och_usecount;
143
144         ENTRY;
145
146         if (flags & FMODE_WRITE) {
147                 och_p = &lli->lli_mds_write_och;
148                 och_usecount = &lli->lli_open_fd_write_count;
149         } else if (flags & FMODE_EXEC) {
150                 och_p = &lli->lli_mds_exec_och;
151                 och_usecount = &lli->lli_open_fd_exec_count;
152          } else {
153                 LASSERT(flags & FMODE_READ);
154                 och_p = &lli->lli_mds_read_och;
155                 och_usecount = &lli->lli_open_fd_read_count;
156         }
157
158         down(&lli->lli_och_sem);
159         if (*och_usecount) { /* There are still users of this handle, so
160                                 skip freeing it. */
161                 up(&lli->lli_och_sem);
162                 RETURN(0);
163         }
164         och=*och_p;
165         *och_p = NULL;
166         up(&lli->lli_och_sem);
167
168         if (och) { /* There might be a race and somebody have freed this och
169                       already */
170                 rc = ll_close_inode_openhandle(inode, och);
171                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
172                 OBD_FREE(och, sizeof *och);
173         }
174
175         RETURN(rc);
176 }
177
178 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
179                         struct file *file)
180 {
181         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
182         struct ll_inode_info *lli = ll_i2info(inode);
183         int rc = 0;
184         ENTRY;
185
186         /* clear group lock, if present */
187         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
188                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
189                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
190                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
191                                       &fd->fd_cwlockh);
192         }
193
194         /* Let's see if we have good enough OPEN lock on the file and if
195            we can skip talking to MDS */
196         if (file->f_dentry->d_inode) { /* Can this ever be false? */
197                 int lockmode;
198                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
199                 struct lustre_handle lockh;
200                 struct inode *inode = file->f_dentry->d_inode;
201                 struct ldlm_res_id file_res_id;
202
203                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
204                 fid_build_reg_res_name(ll_inode_lu_fid(inode), &file_res_id);
205
206                 down(&lli->lli_och_sem);
207                 if (fd->fd_omode & FMODE_WRITE) {
208                         lockmode = LCK_CW;
209                         LASSERT(lli->lli_open_fd_write_count);
210                         lli->lli_open_fd_write_count--;
211                 } else if (fd->fd_omode & FMODE_EXEC) {
212                         lockmode = LCK_PR;
213                         LASSERT(lli->lli_open_fd_exec_count);
214                         lli->lli_open_fd_exec_count--;
215                 } else {
216                         lockmode = LCK_CR;
217                         LASSERT(lli->lli_open_fd_read_count);
218                         lli->lli_open_fd_read_count--;
219                 }
220                 up(&lli->lli_och_sem);
221
222                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
223                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
224                                      &lockh)) {
225                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
226                                                 fd->fd_omode);
227                 }
228         } else {
229                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
230                        file, file->f_dentry, file->f_dentry->d_name.name);
231         }
232
233         LUSTRE_FPRIVATE(file) = NULL;
234         ll_file_data_put(fd);
235
236         RETURN(rc);
237 }
238
239 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
240
241 /* While this returns an error code, fput() the caller does not, so we need
242  * to make every effort to clean up all of our state here.  Also, applications
243  * rarely check close errors and even if an error is returned they will not
244  * re-try the close call.
245  */
246 int ll_file_release(struct inode *inode, struct file *file)
247 {
248         struct ll_file_data *fd;
249         struct ll_sb_info *sbi = ll_i2sbi(inode);
250         struct ll_inode_info *lli = ll_i2info(inode);
251         struct lov_stripe_md *lsm = lli->lli_smd;
252         int rc;
253         ENTRY;
254
255         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
256                inode->i_generation, inode);
257
258
259         if (inode->i_sb->s_root != file->f_dentry)
260                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
261         fd = LUSTRE_FPRIVATE(file);
262         LASSERT(fd != NULL);
263
264         /* The last ref on @file, maybe not the the owner pid of statahead.
265          * Different processes can open the same dir, "ll_opendir_key" means:
266          * it is me that should stop the statahead thread. */
267         if (lli->lli_opendir_key == fd && lli->lli_opendir_pid != 0)
268                 ll_stop_statahead(inode, lli->lli_opendir_key);
269
270         if (inode->i_sb->s_root == file->f_dentry) {
271                 LUSTRE_FPRIVATE(file) = NULL;
272                 ll_file_data_put(fd);
273                 RETURN(0);
274         }
275
276         if (lsm)
277                 lov_test_and_clear_async_rc(lsm);
278         lli->lli_async_rc = 0;
279
280         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
281         RETURN(rc);
282 }
283
284 static int ll_intent_file_open(struct file *file, void *lmm,
285                                int lmmsize, struct lookup_intent *itp)
286 {
287         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
288         struct mdc_op_data data = { { 0 } };
289         struct dentry *parent = file->f_dentry->d_parent;
290         const char *name = file->f_dentry->d_name.name;
291         const int len = file->f_dentry->d_name.len;
292         struct inode *inode = file->f_dentry->d_inode;
293         struct ptlrpc_request *req;
294         int rc;
295         ENTRY;
296
297         if (!parent)
298                 RETURN(-ENOENT);
299
300         ll_prepare_mdc_op_data(&data, parent->d_inode, inode,
301                                name, len, O_RDWR, NULL);
302
303         /* Usually we come here only for NFSD, and we want open lock.
304            But we can also get here with pre 2.6.15 patchless kernels, and in
305            that case that lock is also ok */
306         /* We can also get here if there was cached open handle in revalidate_it
307          * but it disappeared while we were getting from there to ll_file_open.
308          * But this means this file was closed and immediatelly opened which
309          * makes a good candidate for using OPEN lock */
310         /* If lmmsize & lmm are not 0, we are just setting stripe info
311          * parameters. No need for the open lock */
312         if (!lmm && !lmmsize)
313                 itp->it_flags |= MDS_OPEN_LOCK;
314
315         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
316                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
317         if (rc == -ESTALE) {
318                 /* reason for keep own exit path - don`t flood log
319                 * with messages with -ESTALE errors.
320                 */
321                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
322                      it_open_error(DISP_OPEN_OPEN, itp))
323                         GOTO(out, rc);
324                 ll_release_openhandle(file->f_dentry, itp);
325                 GOTO(out, rc);
326         }
327
328         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
329                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
330                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
331                 GOTO(out, rc);
332         }
333
334         if (itp->d.lustre.it_lock_mode)
335                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
336                                   inode);
337
338         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
339                            req, DLM_REPLY_REC_OFF, NULL);
340 out:
341         ptlrpc_req_finished(itp->d.lustre.it_data);
342         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
343         ll_intent_drop_lock(itp);
344
345         RETURN(rc);
346 }
347
348
349 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
350                         struct obd_client_handle *och)
351 {
352         struct ptlrpc_request *req = it->d.lustre.it_data;
353         struct mds_body *body;
354
355         LASSERT(och);
356
357         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
358         LASSERT(body != NULL);                  /* reply already checked out */
359         /* and swabbed in mdc_enqueue */
360         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
361
362         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
363         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
364         lli->lli_io_epoch = body->io_epoch;
365
366         mdc_set_open_replay_data(och, it->d.lustre.it_data);
367 }
368
369 int ll_local_open(struct file *file, struct lookup_intent *it,
370                   struct ll_file_data *fd, struct obd_client_handle *och)
371 {
372         ENTRY;
373
374         LASSERT(!LUSTRE_FPRIVATE(file));
375
376         LASSERT(fd != NULL);
377
378         if (och)
379                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
380         LUSTRE_FPRIVATE(file) = fd;
381         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
382         fd->fd_omode = it->it_flags;
383
384         RETURN(0);
385 }
386
387 /* Open a file, and (for the very first open) create objects on the OSTs at
388  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
389  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
390  * lli_open_sem to ensure no other process will create objects, send the
391  * stripe MD to the MDS, or try to destroy the objects if that fails.
392  *
393  * If we already have the stripe MD locally then we don't request it in
394  * mdc_open(), by passing a lmm_size = 0.
395  *
396  * It is up to the application to ensure no other processes open this file
397  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
398  * used.  We might be able to avoid races of that sort by getting lli_open_sem
399  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
400  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
401  */
402 int ll_file_open(struct inode *inode, struct file *file)
403 {
404         struct ll_inode_info *lli = ll_i2info(inode);
405         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
406                                           .it_flags = file->f_flags };
407         struct lov_stripe_md *lsm;
408         struct ptlrpc_request *req = NULL;
409         struct obd_client_handle **och_p;
410         __u64 *och_usecount;
411         struct ll_file_data *fd;
412         int rc = 0, opendir_set = 0;
413         ENTRY;
414
415         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
416                inode->i_generation, inode, file->f_flags);
417
418 #ifdef HAVE_VFS_INTENT_PATCHES
419         it = file->f_it;
420 #else
421         it = file->private_data; /* XXX: compat macro */
422         file->private_data = NULL; /* prevent ll_local_open assertion */
423 #endif
424
425         fd = ll_file_data_get();
426         if (fd == NULL)
427                 RETURN(-ENOMEM);
428
429         if (S_ISDIR(inode->i_mode)) {
430 again:
431                 spin_lock(&lli->lli_lock);
432                 if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) {
433                         LASSERT(lli->lli_sai == NULL);
434                         lli->lli_opendir_key = fd;
435                         lli->lli_opendir_pid = cfs_curproc_pid();
436                         opendir_set = 1;
437                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid() &&
438                                     lli->lli_opendir_key != NULL)) {
439                         /* Two cases for this:
440                          * (1) The same process open such directory many times.
441                          * (2) The old process opened the directory, and exited
442                          *     before its children processes. Then new process
443                          *     with the same pid opens such directory before the
444                          *     old process's children processes exit.
445                          * reset stat ahead for such cases. */
446                         spin_unlock(&lli->lli_lock);
447                         CDEBUG(D_INFO, "Conflict statahead for %.*s %lu/%u"
448                                " reset it.\n", file->f_dentry->d_name.len,
449                                file->f_dentry->d_name.name,
450                                inode->i_ino, inode->i_generation);
451                         ll_stop_statahead(inode, lli->lli_opendir_key);
452                         goto again;
453                 }
454                 spin_unlock(&lli->lli_lock);
455         }
456
457         if (inode->i_sb->s_root == file->f_dentry) {
458                 LUSTRE_FPRIVATE(file) = fd;
459                 RETURN(0);
460         }
461
462         if (!it || !it->d.lustre.it_disposition) {
463                 /* Convert f_flags into access mode. We cannot use file->f_mode,
464                  * because everything but O_ACCMODE mask was stripped from it */
465                 if ((oit.it_flags + 1) & O_ACCMODE)
466                         oit.it_flags++;
467                 if (file->f_flags & O_TRUNC)
468                         oit.it_flags |= FMODE_WRITE;
469
470                 /* kernel only call f_op->open in dentry_open.  filp_open calls
471                  * dentry_open after call to open_namei that checks permissions.
472                  * Only nfsd_open call dentry_open directly without checking
473                  * permissions and because of that this code below is safe. */
474                 if (oit.it_flags & FMODE_WRITE)
475                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
476
477                 /* We do not want O_EXCL here, presumably we opened the file
478                  * already? XXX - NFS implications? */
479                 oit.it_flags &= ~O_EXCL;
480
481                 it = &oit;
482         }
483
484 restart:
485         /* Let's see if we have file open on MDS already. */
486         if (it->it_flags & FMODE_WRITE) {
487                 och_p = &lli->lli_mds_write_och;
488                 och_usecount = &lli->lli_open_fd_write_count;
489         } else if (it->it_flags & FMODE_EXEC) {
490                 och_p = &lli->lli_mds_exec_och;
491                 och_usecount = &lli->lli_open_fd_exec_count;
492          } else {
493                 och_p = &lli->lli_mds_read_och;
494                 och_usecount = &lli->lli_open_fd_read_count;
495         }
496
497         LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
498                  it->d.lustre.it_disposition);
499
500         down(&lli->lli_och_sem);
501         if (*och_p) { /* Open handle is present */
502                 if (it_disposition(it, DISP_OPEN_OPEN)) {
503                         /* Well, there's extra open request that we do not need,
504                            let's close it somehow. This will decref request. */
505                         rc = it_open_error(DISP_OPEN_OPEN, it);
506                         if (rc) {
507                                 up(&lli->lli_och_sem);
508                                 ll_file_data_put(fd);
509                                 GOTO(out_openerr, rc);
510                         }
511                         ll_release_openhandle(file->f_dentry, it);
512                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
513                                              LPROC_LL_OPEN);
514                 }
515                 (*och_usecount)++;
516
517                 rc = ll_local_open(file, it, fd, NULL);
518
519                 LASSERTF(rc == 0, "rc = %d\n", rc);
520         } else {
521                 LASSERT(*och_usecount == 0);
522                 if (!it->d.lustre.it_disposition) {
523                         /* We cannot just request lock handle now, new ELC code
524                            means that one of other OPEN locks for this file
525                            could be cancelled, and since blocking ast handler
526                            would attempt to grab och_sem as well, that would
527                            result in a deadlock */
528                         up(&lli->lli_och_sem);
529                         it->it_create_mode |= M_CHECK_STALE;
530                         rc = ll_intent_file_open(file, NULL, 0, it);
531                         it->it_create_mode &= ~M_CHECK_STALE;
532                         if (rc) {
533                                 ll_file_data_put(fd);
534                                 GOTO(out_openerr, rc);
535                         }
536
537                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
538                                           file->f_dentry->d_inode);
539                         goto restart;
540                 }
541
542                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
543                 if (!*och_p) {
544                         ll_file_data_put(fd);
545                         GOTO(out_och_free, rc = -ENOMEM);
546                 }
547                 (*och_usecount)++;
548                req = it->d.lustre.it_data;
549
550                 /* mdc_intent_lock() didn't get a request ref if there was an
551                  * open error, so don't do cleanup on the request here
552                  * (bug 3430) */
553                 /* XXX (green): Should not we bail out on any error here, not
554                  * just open error? */
555                 rc = it_open_error(DISP_OPEN_OPEN, it);
556                 if (rc) {
557                         ll_file_data_put(fd);
558                         GOTO(out_och_free, rc);
559                 }
560
561                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
562                 rc = ll_local_open(file, it, fd, *och_p);
563                 LASSERTF(rc == 0, "rc = %d\n", rc);
564         }
565         up(&lli->lli_och_sem);
566
567         /* Must do this outside lli_och_sem lock to prevent deadlock where
568            different kind of OPEN lock for this same inode gets cancelled
569            by ldlm_cancel_lru */
570         if (!S_ISREG(inode->i_mode))
571                 GOTO(out, rc);
572
573         lsm = lli->lli_smd;
574         if (lsm == NULL) {
575                 if (file->f_flags & O_LOV_DELAY_CREATE ||
576                     !(file->f_mode & FMODE_WRITE)) {
577                         CDEBUG(D_INODE, "object creation was delayed\n");
578                         GOTO(out, rc);
579                 }
580         }
581         file->f_flags &= ~O_LOV_DELAY_CREATE;
582         GOTO(out, rc);
583  out:
584         ptlrpc_req_finished(req);
585         if (req)
586                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
587         if (rc == 0) {
588                 ll_open_complete(inode);
589         } else {
590 out_och_free:
591                 if (*och_p) {
592                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
593                         *och_p = NULL; /* OBD_FREE writes some magic there */
594                         (*och_usecount)--;
595                 }
596                 up(&lli->lli_och_sem);
597 out_openerr:
598                 if (opendir_set != 0)
599                         ll_stop_statahead(inode, lli->lli_opendir_key);
600         }
601
602         return rc;
603 }
604
605 /* Fills the obdo with the attributes for the inode defined by lsm */
606 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
607                    struct obdo *oa)
608 {
609         struct ptlrpc_request_set *set;
610         struct obd_info oinfo = { { { 0 } } };
611         int rc;
612         ENTRY;
613
614         LASSERT(lsm != NULL);
615
616         memset(oa, 0, sizeof *oa);
617         oinfo.oi_md = lsm;
618         oinfo.oi_oa = oa;
619         oa->o_id = lsm->lsm_object_id;
620         oa->o_gr = lsm->lsm_object_gr;
621         oa->o_mode = S_IFREG;
622         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
623                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
624                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
625
626         set = ptlrpc_prep_set();
627         if (set == NULL) {
628                 rc = -ENOMEM;
629         } else {
630                 rc = obd_getattr_async(exp, &oinfo, set);
631                 if (rc == 0)
632                         rc = ptlrpc_set_wait(set);
633                 ptlrpc_set_destroy(set);
634         }
635         if (rc)
636                 RETURN(rc);
637
638         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
639                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
640         RETURN(0);
641 }
642
643 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
644 {
645         struct ll_inode_info *lli = ll_i2info(inode);
646         struct lov_stripe_md *lsm = lli->lli_smd;
647         struct obd_export *exp = ll_i2obdexp(inode);
648         struct {
649                 char name[16];
650                 struct ldlm_lock *lock;
651         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
652         __u32 stripe, vallen = sizeof(stripe);
653         struct lov_oinfo *loinfo;
654         int rc;
655         ENTRY;
656
657         if (lsm->lsm_stripe_count == 1)
658                 GOTO(check, stripe = 0);
659
660         /* get our offset in the lov */
661         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
662         if (rc != 0) {
663                 CERROR("obd_get_info: rc = %d\n", rc);
664                 RETURN(rc);
665         }
666         LASSERT(stripe < lsm->lsm_stripe_count);
667
668 check:
669         loinfo = lsm->lsm_oinfo[stripe];
670         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
671                             &lock->l_resource->lr_name)) {
672                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
673                            loinfo->loi_id, loinfo->loi_gr);
674                 RETURN(-ELDLM_NO_LOCK_DATA);
675         }
676
677         RETURN(stripe);
678 }
679
680 /* Get extra page reference to ensure it is not going away */
681 void ll_pin_extent_cb(void *data)
682 {
683         struct page *page = data;
684
685         page_cache_get(page);
686
687         return;
688 }
689 /* Flush the page from page cache for an extent as its canceled.
690  * Page to remove is delivered as @data.
691  *
692  * No one can dirty the extent until we've finished our work and they cannot
693  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
694  * but other kernel actors could have pages locked.
695  *
696  * If @discard is set, there is no need to write the page if it is dirty.
697  *
698  * Called with the DLM lock held. */
699 int ll_page_removal_cb(void *data, int discard)
700 {
701         int rc;
702         struct page *page = data;
703         struct address_space *mapping;
704
705         ENTRY;
706
707         /* We have page reference already from ll_pin_page */
708         lock_page(page);
709
710         /* Already truncated by somebody */
711         if (!page->mapping)
712                 GOTO(out, rc = 0);
713
714         mapping = page->mapping;
715
716         ll_teardown_mmaps(mapping,
717                           (__u64)page->index << PAGE_CACHE_SHIFT,
718                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
719                                                               ~PAGE_CACHE_MASK);
720         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
721         if (!discard && PageWriteback(page))
722                 wait_on_page_writeback(page);
723
724         if (!discard && clear_page_dirty_for_io(page)) {
725                 rc = ll_call_writepage(page->mapping->host, page);
726                 /* either waiting for io to complete or reacquiring
727                  * the lock that the failed writepage released */
728                 lock_page(page);
729                 wait_on_page_writeback(page);
730                 if (rc < 0) {
731                         CERROR("writepage inode %lu(%p) of page %p "
732                                "failed: %d\n", mapping->host->i_ino,
733                                mapping->host, page, rc);
734                         if (rc == -ENOSPC)
735                                 set_bit(AS_ENOSPC, &mapping->flags);
736                         else
737                                 set_bit(AS_EIO, &mapping->flags);
738                 }
739         }
740         if (page->mapping != NULL) {
741                 struct ll_async_page *llap = llap_cast_private(page);
742                 // checking again to account for writeback's lock_page()
743                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
744                 if (llap)
745                         ll_ra_accounting(llap, page->mapping);
746                 ll_truncate_complete_page(page);
747         }
748         EXIT;
749 out:
750         LASSERT(!PageWriteback(page));
751         unlock_page(page);
752         page_cache_release(page);
753
754         return 0;
755 }
756
757 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
758                              void *data, int flag)
759 {
760         struct inode *inode;
761         struct ll_inode_info *lli;
762         struct lov_stripe_md *lsm;
763         int stripe;
764         __u64 kms;
765
766         ENTRY;
767
768         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
769                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
770                 LBUG();
771         }
772
773         inode = ll_inode_from_lock(lock);
774         if (inode == NULL)
775                 RETURN(0);
776         lli = ll_i2info(inode);
777         if (lli == NULL)
778                 GOTO(iput, 0);
779         if (lli->lli_smd == NULL)
780                 GOTO(iput, 0);
781         lsm = lli->lli_smd;
782
783         stripe = ll_lock_to_stripe_offset(inode, lock);
784         if (stripe < 0)
785                 GOTO(iput, 0);
786
787         lov_stripe_lock(lsm);
788         lock_res_and_lock(lock);
789         kms = ldlm_extent_shift_kms(lock,
790                                     lsm->lsm_oinfo[stripe]->loi_kms);
791
792         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
793                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
794                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
795         lsm->lsm_oinfo[stripe]->loi_kms = kms;
796         unlock_res_and_lock(lock);
797         lov_stripe_unlock(lsm);
798         ll_try_done_writing(inode);
799         EXIT;
800 iput:
801         iput(inode);
802
803         return 0;
804 }
805
806 #if 0
807 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
808 {
809         /* XXX ALLOCATE - 160 bytes */
810         struct inode *inode = ll_inode_from_lock(lock);
811         struct ll_inode_info *lli = ll_i2info(inode);
812         struct lustre_handle lockh = { 0 };
813         struct ost_lvb *lvb;
814         int stripe;
815         ENTRY;
816
817         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
818                      LDLM_FL_BLOCK_CONV)) {
819                 LBUG(); /* not expecting any blocked async locks yet */
820                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
821                            "lock, returning");
822                 ldlm_lock_dump(D_OTHER, lock, 0);
823                 ldlm_reprocess_all(lock->l_resource);
824                 RETURN(0);
825         }
826
827         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
828
829         stripe = ll_lock_to_stripe_offset(inode, lock);
830         if (stripe < 0)
831                 goto iput;
832
833         if (lock->l_lvb_len) {
834                 struct lov_stripe_md *lsm = lli->lli_smd;
835                 __u64 kms;
836                 lvb = lock->l_lvb_data;
837                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
838
839                 lock_res_and_lock(lock);
840                 ll_inode_size_lock(inode, 1);
841                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
842                 kms = ldlm_extent_shift_kms(NULL, kms);
843                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
844                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
845                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
846                 lsm->lsm_oinfo[stripe].loi_kms = kms;
847                 ll_inode_size_unlock(inode, 1);
848                 unlock_res_and_lock(lock);
849         }
850
851 iput:
852         iput(inode);
853         wake_up(&lock->l_waitq);
854
855         ldlm_lock2handle(lock, &lockh);
856         ldlm_lock_decref(&lockh, LCK_PR);
857         RETURN(0);
858 }
859 #endif
860
861 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
862 {
863         struct ptlrpc_request *req = reqp;
864         struct inode *inode = ll_inode_from_lock(lock);
865         struct ll_inode_info *lli;
866         struct lov_stripe_md *lsm;
867         struct ost_lvb *lvb;
868         int rc, stripe;
869         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
870         ENTRY;
871
872         if (inode == NULL)
873                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
874         lli = ll_i2info(inode);
875         if (lli == NULL)
876                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
877         lsm = lli->lli_smd;
878         if (lsm == NULL)
879                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
880
881         /* First, find out which stripe index this lock corresponds to. */
882         stripe = ll_lock_to_stripe_offset(inode, lock);
883         if (stripe < 0)
884                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
885
886         rc = lustre_pack_reply(req, 2, size, NULL);
887         if (rc)
888                 GOTO(iput, rc);
889
890         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
891         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
892         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
893         lvb->lvb_atime = LTIME_S(inode->i_atime);
894         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
895
896         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
897                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
898                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_atime,
899                    lvb->lvb_mtime, lvb->lvb_ctime);
900  iput:
901         iput(inode);
902
903  out:
904         /* These errors are normal races, so we don't want to fill the console
905          * with messages by calling ptlrpc_error() */
906         if (rc == -ELDLM_NO_LOCK_DATA)
907                 lustre_pack_reply(req, 1, NULL, NULL);
908
909         req->rq_status = rc;
910         return rc;
911 }
912
913 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
914                      lstat_t *st)
915 {
916         struct lustre_handle lockh = { 0 };
917         struct ldlm_enqueue_info einfo = { 0 };
918         struct obd_info oinfo = { { { 0 } } };
919         struct ost_lvb lvb;
920         int rc;
921
922         ENTRY;
923
924         einfo.ei_type = LDLM_EXTENT;
925         einfo.ei_mode = LCK_PR;
926         einfo.ei_cb_bl = osc_extent_blocking_cb;
927         einfo.ei_cb_cp = ldlm_completion_ast;
928         einfo.ei_cb_gl = ll_glimpse_callback;
929         einfo.ei_cbdata = NULL;
930
931         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
932         oinfo.oi_lockh = &lockh;
933         oinfo.oi_md = lsm;
934         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
935
936         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
937         if (rc == -ENOENT)
938                 RETURN(rc);
939         if (rc != 0) {
940                 CERROR("obd_enqueue returned rc %d, "
941                        "returning -EIO\n", rc);
942                 RETURN(rc > 0 ? -EIO : rc);
943         }
944
945         lov_stripe_lock(lsm);
946         memset(&lvb, 0, sizeof(lvb));
947         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
948         st->st_size = lvb.lvb_size;
949         st->st_blocks = lvb.lvb_blocks;
950         st->st_mtime = lvb.lvb_mtime;
951         st->st_atime = lvb.lvb_atime;
952         st->st_ctime = lvb.lvb_ctime;
953         lov_stripe_unlock(lsm);
954
955         RETURN(rc);
956 }
957
958 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
959  * file (because it prefers KMS over RSS when larger) */
960 int ll_glimpse_size(struct inode *inode, int ast_flags)
961 {
962         struct ll_inode_info *lli = ll_i2info(inode);
963         struct ll_sb_info *sbi = ll_i2sbi(inode);
964         struct lustre_handle lockh = { 0 };
965         struct ldlm_enqueue_info einfo = { 0 };
966         struct obd_info oinfo = { { { 0 } } };
967         struct ost_lvb lvb;
968         int rc;
969         ENTRY;
970
971         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
972
973         if (!lli->lli_smd) {
974                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
975                 RETURN(0);
976         }
977
978         /* NOTE: this looks like DLM lock request, but it may not be one. Due
979          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
980          *       won't revoke any conflicting DLM locks held. Instead,
981          *       ll_glimpse_callback() will be called on each client
982          *       holding a DLM lock against this file, and resulting size
983          *       will be returned for each stripe. DLM lock on [0, EOF] is
984          *       acquired only if there were no conflicting locks. */
985         einfo.ei_type = LDLM_EXTENT;
986         einfo.ei_mode = LCK_PR;
987         einfo.ei_cb_bl = osc_extent_blocking_cb;
988         einfo.ei_cb_cp = ldlm_completion_ast;
989         einfo.ei_cb_gl = ll_glimpse_callback;
990         einfo.ei_cbdata = inode;
991
992         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
993         oinfo.oi_lockh = &lockh;
994         oinfo.oi_md = lli->lli_smd;
995         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
996
997         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
998         if (rc == -ENOENT)
999                 RETURN(rc);
1000         if (rc != 0) {
1001                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1002                 RETURN(rc > 0 ? -EIO : rc);
1003         }
1004
1005         ll_inode_size_lock(inode, 1);
1006         inode_init_lvb(inode, &lvb);
1007         /* merge timestamps the most resently obtained from mds with
1008            timestamps obtained from osts */
1009         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
1010         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
1011         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
1012         rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1013         i_size_write(inode, lvb.lvb_size);
1014         inode->i_blocks = lvb.lvb_blocks;
1015         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1016         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1017         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1018         ll_inode_size_unlock(inode, 1);
1019
1020         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1021                i_size_read(inode), (long long)inode->i_blocks);
1022
1023         RETURN(rc);
1024 }
1025
1026 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1027                    struct lov_stripe_md *lsm, int mode,
1028                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1029                    int ast_flags)
1030 {
1031         struct ll_sb_info *sbi = ll_i2sbi(inode);
1032         struct ost_lvb lvb;
1033         struct ldlm_enqueue_info einfo = { 0 };
1034         struct obd_info oinfo = { { { 0 } } };
1035         int rc;
1036         ENTRY;
1037
1038         LASSERT(!lustre_handle_is_used(lockh));
1039         LASSERT(lsm != NULL);
1040
1041         /* don't drop the mmapped file to LRU */
1042         if (mapping_mapped(inode->i_mapping))
1043                 ast_flags |= LDLM_FL_NO_LRU;
1044
1045         /* XXX phil: can we do this?  won't it screw the file size up? */
1046         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1047             (sbi->ll_flags & LL_SBI_NOLCK))
1048                 RETURN(0);
1049
1050         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1051                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1052
1053         einfo.ei_type = LDLM_EXTENT;
1054         einfo.ei_mode = mode;
1055         einfo.ei_cb_bl = osc_extent_blocking_cb;
1056         einfo.ei_cb_cp = ldlm_completion_ast;
1057         einfo.ei_cb_gl = ll_glimpse_callback;
1058         einfo.ei_cbdata = inode;
1059
1060         oinfo.oi_policy = *policy;
1061         oinfo.oi_lockh = lockh;
1062         oinfo.oi_md = lsm;
1063         oinfo.oi_flags = ast_flags;
1064
1065         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL);
1066         *policy = oinfo.oi_policy;
1067         if (rc > 0)
1068                 rc = -EIO;
1069
1070         ll_inode_size_lock(inode, 1);
1071         inode_init_lvb(inode, &lvb);
1072         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1073
1074         if (policy->l_extent.start == 0 &&
1075             policy->l_extent.end == OBD_OBJECT_EOF) {
1076                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1077                  * the kms under both a DLM lock and the
1078                  * ll_inode_size_lock().  If we don't get the
1079                  * ll_inode_size_lock() here we can match the DLM lock and
1080                  * reset i_size from the kms before the truncating path has
1081                  * updated the kms.  generic_file_write can then trust the
1082                  * stale i_size when doing appending writes and effectively
1083                  * cancel the result of the truncate.  Getting the
1084                  * ll_inode_size_lock() after the enqueue maintains the DLM
1085                  * -> ll_inode_size_lock() acquiring order. */
1086                 i_size_write(inode, lvb.lvb_size);
1087                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1088                        inode->i_ino, i_size_read(inode));
1089         }
1090
1091         if (rc == 0) {
1092                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1093                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1094                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1095         }
1096         ll_inode_size_unlock(inode, 1);
1097
1098         RETURN(rc);
1099 }
1100
1101 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1102                      struct lov_stripe_md *lsm, int mode,
1103                      struct lustre_handle *lockh)
1104 {
1105         struct ll_sb_info *sbi = ll_i2sbi(inode);
1106         int rc;
1107         ENTRY;
1108
1109         /* XXX phil: can we do this?  won't it screw the file size up? */
1110         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1111             (sbi->ll_flags & LL_SBI_NOLCK))
1112                 RETURN(0);
1113
1114         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh, 0, 0);
1115
1116         RETURN(rc);
1117 }
1118
1119 static void ll_set_file_contended(struct inode *inode)
1120 {
1121         struct ll_inode_info *lli = ll_i2info(inode);
1122
1123         lli->lli_contention_time = cfs_time_current();
1124         set_bit(LLI_F_CONTENDED, &lli->lli_flags);
1125 }
1126
1127 void ll_clear_file_contended(struct inode *inode)
1128 {
1129         struct ll_inode_info *lli = ll_i2info(inode);
1130
1131         clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
1132 }
1133
1134 static int ll_is_file_contended(struct file *file)
1135 {
1136         struct inode *inode = file->f_dentry->d_inode;
1137         struct ll_inode_info *lli = ll_i2info(inode);
1138         struct ll_sb_info *sbi = ll_i2sbi(inode);
1139         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1140         ENTRY;
1141
1142         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1143                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1144                        " osc connect flags = 0x"LPX64"\n",
1145                        sbi->ll_lco.lco_flags);
1146                 RETURN(0);
1147         }
1148
1149         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1150                 RETURN(0);
1151
1152         /* server-side locking for dio unless LL_FILE_LOCKED_DIRECTIO */
1153         if ((file->f_flags & O_DIRECT) &&
1154             !(fd && (fd->fd_flags & LL_FILE_LOCKED_DIRECTIO)))
1155                 RETURN(1);
1156
1157         /* server-side locking for cached I/O with LL_FILE_LOCKLESS_IO */
1158         if (!(file->f_flags & O_DIRECT) &&
1159             fd && fd->fd_flags & LL_FILE_LOCKLESS_IO)
1160                 RETURN(1);
1161
1162         if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
1163                 cfs_time_t cur_time = cfs_time_current();
1164                 cfs_time_t retry_time;
1165
1166                 retry_time = cfs_time_add(
1167                         lli->lli_contention_time,
1168                         cfs_time_seconds(sbi->ll_contention_time));
1169                 if (cfs_time_after(cur_time, retry_time)) {
1170                         ll_clear_file_contended(inode);
1171                         RETURN(0);
1172                 }
1173                 RETURN(1);
1174         }
1175         RETURN(0);
1176 }
1177
1178 static int ll_file_get_tree_lock_iov(struct ll_lock_tree *tree,
1179                                      struct file *file, const struct iovec *iov,
1180                                      unsigned long nr_segs,
1181                                      obd_off start, obd_off end, int rw)
1182 {
1183         int append;
1184         int tree_locked = 0;
1185         int rc;
1186         struct inode * inode = file->f_dentry->d_inode;
1187         ENTRY;
1188
1189         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1190
1191         if (append || !ll_is_file_contended(file)) {
1192                 struct ll_lock_tree_node *node;
1193                 int ast_flags;
1194
1195                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1196                 if (file->f_flags & O_NONBLOCK)
1197                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1198                 node = ll_node_from_inode(inode, start, end,
1199                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1200                 if (IS_ERR(node)) {
1201                         rc = PTR_ERR(node);
1202                         GOTO(out, rc);
1203                 }
1204                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1205                 rc = ll_tree_lock_iov(tree, node, iov, nr_segs, ast_flags);
1206                 if (rc == 0)
1207                         tree_locked = 1;
1208                 else if (rc == -EUSERS)
1209                         ll_set_file_contended(inode);
1210                 else
1211                         GOTO(out, rc);
1212         }
1213         RETURN(tree_locked);
1214 out:
1215         return rc;
1216 }
1217
1218 /* XXX: exact copy from kernel code (__generic_file_aio_write_nolock from rhel4)
1219  */
1220 static size_t ll_file_get_iov_count(const struct iovec *iov,
1221                                      unsigned long *nr_segs)
1222 {
1223         size_t count = 0;
1224         unsigned long seg;
1225
1226         for (seg = 0; seg < *nr_segs; seg++) {
1227                 const struct iovec *iv = &iov[seg];
1228
1229                 /*
1230                  * If any segment has a negative length, or the cumulative
1231                  * length ever wraps negative then return -EINVAL.
1232                  */
1233                 count += iv->iov_len;
1234                 if (unlikely((ssize_t)(count|iv->iov_len) < 0))
1235                         return -EINVAL;
1236                 if (access_ok(VERIFY_WRITE, iv->iov_base, iv->iov_len))
1237                         continue;
1238                 if (seg == 0)
1239                         return -EFAULT;
1240                 *nr_segs = seg;
1241                 count -= iv->iov_len;   /* This segment is no good */
1242                 break;
1243         }
1244         return count;
1245 }
1246
1247 static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
1248                            unsigned long *nrsegs_copy,
1249                            struct iovec *iov_copy, size_t *offset,
1250                            size_t size)
1251 {
1252         int i;
1253         const struct iovec *iov = *iov_out;
1254         for (i = 0; i < *nr_segs;
1255              i++) {
1256                 const struct iovec *iv = &iov[i];
1257                 struct iovec *ivc = &iov_copy[i];
1258                 *ivc = *iv;
1259                 if (i == 0) {
1260                         ivc->iov_len -= *offset;
1261                         ivc->iov_base += *offset;
1262                 }
1263                 if (ivc->iov_len >= size) {
1264                         ivc->iov_len = size;
1265                         if (i == 0)
1266                                 *offset += size;
1267                         else
1268                                 *offset = size;
1269                         break;
1270                 }
1271                 size -= ivc->iov_len;
1272         }
1273         *iov_out += i;
1274         *nr_segs -= i;
1275         *nrsegs_copy = i + 1;
1276
1277         return 0;
1278 }
1279
1280 static int ll_get_short_lock(struct page *page, int rw, obd_off start,
1281                              obd_off end, struct lustre_handle *lockh)
1282 {
1283         struct ll_async_page *llap;
1284         struct obd_export *exp;
1285         struct inode *inode = page->mapping->host;
1286
1287         ENTRY;
1288
1289         exp = ll_i2obdexp(inode);
1290         if (exp == NULL)
1291                 RETURN(0);
1292
1293         llap = llap_cast_private(page);
1294         if (llap == NULL)
1295                 RETURN(0);
1296
1297         RETURN(obd_get_lock(exp, ll_i2info(inode)->lli_smd,
1298                             &llap->llap_cookie, rw, start, end, lockh,
1299                             OBD_FAST_LOCK));
1300 }
1301
1302 static void ll_release_short_lock(struct inode *inode, obd_off end,
1303                                   struct lustre_handle *lockh, int rw)
1304 {
1305         struct obd_export *exp;
1306         int rc;
1307
1308         exp = ll_i2obdexp(inode);
1309         if (exp == NULL)
1310                 return;
1311
1312         rc = obd_cancel(exp, ll_i2info(inode)->lli_smd,
1313                         rw = OBD_BRW_READ ? LCK_PR : LCK_PW, lockh,
1314                         OBD_FAST_LOCK, end);
1315         if (rc < 0)
1316                 CERROR("unlock failed (%d)\n", rc);
1317 }
1318
1319 static inline int ll_file_get_fast_lock(struct file *file,
1320                                         obd_off ppos, obd_off end,
1321                                         const struct iovec *iov,
1322                                         unsigned long nr_segs,
1323                                         struct lustre_handle *lockh,
1324                                         int rw)
1325 {
1326         int rc = 0, seg;
1327         struct page *page;
1328
1329         ENTRY;
1330
1331         /* we would like this read request to be lockfree */
1332         for (seg = 0; seg < nr_segs; seg++) {
1333                 const struct iovec *iv = &iov[seg];
1334                 if (ll_region_mapped((unsigned long)iv->iov_base, iv->iov_len))
1335                         GOTO(out, rc);
1336         }
1337
1338         page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1339                               ppos >> CFS_PAGE_SHIFT);
1340         if (page) {
1341                 if (ll_get_short_lock(page, rw, ppos, end, lockh))
1342                         rc = 1;
1343
1344                 unlock_page(page);
1345                 page_cache_release(page);
1346         }
1347
1348 out:
1349         RETURN(rc);
1350 }
1351
1352 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1353                                          struct lustre_handle *lockh, int rw)
1354 {
1355         ll_release_short_lock(inode, end, lockh, rw);
1356 }
1357
1358 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1359                                    obd_off end, const struct iovec *iov,
1360                                    unsigned long nr_segs,
1361                                    struct lustre_handle *lockh,
1362                                    struct ll_lock_tree *tree, int rw)
1363 {
1364         int rc;
1365
1366         ENTRY;
1367
1368         if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, lockh, rw))
1369                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1370
1371         rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
1372                                        ppos, end, rw);
1373         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1374         switch (rc) {
1375         case 1:
1376                 RETURN(LL_LOCK_STYLE_TREELOCK);
1377         case 0:
1378                 RETURN(LL_LOCK_STYLE_NOLOCK);
1379         }
1380
1381         /* an error happened if we reached this point, rc = -errno here */
1382         RETURN(rc);
1383 }
1384
1385 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1386                                     enum ll_lock_style lock_style,
1387                                     struct lustre_handle *lockh,
1388                                     struct ll_lock_tree *tree, int rw)
1389
1390 {
1391         switch (lock_style) {
1392         case LL_LOCK_STYLE_TREELOCK:
1393                 ll_tree_unlock(tree);
1394                 break;
1395         case LL_LOCK_STYLE_FASTLOCK:
1396                 ll_file_put_fast_lock(inode, end, lockh, rw);
1397                 break;
1398         default:
1399                 CERROR("invalid locking style (%d)\n", lock_style);
1400         }
1401 }
1402
1403 #ifdef HAVE_FILE_READV
1404 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
1405                               unsigned long nr_segs, loff_t *ppos)
1406 {
1407 #else
1408 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1409                                 unsigned long nr_segs, loff_t pos)
1410 {
1411         struct file *file = iocb->ki_filp;
1412         loff_t *ppos = &iocb->ki_pos;
1413 #endif
1414         struct inode *inode = file->f_dentry->d_inode;
1415         struct ll_inode_info *lli = ll_i2info(inode);
1416         struct lov_stripe_md *lsm = lli->lli_smd;
1417         struct ll_sb_info *sbi = ll_i2sbi(inode);
1418         struct ll_thread_data ltd = { 0 };
1419         struct ost_lvb lvb;
1420         struct ll_ra_read bead;
1421         int ra = 0;
1422         obd_off end;
1423         ssize_t retval, chunk, sum = 0;
1424         struct iovec *iov_copy = NULL;
1425         unsigned long nrsegs_copy, nrsegs_orig = 0;
1426         size_t count, iov_offset = 0;
1427         __u64 kms;
1428         ENTRY;
1429
1430         count = ll_file_get_iov_count(iov, &nr_segs);
1431         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1432                inode->i_ino, inode->i_generation, inode, count, *ppos);
1433         /* "If nbyte is 0, read() will return 0 and have no other results."
1434          *                      -- Single Unix Spec */
1435         if (count == 0)
1436                 RETURN(0);
1437
1438         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1439
1440         if (!lsm) {
1441                 /* Read on file with no objects should return zero-filled
1442                  * buffers up to file size (we can get non-zero sizes with
1443                  * mknod + truncate, then opening file for read. This is a
1444                  * common pattern in NFS case, it seems). Bug 6243 */
1445                 int notzeroed;
1446                 /* Since there are no objects on OSTs, we have nothing to get
1447                  * lock on and so we are forced to access inode->i_size
1448                  * unguarded */
1449
1450                 /* Read beyond end of file */
1451                 if (*ppos >= i_size_read(inode))
1452                         RETURN(0);
1453
1454                 if (count > i_size_read(inode) - *ppos)
1455                         count = i_size_read(inode) - *ppos;
1456                 /* Make sure to correctly adjust the file pos pointer for
1457                  * EFAULT case */
1458                 for (nrsegs_copy = 0; nrsegs_copy < nr_segs; nrsegs_copy++) {
1459                         const struct iovec *iv = &iov[nrsegs_copy];
1460
1461                         if (count < iv->iov_len)
1462                                 chunk = count;
1463                         else
1464                                 chunk = iv->iov_len;
1465                         notzeroed = clear_user(iv->iov_base, chunk);
1466                         sum += (chunk - notzeroed);
1467                         count -= (chunk - notzeroed);
1468                         if (notzeroed || !count)
1469                                 break;
1470                 }
1471                 *ppos += sum;
1472                 if (!sum)
1473                         RETURN(-EFAULT);
1474                 RETURN(sum);
1475         }
1476
1477         ltd.ltd_magic = LTD_MAGIC;
1478         ll_td_set(&ltd);
1479 repeat:
1480         memset(&ltd, 0, sizeof(ltd));
1481         ltd.ltd_magic = LTD_MAGIC;
1482         if (sbi->ll_max_rw_chunk != 0 && !(file->f_flags & O_DIRECT)) {
1483                 /* first, let's know the end of the current stripe */
1484                 end = *ppos;
1485                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,&end);
1486
1487                 /* correct, the end is beyond the request */
1488                 if (end > *ppos + count - 1)
1489                         end = *ppos + count - 1;
1490
1491                 /* and chunk shouldn't be too large even if striping is wide */
1492                 if (end - *ppos > sbi->ll_max_rw_chunk)
1493                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1494
1495                 chunk = end - *ppos + 1;
1496                 if ((count == chunk) && (iov_offset == 0)) {
1497                         if (iov_copy)
1498                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1499
1500                         iov_copy = (struct iovec *)iov;
1501                         nrsegs_copy = nr_segs;
1502                 } else {
1503                         if (!iov_copy) {
1504                                 nrsegs_orig = nr_segs;
1505                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1506                                 if (!iov_copy)
1507                                         GOTO(out, retval = -ENOMEM);
1508                         }
1509
1510                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1511                                         &iov_offset, chunk);
1512                 }
1513         } else {
1514                 end = *ppos + count - 1;
1515                 iov_copy = (struct iovec *)iov;
1516                 nrsegs_copy = nr_segs;
1517         }
1518
1519         down_read(&lli->lli_truncate_rwsem); /* Bug 18233 */
1520
1521         ltd.lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end, 
1522                                           iov_copy, nrsegs_copy, 
1523                                           &ltd.u.lockh, &ltd.u.tree,
1524                                           OBD_BRW_READ);
1525         if (ltd.lock_style < 0 || ltd.lock_style == LL_LOCK_STYLE_NOLOCK)
1526                 up_read(&lli->lli_truncate_rwsem);
1527         if (ltd.lock_style < 0)
1528                 GOTO(out, retval = ltd.lock_style);
1529
1530         ll_inode_size_lock(inode, 1);
1531         /*
1532          * Consistency guarantees: following possibilities exist for the
1533          * relation between region being read and real file size at this
1534          * moment:
1535          *
1536          *  (A): the region is completely inside of the file;
1537          *
1538          *  (B-x): x bytes of region are inside of the file, the rest is
1539          *  outside;
1540          *
1541          *  (C): the region is completely outside of the file.
1542          *
1543          * This classification is stable under DLM lock acquired by
1544          * ll_tree_lock() above, because to change class, other client has to
1545          * take DLM lock conflicting with our lock. Also, any updates to
1546          * ->i_size by other threads on this client are serialized by
1547          * ll_inode_size_lock(). This guarantees that short reads are handled
1548          * correctly in the face of concurrent writes and truncates.
1549          */
1550         inode_init_lvb(inode, &lvb);
1551         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1552         kms = lvb.lvb_size;
1553         if (*ppos + count - 1 > kms) {
1554                 /* A glimpse is necessary to determine whether we return a
1555                  * short read (B) or some zeroes at the end of the buffer (C) */
1556                 ll_inode_size_unlock(inode, 1);
1557                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1558                 if (retval) {
1559                         if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
1560                                 ll_file_put_lock(inode, end, ltd.lock_style,
1561                                                  &ltd.u.lockh, &ltd.u.tree,
1562                                                  OBD_BRW_READ);
1563                                 up_read(&lli->lli_truncate_rwsem);
1564                         }
1565                         goto out;
1566                 } else {
1567                         /* If objective page index exceed the end-of-file page
1568                          * index, return directly. Do not expect kernel will
1569                          * check such case correctly. linux-2.6.18-128.1.1 miss
1570                          * to do that. --bug 17336 */
1571                         loff_t size = i_size_read(inode);
1572                         unsigned long cur_index = *ppos >> CFS_PAGE_SHIFT;
1573
1574                         if ((size == 0 && cur_index != 0) ||
1575                             (((size - 1) >> CFS_PAGE_SHIFT) < cur_index)) {
1576                                 if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
1577
1578                                         ll_file_put_lock(inode, end, 
1579                                                          ltd.lock_style,
1580                                                          &ltd.u.lockh, 
1581                                                          &ltd.u.tree,
1582                                                          OBD_BRW_READ);
1583                                         up_read(&lli->lli_truncate_rwsem);
1584                                 }
1585                                 goto out;
1586                         }
1587                 }
1588         } else {
1589                 /* region is within kms and, hence, within real file size (A).
1590                  * We need to increase i_size to cover the read region so that
1591                  * generic_file_read() will do its job, but that doesn't mean
1592                  * the kms size is _correct_, it is only the _minimum_ size.
1593                  * If someone does a stat they will get the correct size which
1594                  * will always be >= the kms value here.  b=11081 */
1595                 if (i_size_read(inode) < kms)
1596                         i_size_write(inode, kms);
1597                 ll_inode_size_unlock(inode, 1);
1598         }
1599
1600         chunk = end - *ppos + 1;
1601         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1602                inode->i_ino, chunk, *ppos, i_size_read(inode));
1603
1604         /* turn off the kernel's read-ahead */
1605         if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
1606                 struct ost_lvb *xtimes;
1607                 /* read under locks
1608                  *
1609                  * 1. update inode's atime as long as concurrent stat
1610                  * (via ll_glimpse_size) might bring out-of-date ones
1611                  *
1612                  * 2. update lsm so that next stat (via
1613                  * ll_glimpse_size) could get correct values in lsm */
1614                 OBD_ALLOC_PTR(xtimes);
1615                 if (NULL == xtimes) {
1616                         ll_file_put_lock(inode, end, ltd.lock_style, 
1617                                          &ltd.u.lockh, &ltd.u.tree,
1618                                          OBD_BRW_READ);
1619                         up_read(&lli->lli_truncate_rwsem);
1620                         GOTO(out, retval = -ENOMEM);
1621                 }
1622
1623                 lov_stripe_lock(lsm);
1624                 LTIME_S(inode->i_atime) = LTIME_S(CURRENT_TIME);
1625                 xtimes->lvb_atime = LTIME_S(inode->i_atime);
1626                 obd_update_lvb(sbi->ll_osc_exp, lsm, xtimes,
1627                                OBD_MD_FLATIME);
1628                 lov_stripe_unlock(lsm);
1629                 OBD_FREE_PTR(xtimes);
1630
1631                 file->f_ra.ra_pages = 0;
1632                 /* initialize read-ahead window once per syscall */
1633                 if (ra == 0) {
1634                         ra = 1;
1635                         ll_ra_read_init(file, &bead, *ppos, count);
1636                 }
1637
1638                 /* BUG: 5972 */
1639                 file_accessed(file);
1640 #ifdef HAVE_FILE_READV
1641                 retval = generic_file_readv(file, iov_copy, nrsegs_copy, ppos);
1642 #else
1643                 retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
1644                                                *ppos);
1645 #endif
1646                 ll_file_put_lock(inode, end, ltd.lock_style, &ltd.u.lockh,
1647                                  &ltd.u.tree, OBD_BRW_READ);
1648                 up_read(&lli->lli_truncate_rwsem);
1649         } else {
1650                 retval = ll_direct_IO(READ, file, iov_copy, *ppos, nr_segs, 0);
1651                 if (retval > 0) {
1652                        lprocfs_counter_add(sbi->ll_stats,
1653                                            LPROC_LL_LOCKLESS_READ,
1654                                            (long)retval);
1655                         *ppos += retval;
1656                 }
1657         }
1658         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1659         if (retval > 0) {
1660                 count -= retval;
1661                 sum += retval;
1662                 if (retval == chunk && count > 0)
1663                         goto repeat;
1664         }
1665
1666  out:
1667         ll_td_set(NULL);
1668         if (ra != 0)
1669                 ll_ra_read_ex(file, &bead);
1670         retval = (sum > 0) ? sum : retval;
1671
1672         if (iov_copy && iov_copy != iov)
1673                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1674
1675         RETURN(retval);
1676 }
1677
1678 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1679                             loff_t *ppos)
1680 {
1681         struct iovec local_iov = { .iov_base = (void __user *)buf,
1682                                    .iov_len = count };
1683 #ifdef HAVE_FILE_READV
1684         return ll_file_readv(file, &local_iov, 1, ppos);
1685 #else
1686         struct kiocb kiocb;
1687         ssize_t ret;
1688
1689         init_sync_kiocb(&kiocb, file);
1690         kiocb.ki_pos = *ppos;
1691         kiocb.ki_left = count;
1692
1693         ret = ll_file_aio_read(&kiocb, &local_iov, 1, kiocb.ki_pos);
1694         *ppos = kiocb.ki_pos;
1695         return ret;
1696 #endif
1697 }
1698
1699 /*
1700  * Write to a file (through the page cache).
1701  */
1702 #ifdef HAVE_FILE_WRITEV
1703 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1704                               unsigned long nr_segs, loff_t *ppos)
1705 {
1706 #else /* AIO stuff */
1707 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1708                                  unsigned long nr_segs, loff_t pos)
1709 {
1710         struct file *file = iocb->ki_filp;
1711         loff_t *ppos = &iocb->ki_pos;
1712 #endif
1713         struct inode *inode = file->f_dentry->d_inode;
1714         struct ll_sb_info *sbi = ll_i2sbi(inode);
1715         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1716         struct ll_thread_data ltd = { 0 };
1717         loff_t maxbytes = ll_file_maxbytes(inode);
1718         loff_t lock_start, lock_end, end;
1719         ssize_t retval, chunk, sum = 0;
1720         int tree_locked;
1721         struct iovec *iov_copy = NULL;
1722         unsigned long nrsegs_copy, nrsegs_orig = 0;
1723         size_t count, iov_offset = 0;
1724         ENTRY;
1725
1726         count = ll_file_get_iov_count(iov, &nr_segs);
1727
1728         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1729                inode->i_ino, inode->i_generation, inode, count, *ppos);
1730
1731         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1732
1733         /* POSIX, but surprised the VFS doesn't check this already */
1734         if (count == 0)
1735                 RETURN(0);
1736
1737         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1738          * called on the file, don't fail the below assertion (bug 2388). */
1739         if (file->f_flags & O_LOV_DELAY_CREATE &&
1740             ll_i2info(inode)->lli_smd == NULL)
1741                 RETURN(-EBADF);
1742
1743         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1744
1745         /* signal(7) specifies that write(2) and writev(2) should be restarted */
1746         if (down_interruptible(&ll_i2info(inode)->lli_write_sem))
1747                 RETURN(-ERESTARTSYS);
1748
1749         ltd.ltd_magic = LTD_MAGIC;
1750         ll_td_set(&ltd);
1751 repeat:
1752         memset(&ltd, 0, sizeof(ltd));
1753         ltd.ltd_magic = LTD_MAGIC;
1754
1755         chunk = 0; /* just to fix gcc's warning */
1756         end = *ppos + count - 1;
1757
1758         if (file->f_flags & O_APPEND) {
1759                 lock_start = 0;
1760                 lock_end = OBD_OBJECT_EOF;
1761                 iov_copy = (struct iovec *)iov;
1762                 nrsegs_copy = nr_segs;
1763         } else if (sbi->ll_max_rw_chunk != 0 && !(file->f_flags & O_DIRECT)) {
1764                 /* first, let's know the end of the current stripe */
1765                 end = *ppos;
1766                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1767                                 (obd_off *)&end);
1768
1769                 /* correct, the end is beyond the request */
1770                 if (end > *ppos + count - 1)
1771                         end = *ppos + count - 1;
1772
1773                 /* and chunk shouldn't be too large even if striping is wide */
1774                 if (end - *ppos > sbi->ll_max_rw_chunk)
1775                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1776                 lock_start = *ppos;
1777                 lock_end = end;
1778                 chunk = end - *ppos + 1;
1779                 if ((count == chunk) && (iov_offset == 0)) {
1780                         if (iov_copy)
1781                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1782
1783                         iov_copy = (struct iovec *)iov;
1784                         nrsegs_copy = nr_segs;
1785                 } else {
1786                         if (!iov_copy) {
1787                                 nrsegs_orig = nr_segs;
1788                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1789                                 if (!iov_copy)
1790                                         GOTO(out, retval = -ENOMEM);
1791                         }
1792                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1793                                         &iov_offset, chunk);
1794                 }
1795         } else {
1796                 lock_start = *ppos;
1797                 lock_end = end;
1798                 iov_copy = (struct iovec *)iov;
1799                 nrsegs_copy = nr_segs;
1800         }
1801
1802         tree_locked = ll_file_get_tree_lock_iov(&ltd.u.tree, file, iov_copy,
1803                                                 nrsegs_copy,
1804                                                 (obd_off)lock_start,
1805                                                 (obd_off)lock_end,
1806                                                 OBD_BRW_WRITE);
1807         if (tree_locked < 0)
1808                 GOTO(out, retval = tree_locked);
1809
1810         /* This is ok, g_f_w will overwrite this under i_sem if it races
1811          * with a local truncate, it just makes our maxbyte checking easier.
1812          * The i_size value gets updated in ll_extent_lock() as a consequence
1813          * of the [0,EOF] extent lock we requested above. */
1814         if (file->f_flags & O_APPEND) {
1815                 *ppos = i_size_read(inode);
1816                 end = *ppos + count - 1;
1817         }
1818
1819         if (*ppos >= maxbytes) {
1820                 send_sig(SIGXFSZ, current, 0);
1821                 GOTO(out_unlock, retval = -EFBIG);
1822         }
1823         if (end > maxbytes - 1)
1824                 end = maxbytes - 1;
1825
1826         /* generic_file_write handles O_APPEND after getting i_mutex */
1827         chunk = end - *ppos + 1;
1828         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1829                inode->i_ino, chunk, *ppos);
1830         if (tree_locked) {
1831                 struct ost_lvb *xtimes;
1832                 /* write under locks
1833                  *
1834                  * 1. update inode's mtime and ctime as long as
1835                  * concurrent stat (via ll_glimpse_size) might bring
1836                  * out-of-date ones
1837                  *
1838                  * 2. update lsm so that next stat (via
1839                  * ll_glimpse_size) could get correct values in lsm */
1840                 OBD_ALLOC_PTR(xtimes);
1841                 if (NULL == xtimes)
1842                         GOTO(out_unlock, retval = -ENOMEM);
1843
1844                 lov_stripe_lock(lsm);
1845                 LTIME_S(inode->i_mtime) = LTIME_S(CURRENT_TIME);
1846                 LTIME_S(inode->i_ctime) = LTIME_S(CURRENT_TIME);
1847                 xtimes->lvb_mtime = LTIME_S(inode->i_mtime);
1848                 xtimes->lvb_ctime = LTIME_S(inode->i_ctime);
1849                 obd_update_lvb(sbi->ll_osc_exp, lsm, xtimes,
1850                                OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1851                 lov_stripe_unlock(lsm);
1852                 OBD_FREE_PTR(xtimes);
1853
1854                 ltd.lock_style = LL_LOCK_STYLE_TREELOCK;
1855
1856 #ifdef HAVE_FILE_WRITEV
1857                 retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
1858 #else
1859                 retval = generic_file_aio_write(iocb, iov_copy, nrsegs_copy,
1860                                                 *ppos);
1861 #endif
1862         } else {
1863                 retval = ll_direct_IO(WRITE, file, iov_copy, *ppos, nr_segs, 0);
1864                 if (retval > 0) {
1865                        lprocfs_counter_add(sbi->ll_stats,
1866                                            LPROC_LL_LOCKLESS_WRITE,
1867                                            (long)retval);
1868                         *ppos += retval;
1869                 }
1870         }
1871         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1872
1873 out_unlock:
1874         if (tree_locked)
1875                 ll_tree_unlock(&ltd.u.tree);
1876
1877 out:
1878         if (retval > 0) {
1879                 count -= retval;
1880                 sum += retval;
1881                 if (retval == chunk && count > 0)
1882                         goto repeat;
1883         }
1884
1885         up(&ll_i2info(inode)->lli_write_sem);
1886
1887         ll_td_set(NULL);
1888         if (iov_copy && iov_copy != iov)
1889                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1890
1891         retval = (sum > 0) ? sum : retval;
1892         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1893                            retval > 0 ? retval : 0);
1894         RETURN(retval);
1895 }
1896
1897 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1898                              loff_t *ppos)
1899 {
1900         struct iovec local_iov = { .iov_base = (void __user *)buf,
1901                                    .iov_len = count };
1902
1903 #ifdef HAVE_FILE_WRITEV
1904         return ll_file_writev(file, &local_iov, 1, ppos);
1905 #else
1906         struct kiocb kiocb;
1907         ssize_t ret;
1908
1909         init_sync_kiocb(&kiocb, file);
1910         kiocb.ki_pos = *ppos;
1911         kiocb.ki_left = count;
1912
1913         ret = ll_file_aio_write(&kiocb, &local_iov, 1, kiocb.ki_pos);
1914         *ppos = kiocb.ki_pos;
1915
1916         return ret;
1917 #endif
1918 }
1919
1920 #ifdef HAVE_KERNEL_SENDFILE
1921 /*
1922  * Send file content (through pagecache) somewhere with helper
1923  */
1924 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,
1925                                 size_t count, read_actor_t actor, void *target)
1926 {
1927         struct inode *inode = in_file->f_dentry->d_inode;
1928         struct ll_inode_info *lli = ll_i2info(inode);
1929         struct lov_stripe_md *lsm = lli->lli_smd;
1930         struct ll_lock_tree tree;
1931         struct ll_lock_tree_node *node;
1932         struct ost_lvb lvb;
1933         struct ll_ra_read bead;
1934         ssize_t rc;
1935         __u64 kms;
1936         ENTRY;
1937
1938         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1939                inode->i_ino, inode->i_generation, inode, count, *ppos);
1940
1941         /* "If nbyte is 0, read() will return 0 and have no other results."
1942          *                      -- Single Unix Spec */
1943         if (count == 0)
1944                 RETURN(0);
1945
1946         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1947         /* turn off the kernel's read-ahead */
1948         in_file->f_ra.ra_pages = 0;
1949
1950         /* File with no objects, nothing to lock */
1951         if (!lsm) {
1952                 rc = generic_file_sendfile(in_file, ppos, count, actor, target);
1953                 RETURN(rc);
1954         }
1955
1956         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1957         if (IS_ERR(node))
1958                 RETURN(PTR_ERR(node));
1959
1960         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1961         rc = ll_tree_lock(&tree, node, NULL, count,
1962                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1963         if (rc != 0)
1964                 RETURN(rc);
1965
1966         ll_clear_file_contended(inode);
1967         ll_inode_size_lock(inode, 1);
1968         /*
1969          * Consistency guarantees: following possibilities exist for the
1970          * relation between region being read and real file size at this
1971          * moment:
1972          *
1973          *  (A): the region is completely inside of the file;
1974          *
1975          *  (B-x): x bytes of region are inside of the file, the rest is
1976          *  outside;
1977          *
1978          *  (C): the region is completely outside of the file.
1979          *
1980          * This classification is stable under DLM lock acquired by
1981          * ll_tree_lock() above, because to change class, other client has to
1982          * take DLM lock conflicting with our lock. Also, any updates to
1983          * ->i_size by other threads on this client are serialized by
1984          * ll_inode_size_lock(). This guarantees that short reads are handled
1985          * correctly in the face of concurrent writes and truncates.
1986          */
1987         inode_init_lvb(inode, &lvb);
1988         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1989         kms = lvb.lvb_size;
1990         if (*ppos + count - 1 > kms) {
1991                 /* A glimpse is necessary to determine whether we return a
1992                  * short read (B) or some zeroes at the end of the buffer (C) */
1993                 ll_inode_size_unlock(inode, 1);
1994                 rc = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1995                 if (rc)
1996                         goto out;
1997         } else {
1998                 /* region is within kms and, hence, within real file size (A) */
1999                 i_size_write(inode, kms);
2000                 ll_inode_size_unlock(inode, 1);
2001         }
2002
2003         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
2004                inode->i_ino, count, *ppos, i_size_read(inode));
2005
2006         ll_ra_read_init(in_file, &bead, *ppos, count);
2007         /* BUG: 5972 */
2008         file_accessed(in_file);
2009         rc = generic_file_sendfile(in_file, ppos, count, actor, target);
2010         ll_ra_read_ex(in_file, &bead);
2011
2012  out:
2013         ll_tree_unlock(&tree);
2014         RETURN(rc);
2015 }
2016 #endif
2017
2018 /* change based on
2019  * http://git.kernel.org/?p=linux/kernel/git/torvalds/linux-2.6.git;a=commit;h=f0930fffa99e7fe0a0c4b6c7d9a244dc88288c27
2020  */
2021 #ifdef HAVE_KERNEL_SPLICE_READ
2022 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
2023                                    struct pipe_inode_info *pipe, size_t count,
2024                                    unsigned int flags)
2025 {
2026         struct inode *inode = in_file->f_dentry->d_inode;
2027         struct ll_inode_info *lli = ll_i2info(inode);
2028         struct lov_stripe_md *lsm = lli->lli_smd;
2029         struct ll_lock_tree tree;
2030         struct ll_lock_tree_node *node;
2031         struct ost_lvb lvb;
2032         struct ll_ra_read bead;
2033         ssize_t rc;
2034         __u64 kms;
2035         ENTRY;
2036
2037         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
2038                inode->i_ino, inode->i_generation, inode, count, *ppos);
2039
2040         /* "If nbyte is 0, read() will return 0 and have no other results."
2041          *                      -- Single Unix Spec */
2042         if (count == 0)
2043                 RETURN(0);
2044
2045         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
2046         /* turn off the kernel's read-ahead */
2047         in_file->f_ra.ra_pages = 0;
2048
2049         /* File with no objects, nothing to lock */
2050         if (!lsm) {
2051                 rc = generic_file_splice_read(in_file, ppos, pipe, count, flags);
2052                 RETURN(rc);
2053         }
2054
2055         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
2056         if (IS_ERR(node))
2057                 RETURN(PTR_ERR(node));
2058
2059         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
2060         rc = ll_tree_lock(&tree, node, NULL, count,
2061                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
2062         if (rc != 0)
2063                 RETURN(rc);
2064
2065         ll_clear_file_contended(inode);
2066         ll_inode_size_lock(inode, 1);
2067         /*
2068          * Consistency guarantees: following possibilities exist for the
2069          * relation between region being read and real file size at this
2070          * moment:
2071          *
2072          *  (A): the region is completely inside of the file;
2073          *
2074          *  (B-x): x bytes of region are inside of the file, the rest is
2075          *  outside;
2076          *
2077          *  (C): the region is completely outside of the file.
2078          *
2079          * This classification is stable under DLM lock acquired by
2080          * ll_tree_lock() above, because to change class, other client has to
2081          * take DLM lock conflicting with our lock. Also, any updates to
2082          * ->i_size by other threads on this client are serialized by
2083          * ll_inode_size_lock(). This guarantees that short reads are handled
2084          * correctly in the face of concurrent writes and truncates.
2085          */
2086         inode_init_lvb(inode, &lvb);
2087         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
2088         kms = lvb.lvb_size;
2089         if (*ppos + count - 1 > kms) {
2090                 /* A glimpse is necessary to determine whether we return a
2091                  * short read (B) or some zeroes at the end of the buffer (C) */
2092                 ll_inode_size_unlock(inode, 1);
2093                 rc = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
2094                 if (rc)
2095                         goto out;
2096         } else {
2097                 /* region is within kms and, hence, within real file size (A) */
2098                 i_size_write(inode, kms);
2099                 ll_inode_size_unlock(inode, 1);
2100         }
2101
2102         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
2103                inode->i_ino, count, *ppos, i_size_read(inode));
2104
2105         ll_ra_read_init(in_file, &bead, *ppos, count);
2106         /* BUG: 5972 */
2107         file_accessed(in_file);
2108         rc = generic_file_splice_read(in_file, ppos, pipe, count, flags);
2109         ll_ra_read_ex(in_file, &bead);
2110
2111  out:
2112         ll_tree_unlock(&tree);
2113         RETURN(rc);
2114 }
2115 #endif
2116
2117 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
2118                                unsigned long arg)
2119 {
2120         struct ll_inode_info *lli = ll_i2info(inode);
2121         struct obd_export *exp = ll_i2obdexp(inode);
2122         struct ll_recreate_obj ucreatp;
2123         struct obd_trans_info oti = { 0 };
2124         struct obdo *oa = NULL;
2125         int lsm_size;
2126         int rc = 0;
2127         struct lov_stripe_md *lsm, *lsm2;
2128         ENTRY;
2129
2130         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2131                 RETURN(-EPERM);
2132
2133         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
2134                             sizeof(struct ll_recreate_obj));
2135         if (rc) {
2136                 RETURN(-EFAULT);
2137         }
2138         OBDO_ALLOC(oa);
2139         if (oa == NULL)
2140                 RETURN(-ENOMEM);
2141
2142         down(&lli->lli_size_sem);
2143         lsm = lli->lli_smd;
2144         if (lsm == NULL)
2145                 GOTO(out, rc = -ENOENT);
2146         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
2147                    (lsm->lsm_stripe_count));
2148
2149         OBD_ALLOC(lsm2, lsm_size);
2150         if (lsm2 == NULL)
2151                 GOTO(out, rc = -ENOMEM);
2152
2153         oa->o_id = ucreatp.lrc_id;
2154         oa->o_nlink = ucreatp.lrc_ost_idx;
2155         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2156         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
2157         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2158                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2159
2160         memcpy(lsm2, lsm, lsm_size);
2161         rc = obd_create(exp, oa, &lsm2, &oti);
2162
2163         OBD_FREE(lsm2, lsm_size);
2164         GOTO(out, rc);
2165 out:
2166         up(&lli->lli_size_sem);
2167         OBDO_FREE(oa);
2168         return rc;
2169 }
2170
2171 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2172                                     int flags, struct lov_user_md *lum,
2173                                     int lum_size)
2174 {
2175         struct ll_inode_info *lli = ll_i2info(inode);
2176         struct lov_stripe_md *lsm;
2177         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2178         int rc = 0;
2179         ENTRY;
2180
2181         down(&lli->lli_size_sem);
2182         lsm = lli->lli_smd;
2183         if (lsm) {
2184                 up(&lli->lli_size_sem);
2185                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2186                        inode->i_ino);
2187                 RETURN(-EEXIST);
2188         }
2189
2190         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2191         if (rc)
2192                 GOTO(out, rc);
2193         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2194                 GOTO(out_req_free, rc = -ENOENT);
2195         rc = oit.d.lustre.it_status;
2196         if (rc < 0)
2197                 GOTO(out_req_free, rc);
2198
2199         ll_release_openhandle(file->f_dentry, &oit);
2200
2201  out:
2202         up(&lli->lli_size_sem);
2203         ll_intent_release(&oit);
2204         RETURN(rc);
2205 out_req_free:
2206         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2207         goto out;
2208 }
2209
2210 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
2211                              struct lov_mds_md **lmmp, int *lmm_size,
2212                              struct ptlrpc_request **request)
2213 {
2214         struct ll_sb_info *sbi = ll_i2sbi(inode);
2215         struct ll_fid  fid;
2216         struct mds_body  *body;
2217         struct lov_mds_md *lmm = NULL;
2218         struct ptlrpc_request *req = NULL;
2219         int rc, lmmsize;
2220
2221         ll_inode2fid(&fid, inode);
2222
2223         rc = ll_get_max_mdsize(sbi, &lmmsize);
2224         if (rc)
2225                 RETURN(rc);
2226
2227         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
2228                         filename, strlen(filename) + 1,
2229                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
2230                         lmmsize, &req);
2231         if (rc < 0) {
2232                 CDEBUG(D_INFO, "mdc_getattr_name failed "
2233                                 "on %s: rc %d\n", filename, rc);
2234                 GOTO(out, rc);
2235         }
2236
2237         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
2238                         sizeof(*body));
2239         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2240         /* swabbed by mdc_getattr_name */
2241         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
2242
2243         lmmsize = body->eadatasize;
2244
2245         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2246                         lmmsize == 0) {
2247                 GOTO(out, rc = -ENODATA);
2248         }
2249
2250         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
2251                         lmmsize);
2252         LASSERT(lmm != NULL);
2253         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
2254
2255         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
2256             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) &&
2257             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
2258                 GOTO(out, rc = -EPROTO);
2259         }
2260         /*
2261          * This is coming from the MDS, so is probably in
2262          * little endian.  We convert it to host endian before
2263          * passing it to userspace.
2264          */
2265         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
2266                 /* if function called for directory - we should
2267                  * avoid swab not existent lsm objects */
2268                 if ((lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) ||
2269                     (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3))) {
2270                         lustre_swab_lov_user_md((struct lov_user_md*)lmm);
2271                         if (S_ISREG(body->mode))
2272                                 lustre_swab_lov_user_md_objects(
2273                                                 (struct lov_user_md*)lmm);
2274                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
2275                         lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2276                 }
2277         }
2278
2279         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2280                 struct lov_stripe_md *lsm;
2281                 struct lov_user_md_join *lmj;
2282                 int lmj_size, i, aindex = 0;
2283
2284                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
2285                 if (rc < 0)
2286                         GOTO(out, rc = -ENOMEM);
2287                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
2288                 if (rc)
2289                         GOTO(out_free_memmd, rc);
2290
2291                 lmj_size = sizeof(struct lov_user_md_join) +
2292                         lsm->lsm_stripe_count *
2293                         sizeof(struct lov_user_ost_data_join);
2294                 OBD_ALLOC(lmj, lmj_size);
2295                 if (!lmj)
2296                         GOTO(out_free_memmd, rc = -ENOMEM);
2297
2298                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2299                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2300                         struct lov_extent *lex =
2301                                 &lsm->lsm_array->lai_ext_array[aindex];
2302
2303                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2304                                 aindex ++;
2305                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2306                                         LPU64" len %d\n", aindex, i,
2307                                         lex->le_start, (int)lex->le_len);
2308                         lmj->lmm_objects[i].l_extent_start =
2309                                 lex->le_start;
2310
2311                         if ((int)lex->le_len == -1)
2312                                 lmj->lmm_objects[i].l_extent_end = -1;
2313                         else
2314                                 lmj->lmm_objects[i].l_extent_end =
2315                                         lex->le_start + lex->le_len;
2316                         lmj->lmm_objects[i].l_object_id =
2317                                 lsm->lsm_oinfo[i]->loi_id;
2318                         lmj->lmm_objects[i].l_object_gr =
2319                                 lsm->lsm_oinfo[i]->loi_gr;
2320                         lmj->lmm_objects[i].l_ost_gen =
2321                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2322                         lmj->lmm_objects[i].l_ost_idx =
2323                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2324                 }
2325                 lmm = (struct lov_mds_md *)lmj;
2326                 lmmsize = lmj_size;
2327 out_free_memmd:
2328                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
2329         }
2330 out:
2331         *lmmp = lmm;
2332         *lmm_size = lmmsize;
2333         *request = req;
2334         return rc;
2335 }
2336 static int ll_lov_setea(struct inode *inode, struct file *file,
2337                             unsigned long arg)
2338 {
2339         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2340         struct lov_user_md  *lump;
2341         int lum_size = sizeof(struct lov_user_md) +
2342                        sizeof(struct lov_user_ost_data);
2343         int rc;
2344         ENTRY;
2345
2346         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2347                 RETURN(-EPERM);
2348
2349         OBD_ALLOC(lump, lum_size);
2350         if (lump == NULL) {
2351                 RETURN(-ENOMEM);
2352         }
2353         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2354         if (rc) {
2355                 OBD_FREE(lump, lum_size);
2356                 RETURN(-EFAULT);
2357         }
2358
2359         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2360
2361         OBD_FREE(lump, lum_size);
2362         RETURN(rc);
2363 }
2364
2365 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2366                             unsigned long arg)
2367 {
2368         struct lov_user_md_v3 lumv3;
2369         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
2370         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
2371         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
2372         int lum_size;
2373         int rc;
2374         int flags = FMODE_WRITE;
2375         ENTRY;
2376
2377         /* first try with v1 which is smaller than v3 */
2378         lum_size = sizeof(struct lov_user_md_v1);
2379         rc = copy_from_user(lumv1, lumv1p, lum_size);
2380         if (rc)
2381                 RETURN(-EFAULT);
2382
2383         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
2384                 lum_size = sizeof(struct lov_user_md_v3);
2385                 rc = copy_from_user(&lumv3, lumv3p, lum_size);
2386                 if (rc)
2387                         RETURN(-EFAULT);
2388         }
2389
2390         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
2391         if (rc == 0) {
2392                  put_user(0, &lumv1p->lmm_stripe_count);
2393                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
2394                                     0, ll_i2info(inode)->lli_smd,
2395                                     (void *)arg);
2396         }
2397         RETURN(rc);
2398 }
2399
2400 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2401 {
2402         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2403
2404         if (!lsm)
2405                 RETURN(-ENODATA);
2406
2407         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
2408                             (void *)arg);
2409 }
2410
2411 static int ll_get_grouplock(struct inode *inode, struct file *file,
2412                             unsigned long arg)
2413 {
2414         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2415         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2416                                                     .end = OBD_OBJECT_EOF}};
2417         struct lustre_handle lockh = { 0 };
2418         struct ll_inode_info *lli = ll_i2info(inode);
2419         struct lov_stripe_md *lsm = lli->lli_smd;
2420         int flags = 0, rc;
2421         ENTRY;
2422
2423         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2424                 RETURN(-EINVAL);
2425         }
2426
2427         policy.l_extent.gid = arg;
2428         if (file->f_flags & O_NONBLOCK)
2429                 flags = LDLM_FL_BLOCK_NOWAIT;
2430
2431         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2432         if (rc)
2433                 RETURN(rc);
2434
2435         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2436         fd->fd_gid = arg;
2437         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2438
2439         RETURN(0);
2440 }
2441
2442 static int ll_put_grouplock(struct inode *inode, struct file *file,
2443                             unsigned long arg)
2444 {
2445         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2446         struct ll_inode_info *lli = ll_i2info(inode);
2447         struct lov_stripe_md *lsm = lli->lli_smd;
2448         int rc;
2449         ENTRY;
2450
2451         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2452                 /* Ugh, it's already unlocked. */
2453                 RETURN(-EINVAL);
2454         }
2455
2456         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2457                 RETURN(-EINVAL);
2458
2459         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2460
2461         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2462         if (rc)
2463                 RETURN(rc);
2464
2465         fd->fd_gid = 0;
2466         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2467
2468         RETURN(0);
2469 }
2470
2471 #if LUSTRE_FIX >= 50
2472 static int join_sanity_check(struct inode *head, struct inode *tail)
2473 {
2474         ENTRY;
2475         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2476                 CERROR("server do not support join \n");
2477                 RETURN(-EINVAL);
2478         }
2479         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2480                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2481                        head->i_ino, tail->i_ino);
2482                 RETURN(-EINVAL);
2483         }
2484         if (head->i_ino == tail->i_ino) {
2485                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2486                 RETURN(-EINVAL);
2487         }
2488         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2489                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2490                 RETURN(-EINVAL);
2491         }
2492         RETURN(0);
2493 }
2494
2495 static int join_file(struct inode *head_inode, struct file *head_filp,
2496                      struct file *tail_filp)
2497 {
2498         struct dentry *tail_dentry = tail_filp->f_dentry;
2499         struct lookup_intent oit = {.it_op = IT_OPEN,
2500                                     .it_flags = head_filp->f_flags,
2501                                     .it_create_mode = M_JOIN_FILE};
2502         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_PW,
2503                 ll_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL };
2504
2505         struct lustre_handle lockh;
2506         struct mdc_op_data *op_data;
2507         int    rc;
2508         loff_t data;
2509         ENTRY;
2510
2511         tail_dentry = tail_filp->f_dentry;
2512
2513         OBD_ALLOC_PTR(op_data);
2514         if (op_data == NULL) {
2515                 RETURN(-ENOMEM);
2516         }
2517
2518         data = i_size_read(head_inode);
2519         ll_prepare_mdc_op_data(op_data, head_inode,
2520                                tail_dentry->d_parent->d_inode,
2521                                tail_dentry->d_name.name,
2522                                tail_dentry->d_name.len, 0, &data);
2523         rc = mdc_enqueue(ll_i2mdcexp(head_inode), &einfo, &oit,
2524                          op_data, &lockh, NULL, 0, 0);
2525
2526         if (rc < 0)
2527                 GOTO(out, rc);
2528
2529         rc = oit.d.lustre.it_status;
2530
2531         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2532                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2533                 ptlrpc_req_finished((struct ptlrpc_request *)
2534                                     oit.d.lustre.it_data);
2535                 GOTO(out, rc);
2536         }
2537
2538         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2539                                            * away */
2540                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2541                 oit.d.lustre.it_lock_mode = 0;
2542         }
2543         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2544         it_clear_disposition(&oit, DISP_ENQ_COMPLETE);
2545         ll_release_openhandle(head_filp->f_dentry, &oit);
2546 out:
2547         if (op_data)
2548                 OBD_FREE_PTR(op_data);
2549         ll_intent_release(&oit);
2550         RETURN(rc);
2551 }
2552
2553 static int ll_file_join(struct inode *head, struct file *filp,
2554                         char *filename_tail)
2555 {
2556         struct inode *tail = NULL, *first = NULL, *second = NULL;
2557         struct dentry *tail_dentry;
2558         struct file *tail_filp, *first_filp, *second_filp;
2559         struct ll_lock_tree first_tree, second_tree;
2560         struct ll_lock_tree_node *first_node, *second_node;
2561         struct ll_inode_info *hlli = ll_i2info(head);
2562         int rc = 0, cleanup_phase = 0;
2563         ENTRY;
2564
2565         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2566                head->i_ino, head->i_generation, head, filename_tail);
2567
2568         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2569         if (IS_ERR(tail_filp)) {
2570                 CERROR("Can not open tail file %s", filename_tail);
2571                 rc = PTR_ERR(tail_filp);
2572                 GOTO(cleanup, rc);
2573         }
2574         tail = igrab(tail_filp->f_dentry->d_inode);
2575
2576         tail_dentry = tail_filp->f_dentry;
2577         LASSERT(tail_dentry);
2578         cleanup_phase = 1;
2579
2580         /*reorder the inode for lock sequence*/
2581         first = head->i_ino > tail->i_ino ? head : tail;
2582         second = head->i_ino > tail->i_ino ? tail : head;
2583         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2584         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2585
2586         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2587                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2588         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2589         if (IS_ERR(first_node)){
2590                 rc = PTR_ERR(first_node);
2591                 GOTO(cleanup, rc);
2592         }
2593         first_tree.lt_fd = first_filp->private_data;
2594         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2595         if (rc != 0)
2596                 GOTO(cleanup, rc);
2597         cleanup_phase = 2;
2598
2599         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2600         if (IS_ERR(second_node)){
2601                 rc = PTR_ERR(second_node);
2602                 GOTO(cleanup, rc);
2603         }
2604         second_tree.lt_fd = second_filp->private_data;
2605         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2606         if (rc != 0)
2607                 GOTO(cleanup, rc);
2608         cleanup_phase = 3;
2609
2610         rc = join_sanity_check(head, tail);
2611         if (rc)
2612                 GOTO(cleanup, rc);
2613
2614         rc = join_file(head, filp, tail_filp);
2615         if (rc)
2616                 GOTO(cleanup, rc);
2617 cleanup:
2618         switch (cleanup_phase) {
2619         case 3:
2620                 ll_tree_unlock(&second_tree);
2621                 obd_cancel_unused(ll_i2obdexp(second),
2622                                   ll_i2info(second)->lli_smd, 0, NULL);
2623         case 2:
2624                 ll_tree_unlock(&first_tree);
2625                 obd_cancel_unused(ll_i2obdexp(first),
2626                                   ll_i2info(first)->lli_smd, 0, NULL);
2627         case 1:
2628                 filp_close(tail_filp, 0);
2629                 if (tail)
2630                         iput(tail);
2631                 if (head && rc == 0) {
2632                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2633                                        &hlli->lli_smd);
2634                         hlli->lli_smd = NULL;
2635                 }
2636         case 0:
2637                 break;
2638         default:
2639                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2640                 LBUG();
2641         }
2642         RETURN(rc);
2643 }
2644 #endif  /* LUSTRE_FIX >= 50 */
2645
2646 /**
2647  * Close inode open handle
2648  *
2649  * \param dentry [in]     dentry which contains the inode
2650  * \param it     [in,out] intent which contains open info and result
2651  *
2652  * \retval 0     success
2653  * \retval <0    failure
2654  */
2655 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2656 {
2657         struct inode *inode = dentry->d_inode;
2658         struct obd_client_handle *och;
2659         int rc;
2660         ENTRY;
2661
2662         LASSERT(inode);
2663
2664         /* Root ? Do nothing. */
2665         if (dentry->d_inode->i_sb->s_root == dentry)
2666                 RETURN(0);
2667
2668         /* No open handle to close? Move away */
2669         if (!it_disposition(it, DISP_OPEN_OPEN))
2670                 RETURN(0);
2671
2672         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2673
2674         OBD_ALLOC(och, sizeof(*och));
2675         if (!och)
2676                 GOTO(out, rc = -ENOMEM);
2677
2678         ll_och_fill(ll_i2info(inode), it, och);
2679
2680         rc = ll_close_inode_openhandle(inode, och);
2681
2682         OBD_FREE(och, sizeof(*och));
2683  out:
2684         /* this one is in place of ll_file_open */
2685         if (it_disposition(it, DISP_ENQ_OPEN_REF))
2686                 ptlrpc_req_finished(it->d.lustre.it_data);
2687         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2688         RETURN(rc);
2689 }
2690
2691 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2692               int num_bytes)
2693 {
2694         struct obd_export *exp = ll_i2obdexp(inode);
2695         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2696         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2697         int vallen = num_bytes;
2698         int rc;
2699         ENTRY;
2700
2701         /* If the stripe_count > 1 and the application does not understand
2702          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2703          */
2704         if (lsm->lsm_stripe_count > 1 &&
2705             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2706                 return -EOPNOTSUPP;
2707
2708         fm_key.oa.o_id = lsm->lsm_object_id;
2709         fm_key.oa.o_valid = OBD_MD_FLID;
2710
2711         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLSIZE);
2712
2713         /* If filesize is 0, then there would be no objects for mapping */
2714         if (fm_key.oa.o_size == 0) {
2715                 fiemap->fm_mapped_extents = 0;
2716                 RETURN(0);
2717         }
2718
2719         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2720
2721         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2722         if (rc)
2723                 CERROR("obd_get_info failed: rc = %d\n", rc);
2724
2725         RETURN(rc);
2726 }
2727
2728 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2729                   unsigned long arg)
2730 {
2731         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2732         int flags;
2733         ENTRY;
2734
2735         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2736                inode->i_generation, inode, cmd);
2737         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2738
2739         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2740         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2741                 RETURN(-ENOTTY);
2742
2743         switch(cmd) {
2744         case LL_IOC_GETFLAGS:
2745                 /* Get the current value of the file flags */
2746                 return put_user(fd->fd_flags, (int *)arg);
2747         case LL_IOC_SETFLAGS:
2748         case LL_IOC_CLRFLAGS:
2749                 /* Set or clear specific file flags */
2750                 /* XXX This probably needs checks to ensure the flags are
2751                  *     not abused, and to handle any flag side effects.
2752                  */
2753                 if (get_user(flags, (int *) arg))
2754                         RETURN(-EFAULT);
2755
2756                 if (cmd == LL_IOC_SETFLAGS) {
2757                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2758                             !(file->f_flags & O_DIRECT)) {
2759                                 CERROR("%s: unable to disable locking on "
2760                                        "non-O_DIRECT file\n", current->comm);
2761                                 RETURN(-EINVAL);
2762                         }
2763
2764                         fd->fd_flags |= flags;
2765                 } else {
2766                         fd->fd_flags &= ~flags;
2767                 }
2768                 RETURN(0);
2769         case LL_IOC_LOV_SETSTRIPE:
2770                 RETURN(ll_lov_setstripe(inode, file, arg));
2771         case LL_IOC_LOV_SETEA:
2772                 RETURN(ll_lov_setea(inode, file, arg));
2773         case LL_IOC_LOV_GETSTRIPE:
2774                 RETURN(ll_lov_getstripe(inode, arg));
2775         case LL_IOC_RECREATE_OBJ:
2776                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2777         case FSFILT_IOC_FIEMAP: {
2778                 struct ll_user_fiemap *fiemap_s;
2779                 size_t num_bytes, ret_bytes;
2780                 unsigned int extent_count;
2781                 int rc = 0;
2782
2783                 /* Get the extent count so we can calculate the size of
2784                  * required fiemap buffer */
2785                 if (get_user(extent_count,
2786                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2787                         RETURN(-EFAULT);
2788                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2789                                                  sizeof(struct ll_fiemap_extent));
2790                 OBD_VMALLOC(fiemap_s, num_bytes);
2791                 if (fiemap_s == NULL)
2792                         RETURN(-ENOMEM);
2793
2794                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2795                                    sizeof(*fiemap_s)))
2796                         GOTO(error, rc = -EFAULT);
2797
2798                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2799                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2800                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2801                         if (copy_to_user((char *)arg, fiemap_s,
2802                                          sizeof(*fiemap_s)))
2803                                 GOTO(error, rc = -EFAULT);
2804
2805                         GOTO(error, rc = -EBADR);
2806                 }
2807
2808                 /* If fm_extent_count is non-zero, read the first extent since
2809                  * it is used to calculate end_offset and device from previous
2810                  * fiemap call. */
2811                 if (extent_count) {
2812                         if (copy_from_user(&fiemap_s->fm_extents[0],
2813                             (char __user *)arg + sizeof(*fiemap_s),
2814                             sizeof(struct ll_fiemap_extent)))
2815                                 GOTO(error, rc = -EFAULT);
2816                 }
2817
2818                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2819                         int rc;
2820
2821                         rc = filemap_fdatawrite(inode->i_mapping);
2822                         if (rc)
2823                                 GOTO(error, rc);
2824                 }
2825
2826                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2827                 if (rc)
2828                         GOTO(error, rc);
2829
2830                 ret_bytes = sizeof(struct ll_user_fiemap);
2831
2832                 if (extent_count != 0)
2833                         ret_bytes += (fiemap_s->fm_mapped_extents *
2834                                          sizeof(struct ll_fiemap_extent));
2835
2836                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2837                         rc = -EFAULT;
2838
2839 error:
2840                 OBD_VFREE(fiemap_s, num_bytes);
2841                 RETURN(rc);
2842         }
2843         case FSFILT_IOC_GETFLAGS:
2844         case FSFILT_IOC_SETFLAGS:
2845                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2846         case FSFILT_IOC_GETVERSION_OLD:
2847         case FSFILT_IOC_GETVERSION:
2848                 RETURN(put_user(inode->i_generation, (int *)arg));
2849         case LL_IOC_JOIN: {
2850 #if LUSTRE_FIX >= 50
2851                 /* Allow file join in beta builds to allow debuggging */
2852                 char *ftail;
2853                 int rc;
2854
2855                 ftail = getname((const char *)arg);
2856                 if (IS_ERR(ftail))
2857                         RETURN(PTR_ERR(ftail));
2858                 rc = ll_file_join(inode, file, ftail);
2859                 putname(ftail);
2860                 RETURN(rc);
2861 #else
2862                 CWARN("file join is not supported in this version of Lustre\n");
2863                 RETURN(-ENOTTY);
2864 #endif
2865         }
2866         case LL_IOC_GROUP_LOCK:
2867                 RETURN(ll_get_grouplock(inode, file, arg));
2868         case LL_IOC_GROUP_UNLOCK:
2869                 RETURN(ll_put_grouplock(inode, file, arg));
2870         case IOC_OBD_STATFS:
2871                 RETURN(ll_obd_statfs(inode, (void *)arg));
2872         case OBD_IOC_GETNAME_OLD:
2873         case OBD_IOC_GETNAME: {
2874                 struct obd_device *obd =
2875                         class_exp2obd(ll_i2sbi(inode)->ll_osc_exp);
2876                 if (!obd)
2877                         RETURN(-EFAULT);
2878                 if (copy_to_user((void *)arg, obd->obd_name,
2879                                 strlen(obd->obd_name) + 1))
2880                         RETURN (-EFAULT);
2881                 RETURN(0);
2882         }
2883
2884         /* We need to special case any other ioctls we want to handle,
2885          * to send them to the MDS/OST as appropriate and to properly
2886          * network encode the arg field.
2887         case EXT3_IOC_SETVERSION_OLD:
2888         case EXT3_IOC_SETVERSION:
2889         */
2890         default: {
2891                 int err;
2892
2893                 if (LLIOC_STOP ==
2894                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2895                         RETURN(err);
2896
2897                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2898                                      (void *)arg));
2899         }
2900         }
2901 }
2902
2903 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2904 {
2905         struct inode *inode = file->f_dentry->d_inode;
2906         struct ll_inode_info *lli = ll_i2info(inode);
2907         struct lov_stripe_md *lsm = lli->lli_smd;
2908         loff_t retval;
2909         ENTRY;
2910         retval = offset + ((origin == 2) ? i_size_read(inode) :
2911                            (origin == 1) ? file->f_pos : 0);
2912         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2913                inode->i_ino, inode->i_generation, inode, retval, retval,
2914                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2915         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2916
2917         if (origin == 2) { /* SEEK_END */
2918                 int nonblock = 0, rc;
2919
2920                 if (file->f_flags & O_NONBLOCK)
2921                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2922
2923                 if (lsm != NULL) {
2924                         rc = ll_glimpse_size(inode, nonblock);
2925                         if (rc != 0)
2926                                 RETURN(rc);
2927                 }
2928
2929                 ll_inode_size_lock(inode, 0);
2930                 offset += i_size_read(inode);
2931                 ll_inode_size_unlock(inode, 0);
2932         } else if (origin == 1) { /* SEEK_CUR */
2933                 offset += file->f_pos;
2934         }
2935
2936         retval = -EINVAL;
2937         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2938                 if (offset != file->f_pos) {
2939                         file->f_pos = offset;
2940                         file->f_version = 0;
2941                 }
2942                 retval = offset;
2943         }
2944
2945         RETURN(retval);
2946 }
2947
2948 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2949 {
2950         struct inode *inode = dentry->d_inode;
2951         struct ll_inode_info *lli = ll_i2info(inode);
2952         struct lov_stripe_md *lsm = lli->lli_smd;
2953         struct ll_fid fid;
2954         struct ptlrpc_request *req;
2955         int rc, err;
2956         ENTRY;
2957         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2958                inode->i_generation, inode);
2959         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2960
2961         /* fsync's caller has already called _fdata{sync,write}, we want
2962          * that IO to finish before calling the osc and mdc sync methods */
2963         rc = filemap_fdatawait(inode->i_mapping);
2964
2965         /* catch async errors that were recorded back when async writeback
2966          * failed for pages in this mapping. */
2967         err = lli->lli_async_rc;
2968         lli->lli_async_rc = 0;
2969         if (rc == 0)
2970                 rc = err;
2971         if (lsm) {
2972                 err = lov_test_and_clear_async_rc(lsm);
2973                 if (rc == 0)
2974                         rc = err;
2975         }
2976
2977         ll_inode2fid(&fid, inode);
2978         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2979         if (!rc)
2980                 rc = err;
2981         if (!err)
2982                 ptlrpc_req_finished(req);
2983
2984         if (data && lsm) {
2985                 struct obd_info *oinfo;
2986
2987                 OBD_ALLOC_PTR(oinfo);
2988                 if (!oinfo)
2989                         RETURN(rc ? rc : -ENOMEM);
2990                 OBDO_ALLOC(oinfo->oi_oa);
2991                 if (!oinfo->oi_oa) {
2992                         OBD_FREE_PTR(oinfo);
2993                         RETURN(rc ? rc : -ENOMEM);
2994                 }
2995                 oinfo->oi_oa->o_id = lsm->lsm_object_id;
2996                 oinfo->oi_oa->o_gr = lsm->lsm_object_gr;
2997                 oinfo->oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2998                 obdo_from_inode(oinfo->oi_oa, inode,
2999                                 OBD_MD_FLTYPE | OBD_MD_FLATIME |
3000                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
3001                 oinfo->oi_md = lsm;
3002                 err = obd_sync_rqset(ll_i2sbi(inode)->ll_osc_exp, oinfo,
3003                                      0, OBD_OBJECT_EOF);
3004                 if (!rc)
3005                         rc = err;
3006                 OBDO_FREE(oinfo->oi_oa);
3007                 OBD_FREE_PTR(oinfo);
3008         }
3009
3010         RETURN(rc);
3011 }
3012
3013 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
3014 {
3015         struct inode *inode = file->f_dentry->d_inode;
3016         struct ll_sb_info *sbi = ll_i2sbi(inode);
3017         struct lu_fid *fid = ll_inode_lu_fid(inode);
3018         struct ldlm_res_id res_id =
3019                     { .name = { fid_seq(fid),
3020                                 fid_oid(fid),
3021                                 fid_ver(fid),
3022                                 LDLM_FLOCK} };
3023         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
3024                 ldlm_flock_completion_ast, NULL, file_lock };
3025         struct lustre_handle lockh = {0};
3026         ldlm_policy_data_t flock;
3027         int flags = 0;
3028         int rc;
3029         ENTRY;
3030
3031         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
3032                inode->i_ino, file_lock);
3033         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
3034
3035         if (fid_is_igif(fid)) {
3036                 /* If this is an IGIF inode, we need to keep the 1.6-style
3037                  * flock mapping for compatibility.  If it is a proper FID
3038                  * then we know any other client accessing it must also be
3039                  * accessing it as a FID and can use the CMD-style flock. */
3040                 res_id.name[2] = LDLM_FLOCK;
3041                 res_id.name[3] = 0;
3042         }
3043
3044         if (file_lock->fl_flags & FL_FLOCK) {
3045                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
3046                 /* set missing params for flock() calls */
3047                 file_lock->fl_end = OFFSET_MAX;
3048                 file_lock->fl_pid = current->tgid;
3049         }
3050         flock.l_flock.pid = file_lock->fl_pid;
3051         flock.l_flock.start = file_lock->fl_start;
3052         flock.l_flock.end = file_lock->fl_end;
3053
3054         switch (file_lock->fl_type) {
3055         case F_RDLCK:
3056                 einfo.ei_mode = LCK_PR;
3057                 break;
3058         case F_UNLCK:
3059                 /* An unlock request may or may not have any relation to
3060                  * existing locks so we may not be able to pass a lock handle
3061                  * via a normal ldlm_lock_cancel() request. The request may even
3062                  * unlock a byte range in the middle of an existing lock. In
3063                  * order to process an unlock request we need all of the same
3064                  * information that is given with a normal read or write record
3065                  * lock request. To avoid creating another ldlm unlock (cancel)
3066                  * message we'll treat a LCK_NL flock request as an unlock. */
3067                 einfo.ei_mode = LCK_NL;
3068                 break;
3069         case F_WRLCK:
3070                 einfo.ei_mode = LCK_PW;
3071                 break;
3072         default:
3073                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
3074                 RETURN (-EINVAL);
3075         }
3076
3077         switch (cmd) {
3078         case F_SETLKW:
3079 #ifdef F_SETLKW64
3080         case F_SETLKW64:
3081 #endif
3082                 flags = 0;
3083                 break;
3084         case F_SETLK:
3085 #ifdef F_SETLK64
3086         case F_SETLK64:
3087 #endif
3088                 flags = LDLM_FL_BLOCK_NOWAIT;
3089                 break;
3090         case F_GETLK:
3091 #ifdef F_GETLK64
3092         case F_GETLK64:
3093 #endif
3094                 flags = LDLM_FL_TEST_LOCK;
3095                 /* Save the old mode so that if the mode in the lock changes we
3096                  * can decrement the appropriate reader or writer refcount. */
3097                 file_lock->fl_type = einfo.ei_mode;
3098                 break;
3099         default:
3100                 CERROR("unknown fcntl lock command: %d\n", cmd);
3101                 RETURN (-EINVAL);
3102         }
3103
3104         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
3105                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
3106                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
3107
3108         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, &einfo, res_id,
3109                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
3110         if ((file_lock->fl_flags & FL_FLOCK) &&
3111             (rc == 0 || file_lock->fl_type == F_UNLCK))
3112                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
3113 #ifdef HAVE_F_OP_FLOCK
3114         if ((file_lock->fl_flags & FL_POSIX) &&
3115             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
3116             !(flags & LDLM_FL_TEST_LOCK))
3117                 posix_lock_file_wait(file, file_lock);
3118 #endif
3119
3120         RETURN(rc);
3121 }
3122
3123 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
3124 {
3125         ENTRY;
3126
3127         RETURN(-ENOSYS);
3128 }
3129
3130 int ll_have_md_lock(struct inode *inode, __u64 bits)
3131 {
3132         struct lustre_handle lockh;
3133         struct ldlm_res_id res_id;
3134         struct obd_device *obddev;
3135         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
3136         int flags;
3137         ENTRY;
3138
3139         if (!inode)
3140                RETURN(0);
3141
3142         obddev = ll_i2mdcexp(inode)->exp_obd;
3143         fid_build_reg_res_name(ll_inode_lu_fid(inode), &res_id);
3144
3145         CDEBUG(D_INFO, "trying to match res "LPU64":"LPU64":"LPU64"\n",
3146                 res_id.name[0],
3147                 res_id.name[1],
3148                 res_id.name[2]);
3149
3150         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
3151         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
3152                             &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
3153                 RETURN(1);
3154         }
3155
3156         RETURN(0);
3157 }
3158
3159 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
3160         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
3161                               * and return success */
3162                 inode->i_nlink = 0;
3163                 /* This path cannot be hit for regular files unless in
3164                  * case of obscure races, so no need to to validate
3165                  * size. */
3166                 if (!S_ISREG(inode->i_mode) &&
3167                     !S_ISDIR(inode->i_mode))
3168                         return 0;
3169         }
3170
3171         if (rc) {
3172                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
3173                 return -abs(rc);
3174
3175         }
3176
3177         return 0;
3178 }
3179
3180 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
3181                              __u64 ibits)
3182 {
3183         struct inode *inode = dentry->d_inode;
3184         struct ptlrpc_request *req = NULL;
3185         struct obd_export *exp;
3186         int rc = 0;
3187         ENTRY;
3188
3189         if (!inode) {
3190                 CERROR("REPORT THIS LINE TO PETER\n");
3191                 RETURN(0);
3192         }
3193         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3194                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3195
3196         exp = ll_i2mdcexp(inode);
3197
3198         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
3199                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3200                 struct mdc_op_data op_data = { { 0 } };
3201
3202                 /* Call getattr by fid, so do not provide name at all. */
3203                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
3204                                        dentry->d_inode, NULL, 0, 0, NULL);
3205                 oit.it_create_mode |= M_CHECK_STALE;
3206                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
3207                                      /* we are not interested in name
3208                                         based lookup */
3209                                      &oit, 0, &req,
3210                                      ll_mdc_blocking_ast, 0);
3211                 oit.it_create_mode &= ~M_CHECK_STALE;
3212                 if (rc < 0) {
3213                         rc = ll_inode_revalidate_fini(inode, rc);
3214                         GOTO (out, rc);
3215                 }
3216
3217                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
3218                 if (rc != 0) {
3219                         ll_intent_release(&oit);
3220                         GOTO(out, rc);
3221                 }
3222
3223                 /* Unlinked? Unhash dentry, so it is not picked up later by
3224                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3225                    here to preserve get_cwd functionality on 2.6.
3226                    Bug 10503 */
3227                 if (!dentry->d_inode->i_nlink) {
3228                         spin_lock(&ll_lookup_lock);
3229                         spin_lock(&dcache_lock);
3230                         ll_drop_dentry(dentry);
3231                         spin_unlock(&dcache_lock);
3232                         spin_unlock(&ll_lookup_lock);
3233                 }
3234
3235                 ll_lookup_finish_locks(&oit, dentry);
3236         } else if (!ll_have_md_lock(dentry->d_inode, ibits)) {
3237                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3238                 struct ll_fid fid;
3239                 obd_valid valid = OBD_MD_FLGETATTR;
3240                 int ealen = 0;
3241
3242                 if (S_ISREG(inode->i_mode)) {
3243                         rc = ll_get_max_mdsize(sbi, &ealen);
3244                         if (rc)
3245                                 RETURN(rc);
3246                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3247                 }
3248                 ll_inode2fid(&fid, inode);
3249                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
3250                 if (rc) {
3251                         rc = ll_inode_revalidate_fini(inode, rc);
3252                         RETURN(rc);
3253                 }
3254
3255                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
3256                                    NULL);
3257         }
3258
3259 out:
3260         ptlrpc_req_finished(req);
3261         RETURN(rc);
3262 }
3263
3264 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
3265 {
3266         struct inode *inode = dentry->d_inode;
3267         int rc;
3268         ENTRY;
3269
3270         rc = __ll_inode_revalidate_it(dentry, it, MDS_INODELOCK_UPDATE |
3271                                                   MDS_INODELOCK_LOOKUP);
3272
3273         /* if object not yet allocated, don't validate size */
3274         if (rc == 0 && ll_i2info(inode)->lli_smd == NULL) {
3275                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
3276                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
3277                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
3278                 RETURN(0);
3279         }
3280
3281         /* ll_glimpse_size will prefer locally cached writes if they extend
3282          * the file */
3283
3284         if (rc == 0)
3285                 rc = ll_glimpse_size(inode, 0);
3286
3287         RETURN(rc);
3288 }
3289
3290 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3291                   struct lookup_intent *it, struct kstat *stat)
3292 {
3293         struct inode *inode = de->d_inode;
3294         int res = 0;
3295
3296         res = ll_inode_revalidate_it(de, it);
3297         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3298
3299         if (res)
3300                 return res;
3301
3302         stat->dev = inode->i_sb->s_dev;
3303         stat->ino = inode->i_ino;
3304         stat->mode = inode->i_mode;
3305         stat->nlink = inode->i_nlink;
3306         stat->uid = inode->i_uid;
3307         stat->gid = inode->i_gid;
3308         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3309         stat->atime = inode->i_atime;
3310         stat->mtime = inode->i_mtime;
3311         stat->ctime = inode->i_ctime;
3312 #ifdef HAVE_INODE_BLKSIZE
3313         stat->blksize = inode->i_blksize;
3314 #else
3315         stat->blksize = 1<<inode->i_blkbits;
3316 #endif
3317
3318         ll_inode_size_lock(inode, 0);
3319         stat->size = i_size_read(inode);
3320         stat->blocks = inode->i_blocks;
3321         ll_inode_size_unlock(inode, 0);
3322
3323         return 0;
3324 }
3325 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3326 {
3327         struct lookup_intent it = { .it_op = IT_GETATTR };
3328
3329         return ll_getattr_it(mnt, de, &it, stat);
3330 }
3331
3332 static
3333 int lustre_check_acl(struct inode *inode, int mask)
3334 {
3335 #ifdef CONFIG_FS_POSIX_ACL
3336         struct ll_inode_info *lli = ll_i2info(inode);
3337         struct posix_acl *acl;
3338         int rc;
3339         ENTRY;
3340
3341         spin_lock(&lli->lli_lock);
3342         acl = posix_acl_dup(lli->lli_posix_acl);
3343         spin_unlock(&lli->lli_lock);
3344
3345         if (!acl)
3346                 RETURN(-EAGAIN);
3347
3348         rc = posix_acl_permission(inode, acl, mask);
3349         posix_acl_release(acl);
3350
3351         RETURN(rc);
3352 #else
3353         return -EAGAIN;
3354 #endif
3355 }
3356
3357 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3358 #ifndef HAVE_INODE_PERMISION_2ARGS
3359 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3360 #else
3361 int ll_inode_permission(struct inode *inode, int mask)
3362 #endif
3363 {
3364         int rc = 0;
3365         ENTRY;
3366
3367        /* as root inode are NOT getting validated in lookup operation,
3368         * need to do it before permission check. */
3369
3370         if (inode == inode->i_sb->s_root->d_inode) {
3371                 struct lookup_intent it = { .it_op = IT_LOOKUP };
3372
3373                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
3374                                               MDS_INODELOCK_LOOKUP);
3375                 if (rc)
3376                         RETURN(rc);
3377         }
3378
3379         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
3380                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
3381
3382         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3383         rc = generic_permission(inode, mask, lustre_check_acl);
3384
3385         RETURN(rc);
3386 }
3387 #else
3388 #ifndef HAVE_INODE_PERMISION_2ARGS
3389 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3390 #else
3391 int ll_inode_permission(struct inode *inode, int mask)
3392 #endif
3393 {
3394         int mode = inode->i_mode;
3395         int rc;
3396
3397         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3398                inode->i_ino, inode->i_generation, inode, mask);
3399         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3400
3401         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3402             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3403                 return -EROFS;
3404         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3405                 return -EACCES;
3406         if (current->fsuid == inode->i_uid) {
3407                 mode >>= 6;
3408         } else if (1) {
3409                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3410                         goto check_groups;
3411                 rc = lustre_check_acl(inode, mask);
3412                 if (rc == -EAGAIN)
3413                         goto check_groups;
3414                 if (rc == -EACCES)
3415                         goto check_capabilities;
3416                 return rc;
3417         } else {
3418 check_groups:
3419                 if (in_group_p(inode->i_gid))
3420                         mode >>= 3;
3421         }
3422         if ((mode & mask & S_IRWXO) == mask)
3423                 return 0;
3424
3425 check_capabilities:
3426         if (!(mask & MAY_EXEC) ||
3427             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3428                 if (cfs_capable(CFS_CAP_DAC_OVERRIDE))
3429                         return 0;
3430
3431         if (cfs_capable(CFS_CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3432             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3433                 return 0;
3434
3435         return -EACCES;
3436 }
3437 #endif
3438
3439 /* -o localflock - only provides locally consistent flock locks */
3440 struct file_operations ll_file_operations = {
3441         .read           = ll_file_read,
3442 #ifdef HAVE_FILE_READV
3443         .readv          = ll_file_readv,
3444 #else
3445         .aio_read       = ll_file_aio_read,
3446 #endif
3447         .write          = ll_file_write,
3448 #ifdef HAVE_FILE_WRITEV
3449         .writev         = ll_file_writev,
3450 #else
3451         .aio_write      = ll_file_aio_write,
3452 #endif
3453         .ioctl          = ll_file_ioctl,
3454         .open           = ll_file_open,
3455         .release        = ll_file_release,
3456         .mmap           = ll_file_mmap,
3457         .llseek         = ll_file_seek,
3458 #ifdef HAVE_KERNEL_SPLICE_READ
3459         .splice_read    = ll_file_splice_read,
3460 #endif
3461 #ifdef HAVE_KERNEL_SENDFILE
3462         .sendfile       = ll_file_sendfile,
3463 #endif
3464         .fsync          = ll_fsync,
3465 };
3466
3467 struct file_operations ll_file_operations_flock = {
3468         .read           = ll_file_read,
3469 #ifdef HAVE_FILE_READV
3470         .readv          = ll_file_readv,
3471 #else
3472         .aio_read       = ll_file_aio_read,
3473 #endif
3474         .write          = ll_file_write,
3475 #ifdef HAVE_FILE_WRITEV
3476         .writev         = ll_file_writev,
3477 #else
3478         .aio_write      = ll_file_aio_write,
3479 #endif
3480         .ioctl          = ll_file_ioctl,
3481         .open           = ll_file_open,
3482         .release        = ll_file_release,
3483         .mmap           = ll_file_mmap,
3484         .llseek         = ll_file_seek,
3485 #ifdef HAVE_KERNEL_SPLICE_READ
3486         .splice_read    = ll_file_splice_read,
3487 #endif
3488 #ifdef HAVE_KERNEL_SENDFILE
3489         .sendfile       = ll_file_sendfile,
3490 #endif
3491         .fsync          = ll_fsync,
3492 #ifdef HAVE_F_OP_FLOCK
3493         .flock          = ll_file_flock,
3494 #endif
3495         .lock           = ll_file_flock
3496 };
3497
3498 /* These are for -o noflock - to return ENOSYS on flock calls */
3499 struct file_operations ll_file_operations_noflock = {
3500         .read           = ll_file_read,
3501 #ifdef HAVE_FILE_READV
3502         .readv          = ll_file_readv,
3503 #else
3504         .aio_read       = ll_file_aio_read,
3505 #endif
3506         .write          = ll_file_write,
3507 #ifdef HAVE_FILE_WRITEV
3508         .writev         = ll_file_writev,
3509 #else
3510         .aio_write      = ll_file_aio_write,
3511 #endif
3512         .ioctl          = ll_file_ioctl,
3513         .open           = ll_file_open,
3514         .release        = ll_file_release,
3515         .mmap           = ll_file_mmap,
3516         .llseek         = ll_file_seek,
3517 #ifdef HAVE_KERNEL_SPLICE_READ
3518         .splice_read    = ll_file_splice_read,
3519 #endif
3520 #ifdef HAVE_KERNEL_SENDFILE
3521         .sendfile       = ll_file_sendfile,
3522 #endif
3523         .fsync          = ll_fsync,
3524 #ifdef HAVE_F_OP_FLOCK
3525         .flock          = ll_file_noflock,
3526 #endif
3527         .lock           = ll_file_noflock
3528 };
3529
3530 struct inode_operations ll_file_inode_operations = {
3531 #ifdef HAVE_VFS_INTENT_PATCHES
3532         .setattr_raw    = ll_setattr_raw,
3533 #endif
3534         .setattr        = ll_setattr,
3535         .truncate       = ll_truncate,
3536         .getattr        = ll_getattr,
3537         .permission     = ll_inode_permission,
3538         .setxattr       = ll_setxattr,
3539         .getxattr       = ll_getxattr,
3540         .listxattr      = ll_listxattr,
3541         .removexattr    = ll_removexattr,
3542 };
3543
3544 /* dynamic ioctl number support routins */
3545 static struct llioc_ctl_data {
3546         struct rw_semaphore ioc_sem;
3547         struct list_head    ioc_head;
3548 } llioc = {
3549         __RWSEM_INITIALIZER(llioc.ioc_sem),
3550         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3551 };
3552
3553
3554 struct llioc_data {
3555         struct list_head        iocd_list;
3556         unsigned int            iocd_size;
3557         llioc_callback_t        iocd_cb;
3558         unsigned int            iocd_count;
3559         unsigned int            iocd_cmd[0];
3560 };
3561
3562 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3563 {
3564         unsigned int size;
3565         struct llioc_data *in_data = NULL;
3566         ENTRY;
3567
3568         if (cb == NULL || cmd == NULL ||
3569             count > LLIOC_MAX_CMD || count < 0)
3570                 RETURN(NULL);
3571
3572         size = sizeof(*in_data) + count * sizeof(unsigned int);
3573         OBD_ALLOC(in_data, size);
3574         if (in_data == NULL)
3575                 RETURN(NULL);
3576
3577         memset(in_data, 0, sizeof(*in_data));
3578         in_data->iocd_size = size;
3579         in_data->iocd_cb = cb;
3580         in_data->iocd_count = count;
3581         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3582
3583         down_write(&llioc.ioc_sem);
3584         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3585         up_write(&llioc.ioc_sem);
3586
3587         RETURN(in_data);
3588 }
3589
3590 void ll_iocontrol_unregister(void *magic)
3591 {
3592         struct llioc_data *tmp;
3593
3594         if (magic == NULL)
3595                 return;
3596
3597         down_write(&llioc.ioc_sem);
3598         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3599                 if (tmp == magic) {
3600                         unsigned int size = tmp->iocd_size;
3601
3602                         list_del(&tmp->iocd_list);
3603                         up_write(&llioc.ioc_sem);
3604
3605                         OBD_FREE(tmp, size);
3606                         return;
3607                 }
3608         }
3609         up_write(&llioc.ioc_sem);
3610
3611         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3612 }
3613
3614 EXPORT_SYMBOL(ll_iocontrol_register);
3615 EXPORT_SYMBOL(ll_iocontrol_unregister);
3616
3617 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3618                         unsigned int cmd, unsigned long arg, int *rcp)
3619 {
3620         enum llioc_iter ret = LLIOC_CONT;
3621         struct llioc_data *data;
3622         int rc = -EINVAL, i;
3623
3624         down_read(&llioc.ioc_sem);
3625         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3626                 for (i = 0; i < data->iocd_count; i++) {
3627                         if (cmd != data->iocd_cmd[i])
3628                                 continue;
3629
3630                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3631                         break;
3632                 }
3633
3634                 if (ret == LLIOC_STOP)
3635                         break;
3636         }
3637         up_read(&llioc.ioc_sem);
3638
3639         if (rcp)
3640                 *rcp = rc;
3641         return ret;
3642 }