Whamcloud - gitweb
f9a306c6e97ba36e5eabbd4cb47f9f800aa97ca4
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35
36 /* also used by llite/special.c:ll_special_open() */
37 struct ll_file_data *ll_file_data_get(void)
38 {
39         struct ll_file_data *fd;
40
41         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
42         return fd;
43 }
44
45 static void ll_file_data_put(struct ll_file_data *fd)
46 {
47         if (fd != NULL)
48                 OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
49 }
50
51 static int ll_close_inode_openhandle(struct obd_export *md_exp,
52                                      struct inode *inode,
53                                      struct obd_client_handle *och)
54 {
55         struct md_op_data *op_data;
56         struct ptlrpc_request *req = NULL;
57         int rc;
58         ENTRY;
59
60         OBD_ALLOC_PTR(op_data);
61         if (op_data == NULL)
62                 RETURN(-ENOMEM);
63
64         op_data->fid1 = ll_i2info(inode)->lli_fid;
65         op_data->valid = OBD_MD_FLTYPE | OBD_MD_FLMODE |
66                          OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
67                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
68                          OBD_MD_FLCTIME;
69
70         op_data->atime = LTIME_S(inode->i_atime);
71         op_data->mtime = LTIME_S(inode->i_mtime);
72         op_data->ctime = LTIME_S(inode->i_ctime);
73         op_data->size = inode->i_size;
74         op_data->blocks = inode->i_blocks;
75         op_data->flags = inode->i_flags;
76
77         if (0 /* ll_is_inode_dirty(inode) */) {
78                 op_data->flags = MDS_BFLAG_UNCOMMITTED_WRITES;
79                 op_data->valid |= OBD_MD_FLFLAGS;
80         }
81
82         rc = md_close(md_exp, op_data, och, &req);
83         OBD_FREE_PTR(op_data);
84         if (rc == EAGAIN) {
85                 /* We are the last writer, so the MDS has instructed us to get
86                  * the file size and any write cookies, then close again. */
87                 //ll_queue_done_writing(inode);
88                 rc = 0;
89         } else if (rc) {
90                 CERROR("inode %lu mdc close failed: rc = %d\n",
91                        inode->i_ino, rc);
92         }
93
94         if (rc == 0) {
95                 rc = ll_objects_destroy(req, inode);
96                 if (rc)
97                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
98                                inode->i_ino, rc);
99         }
100
101         md_clear_open_replay_data(md_exp, och);
102         ptlrpc_req_finished(req); /* This is close request */
103
104         RETURN(rc);
105 }
106
107 /* just for debugging by huanghua@clusterfs.com, will be removed later */
108 #include <lustre_lib.h>
109 struct mdc_open_data {
110         struct obd_client_handle *mod_och;
111         struct ptlrpc_request    *mod_open_req;
112         struct ptlrpc_request    *mod_close_req;
113 };
114 /* --end: just for debugging by huanghua@clusterfs.com*/
115
116 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
117                 struct file *file)
118 {
119         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
120         struct obd_client_handle *och = &fd->fd_mds_och;
121         int rc;
122         ENTRY;
123
124         /* clear group lock, if present */
125         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
126                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
127                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
128                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
129                                       &fd->fd_cwlockh);
130         }
131         CDEBUG(D_INFO, "closing ino = %lu file = %p has open req = %p, type = %x, "
132                        "transno = "LPU64", handle = "LPX64"\n",
133                        inode->i_ino, file,
134                        och->och_mod->mod_open_req,
135                        och->och_mod->mod_open_req->rq_type,
136                        och->och_mod->mod_open_req->rq_transno,
137                        och->och_fh.cookie);
138
139         rc = ll_close_inode_openhandle(md_exp, inode, och);
140         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
141         LUSTRE_FPRIVATE(file) = NULL;
142         ll_file_data_put(fd);
143
144         RETURN(rc);
145 }
146
147 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
148
149 /* While this returns an error code, fput() the caller does not, so we need
150  * to make every effort to clean up all of our state here.  Also, applications
151  * rarely check close errors and even if an error is returned they will not
152  * re-try the close call.
153  */
154 int ll_file_release(struct inode *inode, struct file *file)
155 {
156         struct ll_file_data *fd;
157         struct ll_sb_info *sbi = ll_i2sbi(inode);
158         struct ll_inode_info *lli = ll_i2info(inode);
159         struct lov_stripe_md *lsm = lli->lli_smd;
160         int rc;
161
162         ENTRY;
163         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
164                inode->i_generation, inode);
165
166         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
167         fd = LUSTRE_FPRIVATE(file);
168         LASSERT(fd != NULL);
169
170         /* don't do anything for / */
171         if (inode->i_sb->s_root == file->f_dentry) {
172                 LUSTRE_FPRIVATE(file) = NULL;
173                 ll_file_data_put(fd);
174                 RETURN(0);
175         }
176
177         if (lsm)
178                 lov_test_and_clear_async_rc(lsm);
179         lli->lli_async_rc = 0;
180
181         rc = ll_md_close(sbi->ll_md_exp, inode, file);
182         RETURN(rc);
183 }
184
185 static int ll_intent_file_open(struct file *file, void *lmm,
186                                int lmmsize, struct lookup_intent *itp)
187 {
188         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
189         struct dentry *parent = file->f_dentry->d_parent;
190         const char *name = file->f_dentry->d_name.name;
191         const int len = file->f_dentry->d_name.len;
192         struct lustre_handle lockh;
193         struct md_op_data *op_data;
194         int rc;
195
196         if (!parent)
197                 RETURN(-ENOENT);
198
199         OBD_ALLOC_PTR(op_data);
200         if (op_data == NULL)
201                 RETURN(-ENOMEM);
202
203         ll_prepare_md_op_data(op_data, parent->d_inode, NULL,
204                               name, len, O_RDWR);
205
206         rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_PW, op_data,
207                         &lockh, lmm, lmmsize, ldlm_completion_ast,
208                         ll_md_blocking_ast, NULL, 0);
209         OBD_FREE_PTR(op_data);
210         if (rc < 0) {
211                 CERROR("lock enqueue: err: %d\n", rc);
212                 RETURN(rc);
213         }
214
215         rc = ll_prep_inode(&file->f_dentry->d_inode,
216                            (struct ptlrpc_request *)itp->d.lustre.it_data,
217                            1, NULL);
218         RETURN(rc);
219 }
220
221 static void ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
222                         struct lookup_intent *it, struct obd_client_handle *och)
223 {
224         struct ptlrpc_request *req = it->d.lustre.it_data;
225         struct mdt_body *body;
226
227         LASSERT(och);
228
229         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
230         LASSERT(body != NULL);                  /* reply already checked out */
231         LASSERT_REPSWABBED(req, 1);             /* and swabbed in md_enqueue */
232
233         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
234         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
235         lli->lli_io_epoch = body->io_epoch;
236
237         md_set_open_replay_data(md_exp, och, it->d.lustre.it_data);
238 }
239
240 int ll_local_open(struct file *file, struct lookup_intent *it,
241                   struct ll_file_data *fd)
242 {
243         struct inode *inode = file->f_dentry->d_inode;
244         ENTRY;
245
246         LASSERT(!LUSTRE_FPRIVATE(file));
247
248         LASSERT(fd != NULL);
249
250         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
251                     ll_i2info(inode), it, &fd->fd_mds_och);
252
253         LUSTRE_FPRIVATE(file) = fd;
254         ll_readahead_init(inode, &fd->fd_ras);
255
256         RETURN(0);
257 }
258
259 /* Open a file, and (for the very first open) create objects on the OSTs at
260  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
261  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
262  * lli_open_sem to ensure no other process will create objects, send the
263  * stripe MD to the MDS, or try to destroy the objects if that fails.
264  *
265  * If we already have the stripe MD locally then we don't request it in
266  * md_open(), by passing a lmm_size = 0.
267  *
268  * It is up to the application to ensure no other processes open this file
269  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
270  * used.  We might be able to avoid races of that sort by getting lli_open_sem
271  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
272  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
273  */
274 int ll_file_open(struct inode *inode, struct file *file)
275 {
276         struct ll_inode_info *lli = ll_i2info(inode);
277         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
278                                           .it_flags = file->f_flags };
279         struct lov_stripe_md *lsm;
280         struct ptlrpc_request *req;
281         struct ll_file_data *fd;
282         int rc = 0;
283         ENTRY;
284
285         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
286                inode->i_generation, inode, file->f_flags);
287
288         it = file->f_it;
289
290         fd = ll_file_data_get();
291         if (fd == NULL)
292                 RETURN(-ENOMEM);
293
294         /* don't do anything for / */
295         if (inode->i_sb->s_root == file->f_dentry) {
296                 LUSTRE_FPRIVATE(file) = fd;
297                 RETURN(0);
298         }
299
300         if (!it || !it->d.lustre.it_disposition) {
301                 struct ll_sb_info *sbi = ll_i2sbi(inode);
302
303                 /* Convert f_flags into access mode. We cannot use file->f_mode,
304                  * because everything but O_ACCMODE mask was stripped from
305                  * there */
306                 if ((oit.it_flags + 1) & O_ACCMODE)
307                         oit.it_flags++;
308                 if (oit.it_flags & O_TRUNC)
309                         oit.it_flags |= FMODE_WRITE;
310
311                 if (oit.it_flags & O_CREAT)
312                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
313
314                 /* We do not want O_EXCL here, presumably we opened the file
315                  * already? XXX - NFS implications? */
316                 oit.it_flags &= ~O_EXCL;
317
318                 it = &oit;
319                 rc = ll_intent_file_open(file, NULL, 0, it);
320                 if (rc) {
321                         ll_file_data_put(fd);
322                         GOTO(out, rc);
323                 }
324
325                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
326                                  file->f_dentry->d_inode);
327         }
328
329         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
330         rc = it_open_error(DISP_OPEN_OPEN, it);
331         /* md_intent_lock() didn't get a request ref if there was an open
332          * error, so don't do cleanup on the request here (bug 3430) */
333         if (rc) {
334                 ll_file_data_put(fd);
335                 RETURN(rc);
336         }
337
338         rc = ll_local_open(file, it, fd);
339         req = it->d.lustre.it_data;
340         LASSERTF(rc == 0, "rc = %d\n", rc);
341         CDEBUG(D_INFO, "opening ino = %lu file = %p has open req = %p, type = %x, "
342                        "transno = "LPU64", handle = "LPX64"\n",
343                        inode->i_ino, file, req, req->rq_type,
344                        req->rq_transno, fd->fd_mds_och.och_fh.cookie);
345
346         if (!S_ISREG(inode->i_mode))
347                 GOTO(out, rc);
348
349         lsm = lli->lli_smd;
350         if (lsm == NULL) {
351                 if (file->f_flags & O_LOV_DELAY_CREATE ||
352                     !(file->f_mode & FMODE_WRITE)) {
353                         CDEBUG(D_INODE, "object creation was delayed\n");
354                         GOTO(out, rc);
355                 }
356         }
357         file->f_flags &= ~O_LOV_DELAY_CREATE;
358         GOTO(out, rc);
359  out:
360         req = it->d.lustre.it_data;
361         ptlrpc_req_finished(req);
362         if (rc == 0)
363                 ll_open_complete(inode);
364         return rc;
365 }
366
367 /* Fills the obdo with the attributes for the inode defined by lsm */
368 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
369                    struct obdo *oa)
370 {
371         struct ptlrpc_request_set *set;
372         int rc;
373         ENTRY;
374
375         LASSERT(lsm != NULL);
376
377         memset(oa, 0, sizeof *oa);
378         oa->o_id = lsm->lsm_object_id;
379         oa->o_mode = S_IFREG;
380         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
381                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
382                 OBD_MD_FLCTIME;
383
384         set = ptlrpc_prep_set();
385         if (set == NULL) {
386                 rc = -ENOMEM;
387         } else {
388                 rc = obd_getattr_async(exp, oa, lsm, set);
389                 if (rc == 0)
390                         rc = ptlrpc_set_wait(set);
391                 ptlrpc_set_destroy(set);
392         }
393         if (rc)
394                 RETURN(rc);
395
396         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
397                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
398         RETURN(0);
399 }
400
401 static inline void ll_remove_suid(struct inode *inode)
402 {
403         unsigned int mode;
404
405         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
406         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
407
408         /* was any of the uid bits set? */
409         mode &= inode->i_mode;
410         if (mode && !capable(CAP_FSETID)) {
411                 inode->i_mode &= ~mode;
412                 // XXX careful here - we cannot change the size
413         }
414 }
415
416 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
417 {
418         struct ll_inode_info *lli = ll_i2info(inode);
419         struct lov_stripe_md *lsm = lli->lli_smd;
420         struct obd_export *exp = ll_i2dtexp(inode);
421         struct {
422                 char name[16];
423                 struct ldlm_lock *lock;
424                 struct lov_stripe_md *lsm;
425         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
426         __u32 stripe, vallen = sizeof(stripe);
427         int rc;
428         ENTRY;
429
430         if (lsm->lsm_stripe_count == 1)
431                 GOTO(check, stripe = 0);
432
433         /* get our offset in the lov */
434         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
435         if (rc != 0) {
436                 CERROR("obd_get_info: rc = %d\n", rc);
437                 RETURN(rc);
438         }
439         LASSERT(stripe < lsm->lsm_stripe_count);
440
441 check:
442         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
443             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[1]){
444                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
445                            lsm->lsm_oinfo[stripe].loi_id,
446                            lsm->lsm_oinfo[stripe].loi_gr);
447                 RETURN(-ELDLM_NO_LOCK_DATA);
448         }
449
450         RETURN(stripe);
451 }
452
453 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
454  * we get a lock cancellation for each stripe, so we have to map the obd's
455  * region back onto the stripes in the file that it held.
456  *
457  * No one can dirty the extent until we've finished our work and they can
458  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
459  * but other kernel actors could have pages locked.
460  *
461  * Called with the DLM lock held. */
462 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
463                               struct ldlm_lock *lock, __u32 stripe)
464 {
465         ldlm_policy_data_t tmpex;
466         unsigned long start, end, count, skip, i, j;
467         struct page *page;
468         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
469         struct lustre_handle lockh;
470         ENTRY;
471
472         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
473         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
474                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
475                inode->i_size);
476
477         /* our locks are page granular thanks to osc_enqueue, we invalidate the
478          * whole page. */
479         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
480         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
481
482         count = ~0;
483         skip = 0;
484         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
485         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
486         if (lsm->lsm_stripe_count > 1) {
487                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
488                 skip = (lsm->lsm_stripe_count - 1) * count;
489                 start += start/count * skip + stripe * count;
490                 if (end != ~0)
491                         end += end/count * skip + stripe * count;
492         }
493         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
494                 end = ~0;
495
496         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
497         if (i < end)
498                 end = i;
499
500         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
501                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
502                count, skip, end, discard ? " (DISCARDING)" : "");
503
504         /* walk through the vmas on the inode and tear down mmaped pages that
505          * intersect with the lock.  this stops immediately if there are no
506          * mmap()ed regions of the file.  This is not efficient at all and
507          * should be short lived. We'll associate mmap()ed pages with the lock
508          * and will be able to find them directly */
509         for (i = start; i <= end; i += (j + skip)) {
510                 j = min(count - (i % count), end - i + 1);
511                 LASSERT(j > 0);
512                 LASSERT(inode->i_mapping);
513                 if (ll_teardown_mmaps(inode->i_mapping,
514                                       (__u64)i << PAGE_CACHE_SHIFT,
515                                       ((__u64)(i+j) << PAGE_CACHE_SHIFT) - 1) )
516                         break;
517         }
518
519         /* this is the simplistic implementation of page eviction at
520          * cancelation.  It is careful to get races with other page
521          * lockers handled correctly.  fixes from bug 20 will make it
522          * more efficient by associating locks with pages and with
523          * batching writeback under the lock explicitly. */
524         for (i = start, j = start % count; i <= end;
525              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
526                 if (j == count) {
527                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
528                         i += skip;
529                         j = 0;
530                         if (i > end)
531                                 break;
532                 }
533                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
534                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
535                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
536                          start, i, end);
537
538                 if (!mapping_has_pages(inode->i_mapping)) {
539                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
540                         break;
541                 }
542
543                 cond_resched();
544
545                 page = find_get_page(inode->i_mapping, i);
546                 if (page == NULL)
547                         continue;
548                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
549                                i, tmpex.l_extent.start);
550                 lock_page(page);
551
552                 /* page->mapping to check with racing against teardown */
553                 if (!discard && clear_page_dirty_for_io(page)) {
554                         rc = ll_call_writepage(inode, page);
555                         if (rc != 0)
556                                 CERROR("writepage of page %p failed: %d\n",
557                                        page, rc);
558                         /* either waiting for io to complete or reacquiring
559                          * the lock that the failed writepage released */
560                         lock_page(page);
561                 }
562
563                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
564                 /* check to see if another DLM lock covers this page */
565                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
566                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
567                                       LDLM_FL_TEST_LOCK,
568                                       &lock->l_resource->lr_name, LDLM_EXTENT,
569                                       &tmpex, LCK_PR | LCK_PW, &lockh);
570                 if (rc2 == 0 && page->mapping != NULL) {
571                         struct ll_async_page *llap = llap_cast_private(page);
572                         // checking again to account for writeback's lock_page()
573                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
574                         if (llap)
575                                 ll_ra_accounting(llap, inode->i_mapping);
576                         ll_truncate_complete_page(page);
577                 }
578                 unlock_page(page);
579                 page_cache_release(page);
580         }
581         LASSERTF(tmpex.l_extent.start <=
582                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
583                   lock->l_policy_data.l_extent.end + 1),
584                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
585                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
586                  start, i, end);
587         EXIT;
588 }
589
590 static int ll_extent_lock_callback(struct ldlm_lock *lock,
591                                    struct ldlm_lock_desc *new, void *data,
592                                    int flag)
593 {
594         struct lustre_handle lockh = { 0 };
595         int rc;
596         ENTRY;
597
598         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
599                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
600                 LBUG();
601         }
602
603         switch (flag) {
604         case LDLM_CB_BLOCKING:
605                 ldlm_lock2handle(lock, &lockh);
606                 rc = ldlm_cli_cancel(&lockh);
607                 if (rc != ELDLM_OK)
608                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
609                 break;
610         case LDLM_CB_CANCELING: {
611                 struct inode *inode;
612                 struct ll_inode_info *lli;
613                 struct lov_stripe_md *lsm;
614                 int stripe;
615                 __u64 kms;
616
617                 /* This lock wasn't granted, don't try to evict pages */
618                 if (lock->l_req_mode != lock->l_granted_mode)
619                         RETURN(0);
620
621                 inode = ll_inode_from_lock(lock);
622                 if (inode == NULL)
623                         RETURN(0);
624                 lli = ll_i2info(inode);
625                 if (lli == NULL)
626                         goto iput;
627                 if (lli->lli_smd == NULL)
628                         goto iput;
629                 lsm = lli->lli_smd;
630
631                 stripe = ll_lock_to_stripe_offset(inode, lock);
632                 if (stripe < 0)
633                         goto iput;
634
635                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
636
637                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
638                 lov_stripe_lock(lsm);
639                 kms = ldlm_extent_shift_kms(lock,
640                                             lsm->lsm_oinfo[stripe].loi_kms);
641
642                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
643                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
644                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
645                 lsm->lsm_oinfo[stripe].loi_kms = kms;
646                 lov_stripe_unlock(lsm);
647                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
648                 //ll_try_done_writing(inode);
649         iput:
650                 iput(inode);
651                 break;
652         }
653         default:
654                 LBUG();
655         }
656
657         RETURN(0);
658 }
659
660 #if 0
661 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
662 {
663         /* XXX ALLOCATE - 160 bytes */
664         struct inode *inode = ll_inode_from_lock(lock);
665         struct ll_inode_info *lli = ll_i2info(inode);
666         struct lustre_handle lockh = { 0 };
667         struct ost_lvb *lvb;
668         int stripe;
669         ENTRY;
670
671         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
672                      LDLM_FL_BLOCK_CONV)) {
673                 LBUG(); /* not expecting any blocked async locks yet */
674                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
675                            "lock, returning");
676                 ldlm_lock_dump(D_OTHER, lock, 0);
677                 ldlm_reprocess_all(lock->l_resource);
678                 RETURN(0);
679         }
680
681         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
682
683         stripe = ll_lock_to_stripe_offset(inode, lock);
684         if (stripe < 0)
685                 goto iput;
686
687         if (lock->l_lvb_len) {
688                 struct lov_stripe_md *lsm = lli->lli_smd;
689                 __u64 kms;
690                 lvb = lock->l_lvb_data;
691                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
692
693                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
694                 LOCK_INODE_MUTEX(inode);
695                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
696                 kms = ldlm_extent_shift_kms(NULL, kms);
697                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
698                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
699                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
700                 lsm->lsm_oinfo[stripe].loi_kms = kms;
701                 UNLOCK_INODE_MUTEX(inode);
702                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
703         }
704
705 iput:
706         iput(inode);
707         wake_up(&lock->l_waitq);
708
709         ldlm_lock2handle(lock, &lockh);
710         ldlm_lock_decref(&lockh, LCK_PR);
711         RETURN(0);
712 }
713 #endif
714
715 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
716 {
717         struct ptlrpc_request *req = reqp;
718         struct inode *inode = ll_inode_from_lock(lock);
719         struct ll_inode_info *lli;
720         struct lov_stripe_md *lsm;
721         struct ost_lvb *lvb;
722         int rc, size = sizeof(*lvb), stripe;
723         ENTRY;
724
725         if (inode == NULL)
726                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
727         lli = ll_i2info(inode);
728         if (lli == NULL)
729                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
730         lsm = lli->lli_smd;
731         if (lsm == NULL)
732                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
733
734         /* First, find out which stripe index this lock corresponds to. */
735         stripe = ll_lock_to_stripe_offset(inode, lock);
736         if (stripe < 0)
737                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
738
739         rc = lustre_pack_reply(req, 1, &size, NULL);
740         if (rc) {
741                 CERROR("lustre_pack_reply: %d\n", rc);
742                 GOTO(iput, rc);
743         }
744
745         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
746         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
747         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
748         lvb->lvb_atime = LTIME_S(inode->i_atime);
749         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
750
751         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
752                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
753                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
754                    lvb->lvb_atime, lvb->lvb_ctime);
755         GOTO(iput, 0);
756  iput:
757         iput(inode);
758
759  out:
760         /* These errors are normal races, so we don't want to fill the console
761          * with messages by calling ptlrpc_error() */
762         if (rc == -ELDLM_NO_LOCK_DATA)
763                 lustre_pack_reply(req, 0, NULL, NULL);
764
765         req->rq_status = rc;
766         return rc;
767 }
768
769 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
770  * file (because it prefers KMS over RSS when larger) */
771 int ll_glimpse_size(struct inode *inode, int ast_flags)
772 {
773         struct ll_inode_info *lli = ll_i2info(inode);
774         struct ll_sb_info *sbi = ll_i2sbi(inode);
775         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
776         struct lustre_handle lockh = { 0 };
777         struct ost_lvb lvb;
778         int rc;
779         ENTRY;
780
781         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
782
783         ast_flags |= LDLM_FL_HAS_INTENT;
784
785         /* NOTE: this looks like DLM lock request, but it may not be one. Due
786          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
787          *       won't revoke any conflicting DLM locks held. Instead,
788          *       ll_glimpse_callback() will be called on each client
789          *       holding a DLM lock against this file, and resulting size
790          *       will be returned for each stripe. DLM lock on [0, EOF] is
791          *       acquired only if there were no conflicting locks. */
792         rc = obd_enqueue(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, &policy,
793                          LCK_PR, &ast_flags, ll_extent_lock_callback,
794                          ldlm_completion_ast, ll_glimpse_callback, inode,
795                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
796         if (rc == -ENOENT)
797                 RETURN(rc);
798         if (rc != 0) {
799                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
800                 RETURN(rc > 0 ? -EIO : rc);
801         }
802
803         ll_inode_size_lock(inode, 1);
804         inode_init_lvb(inode, &lvb);
805         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
806         inode->i_size = lvb.lvb_size;
807         inode->i_blocks = lvb.lvb_blocks;
808         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
809         LTIME_S(inode->i_atime) = lvb.lvb_atime;
810         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
811         ll_inode_size_unlock(inode, 1);
812
813         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
814                inode->i_size, inode->i_blocks);
815
816         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
817
818         RETURN(rc);
819 }
820
821 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
822                    struct lov_stripe_md *lsm, int mode,
823                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
824                    int ast_flags)
825 {
826         struct ll_sb_info *sbi = ll_i2sbi(inode);
827         struct ost_lvb lvb;
828         int rc;
829         ENTRY;
830
831         LASSERT(!lustre_handle_is_used(lockh));
832         LASSERT(lsm != NULL);
833
834         /* don't drop the mmapped file to LRU */
835         if (mapping_mapped(inode->i_mapping))
836                 ast_flags |= LDLM_FL_NO_LRU;
837
838         /* XXX phil: can we do this?  won't it screw the file size up? */
839         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
840             (sbi->ll_flags & LL_SBI_NOLCK))
841                 RETURN(0);
842
843         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
844                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
845
846         rc = obd_enqueue(sbi->ll_dt_exp, lsm, LDLM_EXTENT, policy, mode,
847                          &ast_flags, ll_extent_lock_callback,
848                          ldlm_completion_ast, ll_glimpse_callback, inode,
849                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
850         if (rc > 0)
851                 rc = -EIO;
852
853         ll_inode_size_lock(inode, 1);
854         inode_init_lvb(inode, &lvb);
855         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
856
857         if (policy->l_extent.start == 0 &&
858             policy->l_extent.end == OBD_OBJECT_EOF) {
859                 /* vmtruncate()->ll_truncate() first sets the i_size and then
860                  * the kms under both a DLM lock and the
861                  * ll_inode_size_lock().  If we don't get the
862                  * ll_inode_size_lock() here we can match the DLM lock and
863                  * reset i_size from the kms before the truncating path has
864                  * updated the kms.  generic_file_write can then trust the
865                  * stale i_size when doing appending writes and effectively
866                  * cancel the result of the truncate.  Getting the
867                  * ll_inode_size_lock() after the enqueue maintains the DLM
868                  * -> ll_inode_size_lock() acquiring order. */
869                 inode->i_size = lvb.lvb_size;
870         }
871
872         if (rc == 0) {
873                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
874                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
875                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
876         }
877         ll_inode_size_unlock(inode, 1);
878
879         RETURN(rc);
880 }
881
882 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
883                      struct lov_stripe_md *lsm, int mode,
884                      struct lustre_handle *lockh)
885 {
886         struct ll_sb_info *sbi = ll_i2sbi(inode);
887         int rc;
888         ENTRY;
889
890         /* XXX phil: can we do this?  won't it screw the file size up? */
891         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
892             (sbi->ll_flags & LL_SBI_NOLCK))
893                 RETURN(0);
894
895         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
896
897         RETURN(rc);
898 }
899
900 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
901                             loff_t *ppos)
902 {
903         struct inode *inode = file->f_dentry->d_inode;
904         struct ll_inode_info *lli = ll_i2info(inode);
905         struct lov_stripe_md *lsm = lli->lli_smd;
906         struct ll_lock_tree tree;
907         struct ll_lock_tree_node *node;
908         struct ost_lvb lvb;
909         struct ll_ra_read bead;
910         int rc;
911         ssize_t retval;
912         __u64 kms;
913         ENTRY;
914         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
915                inode->i_ino, inode->i_generation, inode, count, *ppos);
916
917         /* "If nbyte is 0, read() will return 0 and have no other results."
918          *                      -- Single Unix Spec */
919         if (count == 0)
920                 RETURN(0);
921
922         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
923                             count);
924
925         if (!lsm) {
926                 /* Read on file with no objects should return zero-filled
927                  * buffers up to file size (we can get non-zero sizes with
928                  * mknod + truncate, then opening file for read. This is a
929                  * common pattern in NFS case, it seems). Bug 6243 */
930                 int notzeroed;
931                 /* Since there are no objects on OSTs, we have nothing to get
932                  * lock on and so we are forced to access inode->i_size
933                  * unguarded */
934
935                 /* Read beyond end of file */
936                 if (*ppos >= inode->i_size)
937                         RETURN(0);
938
939                 if (count > inode->i_size - *ppos)
940                         count = inode->i_size - *ppos;
941                 /* Make sure to correctly adjust the file pos pointer for
942                  * EFAULT case */
943                 notzeroed = clear_user(buf, count);
944                 count -= notzeroed;
945                 *ppos += count;
946                 if (!count)
947                         RETURN(-EFAULT);
948                 RETURN(count);
949         }
950
951         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
952         tree.lt_fd = LUSTRE_FPRIVATE(file);
953         rc = ll_tree_lock(&tree, node, buf, count,
954                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
955         if (rc != 0)
956                 RETURN(rc);
957
958         ll_inode_size_lock(inode, 1);
959         /*
960          * Consistency guarantees: following possibilities exist for the
961          * relation between region being read and real file size at this
962          * moment:
963          *
964          *  (A): the region is completely inside of the file;
965          *
966          *  (B-x): x bytes of region are inside of the file, the rest is
967          *  outside;
968          *
969          *  (C): the region is completely outside of the file.
970          *
971          * This classification is stable under DLM lock acquired by
972          * ll_tree_lock() above, because to change class, other client has to
973          * take DLM lock conflicting with our lock. Also, any updates to
974          * ->i_size by other threads on this client are serialized by
975          * ll_inode_size_lock(). This guarantees that short reads are handled
976          * correctly in the face of concurrent writes and truncates.
977          */
978         inode_init_lvb(inode, &lvb);
979         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
980         kms = lvb.lvb_size;
981         if (*ppos + count - 1 > kms) {
982                 /* A glimpse is necessary to determine whether we return a
983                  * short read (B) or some zeroes at the end of the buffer (C) */
984                 ll_inode_size_unlock(inode, 1);
985                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
986                 if (retval)
987                         goto out;
988         } else {
989                 /* region is within kms and, hence, within real file size (A) */
990                 inode->i_size = kms;
991                 ll_inode_size_unlock(inode, 1);
992         }
993
994         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
995                inode->i_ino, count, *ppos, inode->i_size);
996
997         /* turn off the kernel's read-ahead */
998 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
999         file->f_ramax = 0;
1000 #else
1001         file->f_ra.ra_pages = 0;
1002 #endif
1003         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1004         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1005         ll_ra_read_in(file, &bead);
1006         /* BUG: 5972 */
1007         file_accessed(file);
1008         retval = generic_file_read(file, buf, count, ppos);
1009         ll_ra_read_ex(file, &bead);
1010
1011  out:
1012         ll_tree_unlock(&tree);
1013         RETURN(retval);
1014 }
1015
1016 /*
1017  * Write to a file (through the page cache).
1018  */
1019 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1020                              loff_t *ppos)
1021 {
1022         struct inode *inode = file->f_dentry->d_inode;
1023         struct ll_lock_tree tree;
1024         struct ll_lock_tree_node *node;
1025         loff_t maxbytes = ll_file_maxbytes(inode);
1026         ssize_t retval;
1027         int rc;
1028         ENTRY;
1029         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1030                inode->i_ino, inode->i_generation, inode, count, *ppos);
1031
1032         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1033
1034         /* POSIX, but surprised the VFS doesn't check this already */
1035         if (count == 0)
1036                 RETURN(0);
1037
1038         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1039          * called on the file, don't fail the below assertion (bug 2388). */
1040         if (file->f_flags & O_LOV_DELAY_CREATE &&
1041             ll_i2info(inode)->lli_smd == NULL)
1042                 RETURN(-EBADF);
1043
1044         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1045
1046         if (file->f_flags & O_APPEND)
1047                 node = ll_node_from_inode(inode, 0, OBD_OBJECT_EOF, LCK_PW);
1048         else
1049                 node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
1050                                           LCK_PW);
1051
1052         if (IS_ERR(node))
1053                 RETURN(PTR_ERR(node));
1054
1055         tree.lt_fd = LUSTRE_FPRIVATE(file);
1056         rc = ll_tree_lock(&tree, node, buf, count,
1057                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1058         if (rc != 0)
1059                 RETURN(rc);
1060
1061         /* this is ok, g_f_w will overwrite this under i_mutex if it races
1062          * with a local truncate, it just makes our maxbyte checking easier */
1063         if (file->f_flags & O_APPEND)
1064                 *ppos = inode->i_size;
1065
1066         if (*ppos >= maxbytes) {
1067                 if (count || *ppos > maxbytes) {
1068                         send_sig(SIGXFSZ, current, 0);
1069                         GOTO(out, retval = -EFBIG);
1070                 }
1071         }
1072         if (*ppos + count > maxbytes)
1073                 count = maxbytes - *ppos;
1074
1075         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1076                inode->i_ino, count, *ppos);
1077
1078         /* generic_file_write handles O_APPEND after getting i_mutex */
1079         retval = generic_file_write(file, buf, count, ppos);
1080
1081 out:
1082         ll_tree_unlock(&tree);
1083         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1084                             retval > 0 ? retval : 0);
1085         RETURN(retval);
1086 }
1087
1088 /*
1089  * Send file content (through pagecache) somewhere with helper
1090  */
1091 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1092 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1093                                 read_actor_t actor, void *target)
1094 {
1095         struct inode *inode = in_file->f_dentry->d_inode;
1096         struct ll_inode_info *lli = ll_i2info(inode);
1097         struct lov_stripe_md *lsm = lli->lli_smd;
1098         struct ll_lock_tree tree;
1099         struct ll_lock_tree_node *node;
1100         struct ost_lvb lvb;
1101         struct ll_ra_read bead;
1102         int rc;
1103         ssize_t retval;
1104         __u64 kms;
1105         ENTRY;
1106         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1107                inode->i_ino, inode->i_generation, inode, count, *ppos);
1108
1109         /* "If nbyte is 0, read() will return 0 and have no other results."
1110          *                      -- Single Unix Spec */
1111         if (count == 0)
1112                 RETURN(0);
1113
1114         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1115                             count);
1116
1117         /* File with no objects, nothing to lock */
1118         if (!lsm)
1119                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1120
1121         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1122         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1123         rc = ll_tree_lock(&tree, node, NULL, count,
1124                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1125         if (rc != 0)
1126                 RETURN(rc);
1127
1128         ll_inode_size_lock(inode, 1);
1129         /*
1130          * Consistency guarantees: following possibilities exist for the
1131          * relation between region being read and real file size at this
1132          * moment:
1133          *
1134          *  (A): the region is completely inside of the file;
1135          *
1136          *  (B-x): x bytes of region are inside of the file, the rest is
1137          *  outside;
1138          *
1139          *  (C): the region is completely outside of the file.
1140          *
1141          * This classification is stable under DLM lock acquired by
1142          * ll_tree_lock() above, because to change class, other client has to
1143          * take DLM lock conflicting with our lock. Also, any updates to
1144          * ->i_size by other threads on this client are serialized by
1145          * ll_inode_size_lock(). This guarantees that short reads are handled
1146          * correctly in the face of concurrent writes and truncates.
1147          */
1148         inode_init_lvb(inode, &lvb);
1149         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1150         kms = lvb.lvb_size;
1151         if (*ppos + count - 1 > kms) {
1152                 /* A glimpse is necessary to determine whether we return a
1153                  * short read (B) or some zeroes at the end of the buffer (C) */
1154                 ll_inode_size_unlock(inode, 1);
1155                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1156                 if (retval)
1157                         goto out;
1158         } else {
1159                 /* region is within kms and, hence, within real file size (A) */
1160                 inode->i_size = kms;
1161                 ll_inode_size_unlock(inode, 1);
1162         }
1163
1164         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1165                inode->i_ino, count, *ppos, inode->i_size);
1166
1167         /* turn off the kernel's read-ahead */
1168         in_file->f_ra.ra_pages = 0;
1169
1170         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1171         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1172         ll_ra_read_in(in_file, &bead);
1173         /* BUG: 5972 */
1174         file_accessed(in_file);
1175         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1176         ll_ra_read_ex(in_file, &bead);
1177
1178  out:
1179         ll_tree_unlock(&tree);
1180         RETURN(retval);
1181 }
1182 #endif
1183
1184 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1185                                unsigned long arg)
1186 {
1187         struct ll_inode_info *lli = ll_i2info(inode);
1188         struct obd_export *exp = ll_i2dtexp(inode);
1189         struct ll_recreate_obj ucreatp;
1190         struct obd_trans_info oti = { 0 };
1191         struct obdo *oa = NULL;
1192         int lsm_size;
1193         int rc = 0;
1194         struct lov_stripe_md *lsm, *lsm2;
1195         ENTRY;
1196
1197         if (!capable (CAP_SYS_ADMIN))
1198                 RETURN(-EPERM);
1199
1200         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1201                             sizeof(struct ll_recreate_obj));
1202         if (rc) {
1203                 RETURN(-EFAULT);
1204         }
1205         oa = obdo_alloc();
1206         if (oa == NULL)
1207                 RETURN(-ENOMEM);
1208
1209         down(&lli->lli_open_sem);
1210         lsm = lli->lli_smd;
1211         if (lsm == NULL)
1212                 GOTO(out, rc = -ENOENT);
1213         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1214                    (lsm->lsm_stripe_count));
1215
1216         OBD_ALLOC(lsm2, lsm_size);
1217         if (lsm2 == NULL)
1218                 GOTO(out, rc = -ENOMEM);
1219
1220         oa->o_id = ucreatp.lrc_id;
1221         oa->o_nlink = ucreatp.lrc_ost_idx;
1222         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1223         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1224         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1225                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1226
1227         oti.oti_objid = NULL;
1228         memcpy(lsm2, lsm, lsm_size);
1229         rc = obd_create(exp, oa, &lsm2, &oti);
1230
1231         OBD_FREE(lsm2, lsm_size);
1232         GOTO(out, rc);
1233 out:
1234         up(&lli->lli_open_sem);
1235         obdo_free(oa);
1236         return rc;
1237 }
1238
1239 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1240                                     int flags, struct lov_user_md *lum,
1241                                     int lum_size)
1242 {
1243         struct ll_inode_info *lli = ll_i2info(inode);
1244         struct file *f = NULL;
1245         struct obd_export *dt_exp = ll_i2dtexp(inode);
1246         struct obd_export *md_exp = ll_i2mdexp(inode);
1247         struct lov_stripe_md *lsm;
1248         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1249         struct ptlrpc_request *req = NULL;
1250         struct ll_file_data *fd;
1251         int rc = 0;
1252         struct lustre_md md;
1253         ENTRY;
1254
1255         down(&lli->lli_open_sem);
1256         lsm = lli->lli_smd;
1257         if (lsm) {
1258                 up(&lli->lli_open_sem);
1259                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1260                        inode->i_ino);
1261                 RETURN(-EEXIST);
1262         }
1263
1264         fd = ll_file_data_get();
1265         if (fd == NULL)
1266                 GOTO(out, -ENOMEM);
1267
1268         f = get_empty_filp();
1269         if (!f)
1270                 GOTO(out, -ENOMEM);
1271
1272         f->f_dentry = dget(file->f_dentry);
1273         f->f_vfsmnt = mntget(file->f_vfsmnt);
1274
1275         rc = ll_intent_file_open(f, lum, lum_size, &oit);
1276         if (rc)
1277                 GOTO(out, rc);
1278         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1279                 GOTO(out, -ENOENT);
1280         req = oit.d.lustre.it_data;
1281         rc = oit.d.lustre.it_status;
1282
1283         if (rc < 0)
1284                 GOTO(out, rc);
1285
1286         rc = md_get_lustre_md(md_exp, req, 1, dt_exp, &md);
1287         if (rc)
1288                 GOTO(out, rc);
1289         ll_update_inode(f->f_dentry->d_inode, &md);
1290
1291         rc = ll_local_open(f, &oit, fd);
1292         if (rc)
1293                 GOTO(out, rc);
1294         fd = NULL;
1295         ll_intent_release(&oit);
1296
1297         rc = ll_file_release(f->f_dentry->d_inode, f);
1298
1299  out:
1300         if (f)
1301                 fput(f);
1302         ll_file_data_put(fd);
1303         up(&lli->lli_open_sem);
1304         if (req != NULL)
1305                 ptlrpc_req_finished(req);
1306         RETURN(rc);
1307 }
1308
1309 static int ll_lov_setea(struct inode *inode, struct file *file,
1310                             unsigned long arg)
1311 {
1312         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1313         struct lov_user_md  *lump;
1314         int lum_size = sizeof(struct lov_user_md) +
1315                        sizeof(struct lov_user_ost_data);
1316         int rc;
1317         ENTRY;
1318
1319         if (!capable (CAP_SYS_ADMIN))
1320                 RETURN(-EPERM);
1321
1322         OBD_ALLOC(lump, lum_size);
1323         if (lump == NULL) {
1324                 RETURN(-ENOMEM);
1325         }
1326         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1327         if (rc) {
1328                 OBD_FREE(lump, lum_size);
1329                 RETURN(-EFAULT);
1330         }
1331
1332         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1333
1334         OBD_FREE(lump, lum_size);
1335         RETURN(rc);
1336 }
1337
1338 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1339                             unsigned long arg)
1340 {
1341         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1342         int rc;
1343         int flags = FMODE_WRITE;
1344         ENTRY;
1345
1346         /* Bug 1152: copy properly when this is no longer true */
1347         LASSERT(sizeof(lum) == sizeof(*lump));
1348         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1349         rc = copy_from_user(&lum, lump, sizeof(lum));
1350         if (rc)
1351                 RETURN(-EFAULT);
1352
1353         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1354         if (rc == 0) {
1355                  put_user(0, &lump->lmm_stripe_count);
1356                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1357                                     0, ll_i2info(inode)->lli_smd, lump);
1358         }
1359         RETURN(rc);
1360 }
1361
1362 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1363 {
1364         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1365
1366         if (!lsm)
1367                 RETURN(-ENODATA);
1368
1369         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1370                             (void *)arg);
1371 }
1372
1373 static int ll_get_grouplock(struct inode *inode, struct file *file,
1374                             unsigned long arg)
1375 {
1376         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1377         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1378                                                     .end = OBD_OBJECT_EOF}};
1379         struct lustre_handle lockh = { 0 };
1380         struct ll_inode_info *lli = ll_i2info(inode);
1381         struct lov_stripe_md *lsm = lli->lli_smd;
1382         int flags = 0, rc;
1383         ENTRY;
1384
1385         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1386                 RETURN(-EINVAL);
1387         }
1388
1389         policy.l_extent.gid = arg;
1390         if (file->f_flags & O_NONBLOCK)
1391                 flags = LDLM_FL_BLOCK_NOWAIT;
1392
1393         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1394         if (rc)
1395                 RETURN(rc);
1396
1397         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1398         fd->fd_gid = arg;
1399         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1400
1401         RETURN(0);
1402 }
1403
1404 static int ll_put_grouplock(struct inode *inode, struct file *file,
1405                             unsigned long arg)
1406 {
1407         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1408         struct ll_inode_info *lli = ll_i2info(inode);
1409         struct lov_stripe_md *lsm = lli->lli_smd;
1410         int rc;
1411         ENTRY;
1412
1413         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1414                 /* Ugh, it's already unlocked. */
1415                 RETURN(-EINVAL);
1416         }
1417
1418         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1419                 RETURN(-EINVAL);
1420
1421         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1422
1423         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1424         if (rc)
1425                 RETURN(rc);
1426
1427         fd->fd_gid = 0;
1428         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1429
1430         RETURN(0);
1431 }
1432
1433 static int join_sanity_check(struct inode *head, struct inode *tail)
1434 {
1435         ENTRY;
1436         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1437                 CERROR("server do not support join \n");
1438                 RETURN(-EINVAL);
1439         }
1440         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1441                 CERROR("tail ino %lu and ino head %lu must be regular\n",
1442                        head->i_ino, tail->i_ino);
1443                 RETURN(-EINVAL);
1444         }
1445         if (head->i_ino == tail->i_ino) {
1446                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
1447                 RETURN(-EINVAL);
1448         }
1449         if (head->i_size % JOIN_FILE_ALIGN) {
1450                 CERROR("hsize" LPU64 " must be times of 64K\n",
1451                         head->i_size);
1452                 RETURN(-EINVAL);
1453         }
1454         RETURN(0);
1455 }
1456
1457 static int join_file(struct inode *head_inode, struct file *head_filp,
1458                      struct file *tail_filp)
1459 {
1460         struct inode *tail_inode, *tail_parent;
1461         struct dentry *tail_dentry = tail_filp->f_dentry;
1462         struct lookup_intent oit = {.it_op = IT_OPEN,
1463                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1464         struct ptlrpc_request *req = NULL;
1465         struct ll_file_data *fd;
1466         struct lustre_handle lockh;
1467         struct md_op_data *op_data;
1468         __u32  hsize = head_inode->i_size >> 32;
1469         __u32  tsize = head_inode->i_size;
1470         struct file *f;
1471         int    rc;
1472         ENTRY;
1473
1474         tail_dentry = tail_filp->f_dentry;
1475         tail_inode = tail_dentry->d_inode;
1476         tail_parent = tail_dentry->d_parent->d_inode;
1477
1478         fd = ll_file_data_get();
1479         if (fd == NULL)
1480                 RETURN(-ENOMEM);
1481
1482         OBD_ALLOC_PTR(op_data);
1483         if (op_data == NULL) {
1484                 ll_file_data_put(fd);
1485                 RETURN(-ENOMEM);
1486         }
1487
1488         f = get_empty_filp();
1489         if (f == NULL)
1490                 GOTO(out, rc = -ENOMEM);
1491
1492         f->f_dentry = dget(head_filp->f_dentry);
1493         f->f_vfsmnt = mntget(head_filp->f_vfsmnt);
1494
1495         ll_prepare_md_op_data(op_data, head_inode, tail_parent,
1496                               tail_dentry->d_name.name,
1497                               tail_dentry->d_name.len, 0);
1498
1499         rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_PW,
1500                         op_data, &lockh, &tsize, 0, ldlm_completion_ast,
1501                         ll_md_blocking_ast, &hsize, 0);
1502
1503         if (rc < 0)
1504                 GOTO(out, rc);
1505
1506         req = oit.d.lustre.it_data;
1507         rc = oit.d.lustre.it_status;
1508
1509         if (rc < 0)
1510                 GOTO(out, rc);
1511
1512         rc = ll_local_open(f, &oit, fd);
1513         LASSERTF(rc == 0, "rc = %d\n", rc);
1514
1515         fd = NULL;
1516         ll_intent_release(&oit);
1517
1518         rc = ll_file_release(f->f_dentry->d_inode, f);
1519 out:
1520         if (op_data)
1521                 OBD_FREE_PTR(op_data);
1522         if (f)
1523                 fput(f);
1524         ll_file_data_put(fd);
1525         ptlrpc_req_finished(req);
1526         RETURN(rc);
1527 }
1528
1529 static int ll_file_join(struct inode *head, struct file *filp,
1530                         char *filename_tail)
1531 {
1532         struct inode *tail = NULL, *first = NULL, *second = NULL;
1533         struct dentry *tail_dentry;
1534         struct file *tail_filp, *first_filp, *second_filp;
1535         struct ll_lock_tree first_tree, second_tree;
1536         struct ll_lock_tree_node *first_node, *second_node;
1537         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
1538         int rc = 0, cleanup_phase = 0;
1539         ENTRY;
1540
1541         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
1542                head->i_ino, head->i_generation, head, filename_tail);
1543
1544         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
1545         if (IS_ERR(tail_filp)) {
1546                 CERROR("Can not open tail file %s", filename_tail);
1547                 rc = PTR_ERR(tail_filp);
1548                 GOTO(cleanup, rc);
1549         }
1550         tail = igrab(tail_filp->f_dentry->d_inode);
1551
1552         tlli = ll_i2info(tail);
1553         tail_dentry = tail_filp->f_dentry;
1554         LASSERT(tail_dentry);
1555         cleanup_phase = 1;
1556
1557         /*reorder the inode for lock sequence*/
1558         first = head->i_ino > tail->i_ino ? head : tail;
1559         second = head->i_ino > tail->i_ino ? tail : head;
1560         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
1561         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
1562
1563         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
1564                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
1565         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
1566         if (IS_ERR(first_node)){
1567                 rc = PTR_ERR(first_node);
1568                 GOTO(cleanup, rc);
1569         }
1570         first_tree.lt_fd = first_filp->private_data;
1571         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
1572         if (rc != 0)
1573                 GOTO(cleanup, rc);
1574         cleanup_phase = 2;
1575
1576         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
1577         if (IS_ERR(second_node)){
1578                 rc = PTR_ERR(second_node);
1579                 GOTO(cleanup, rc);
1580         }
1581         second_tree.lt_fd = second_filp->private_data;
1582         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
1583         if (rc != 0)
1584                 GOTO(cleanup, rc);
1585         cleanup_phase = 3;
1586
1587         rc = join_sanity_check(head, tail);
1588         if (rc)
1589                 GOTO(cleanup, rc);
1590
1591         rc = join_file(head, filp, tail_filp);
1592         if (rc)
1593                 GOTO(cleanup, rc);
1594 cleanup:
1595         switch (cleanup_phase) {
1596         case 3:
1597                 ll_tree_unlock(&second_tree);
1598                 obd_cancel_unused(ll_i2dtexp(second),
1599                                   ll_i2info(second)->lli_smd, 0, NULL);
1600         case 2:
1601                 ll_tree_unlock(&first_tree);
1602                 obd_cancel_unused(ll_i2dtexp(first),
1603                                   ll_i2info(first)->lli_smd, 0, NULL);
1604         case 1:
1605                 filp_close(tail_filp, 0);
1606                 if (tail)
1607                         iput(tail);
1608                 if (head && rc == 0) {
1609                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
1610                                        &hlli->lli_smd);
1611                         hlli->lli_smd = NULL;
1612                 }
1613         case 0:
1614                 break;
1615         default:
1616                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1617                 LBUG();
1618         }
1619         RETURN(rc);
1620 }
1621
1622 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1623 {
1624         struct inode *inode = dentry->d_inode;
1625         struct obd_client_handle *och;
1626         int rc;
1627         ENTRY;
1628
1629         LASSERT(inode);
1630
1631         /* Root ? Do nothing. */
1632         if (dentry->d_inode->i_sb->s_root == dentry)
1633                 RETURN(0);
1634
1635         /* No open handle to close? Move away */
1636         if (!it_disposition(it, DISP_OPEN_OPEN))
1637                 RETURN(0);
1638
1639         OBD_ALLOC(och, sizeof(*och));
1640         if (!och)
1641                 GOTO(out, rc = -ENOMEM);
1642
1643         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1644                     ll_i2info(inode), it, och);
1645
1646         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1647                                        inode, och);
1648
1649         OBD_FREE(och, sizeof(*och));
1650  out:
1651         /* this one is in place of ll_file_open */
1652         ptlrpc_req_finished(it->d.lustre.it_data);
1653         RETURN(rc);
1654 }
1655
1656 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1657                   unsigned long arg)
1658 {
1659         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1660         int flags;
1661         ENTRY;
1662
1663         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1664                inode->i_generation, inode, cmd);
1665
1666         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1667         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1668                 RETURN(-ENOTTY);
1669
1670         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1671         switch(cmd) {
1672         case LL_IOC_GETFLAGS:
1673                 /* Get the current value of the file flags */
1674                 return put_user(fd->fd_flags, (int *)arg);
1675         case LL_IOC_SETFLAGS:
1676         case LL_IOC_CLRFLAGS:
1677                 /* Set or clear specific file flags */
1678                 /* XXX This probably needs checks to ensure the flags are
1679                  *     not abused, and to handle any flag side effects.
1680                  */
1681                 if (get_user(flags, (int *) arg))
1682                         RETURN(-EFAULT);
1683
1684                 if (cmd == LL_IOC_SETFLAGS) {
1685                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1686                             !(file->f_flags & O_DIRECT)) {
1687                                 CERROR("%s: unable to disable locking on "
1688                                        "non-O_DIRECT file\n", current->comm);
1689                                 RETURN(-EINVAL);
1690                         }
1691
1692                         fd->fd_flags |= flags;
1693                 } else {
1694                         fd->fd_flags &= ~flags;
1695                 }
1696                 RETURN(0);
1697         case LL_IOC_LOV_SETSTRIPE:
1698                 RETURN(ll_lov_setstripe(inode, file, arg));
1699         case LL_IOC_LOV_SETEA:
1700                 RETURN(ll_lov_setea(inode, file, arg));
1701         case LL_IOC_LOV_GETSTRIPE:
1702                 RETURN(ll_lov_getstripe(inode, arg));
1703         case LL_IOC_RECREATE_OBJ:
1704                 RETURN(ll_lov_recreate_obj(inode, file, arg));
1705         case EXT3_IOC_GETFLAGS:
1706         case EXT3_IOC_SETFLAGS:
1707                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1708         case EXT3_IOC_GETVERSION_OLD:
1709         case EXT3_IOC_GETVERSION:
1710                 RETURN(put_user(inode->i_generation, (int *)arg));
1711         case LL_IOC_JOIN: {
1712                 char *ftail;
1713                 int rc;
1714
1715                 ftail = getname((const char *)arg);
1716                 if (IS_ERR(ftail))
1717                         RETURN(PTR_ERR(ftail));
1718                 rc = ll_file_join(inode, file, ftail);
1719                 putname(ftail);
1720                 RETURN(rc);
1721         }
1722         case LL_IOC_GROUP_LOCK:
1723                 RETURN(ll_get_grouplock(inode, file, arg));
1724         case LL_IOC_GROUP_UNLOCK:
1725                 RETURN(ll_put_grouplock(inode, file, arg));
1726         case LL_IOC_OBD_STATFS:
1727                 RETURN(ll_obd_statfs(inode, (void *)arg));
1728
1729         /* We need to special case any other ioctls we want to handle,
1730          * to send them to the MDS/OST as appropriate and to properly
1731          * network encode the arg field.
1732         case EXT3_IOC_SETVERSION_OLD:
1733         case EXT3_IOC_SETVERSION:
1734         */
1735         default:
1736                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1737                                      (void *)arg));
1738         }
1739 }
1740
1741 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1742 {
1743         struct inode *inode = file->f_dentry->d_inode;
1744         struct ll_inode_info *lli = ll_i2info(inode);
1745         struct lov_stripe_md *lsm = lli->lli_smd;
1746         loff_t retval;
1747         ENTRY;
1748         retval = offset + ((origin == 2) ? inode->i_size :
1749                            (origin == 1) ? file->f_pos : 0);
1750         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
1751                inode->i_ino, inode->i_generation, inode, retval, retval,
1752                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
1753
1754         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1755         if (origin == 2) { /* SEEK_END */
1756                 int nonblock = 0, rc;
1757
1758                 if (file->f_flags & O_NONBLOCK)
1759                         nonblock = LDLM_FL_BLOCK_NOWAIT;
1760
1761                 if (lsm != NULL) {
1762                         rc = ll_glimpse_size(inode, nonblock);
1763                         if (rc != 0)
1764                                 RETURN(rc);
1765                 }
1766
1767                 ll_inode_size_lock(inode, 0);
1768                 offset += inode->i_size;
1769                 ll_inode_size_unlock(inode, 0);
1770         } else if (origin == 1) { /* SEEK_CUR */
1771                 offset += file->f_pos;
1772         }
1773
1774         retval = -EINVAL;
1775         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1776                 if (offset != file->f_pos) {
1777                         file->f_pos = offset;
1778 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1779                         file->f_reada = 0;
1780                         file->f_version = ++event;
1781 #endif
1782                 }
1783                 retval = offset;
1784         }
1785
1786         RETURN(retval);
1787 }
1788
1789 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1790 {
1791         struct inode *inode = dentry->d_inode;
1792         struct ll_inode_info *lli = ll_i2info(inode);
1793         struct lov_stripe_md *lsm = lli->lli_smd;
1794         struct ptlrpc_request *req;
1795         int rc, err;
1796         ENTRY;
1797         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1798                inode->i_generation, inode);
1799
1800         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1801
1802         /* fsync's caller has already called _fdata{sync,write}, we want
1803          * that IO to finish before calling the osc and mdc sync methods */
1804         rc = filemap_fdatawait(inode->i_mapping);
1805
1806         /* catch async errors that were recorded back when async writeback
1807          * failed for pages in this mapping. */
1808         err = lli->lli_async_rc;
1809         lli->lli_async_rc = 0;
1810         if (rc == 0)
1811                 rc = err;
1812         if (lsm) {
1813                 err = lov_test_and_clear_async_rc(lsm);
1814                 if (rc == 0)
1815                         rc = err;
1816         }
1817
1818         err = md_sync(ll_i2sbi(inode)->ll_md_exp,
1819                       ll_inode2fid(inode), &req);
1820         if (!rc)
1821                 rc = err;
1822         if (!err)
1823                 ptlrpc_req_finished(req);
1824
1825         if (data && lsm) {
1826                 struct obdo *oa = obdo_alloc();
1827
1828                 if (!oa)
1829                         RETURN(rc ? rc : -ENOMEM);
1830
1831                 oa->o_id = lsm->lsm_object_id;
1832                 oa->o_valid = OBD_MD_FLID;
1833                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1834                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1835
1836                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
1837                                0, OBD_OBJECT_EOF);
1838                 if (!rc)
1839                         rc = err;
1840                 obdo_free(oa);
1841         }
1842
1843         RETURN(rc);
1844 }
1845
1846 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1847 {
1848         struct inode *inode = file->f_dentry->d_inode;
1849         struct ll_sb_info *sbi = ll_i2sbi(inode);
1850         struct obd_device *obddev;
1851         struct ldlm_res_id res_id =
1852                 { .name = { fid_seq(ll_inode2fid(inode)),
1853                             fid_oid(ll_inode2fid(inode)),
1854                             fid_ver(ll_inode2fid(inode)),
1855                             LDLM_FLOCK} };
1856         struct lustre_handle lockh = {0};
1857         ldlm_policy_data_t flock;
1858         ldlm_mode_t mode = 0;
1859         int flags = 0;
1860         int rc;
1861         ENTRY;
1862
1863         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1864                inode->i_ino, file_lock);
1865
1866         flock.l_flock.pid = file_lock->fl_pid;
1867         flock.l_flock.start = file_lock->fl_start;
1868         flock.l_flock.end = file_lock->fl_end;
1869
1870         switch (file_lock->fl_type) {
1871         case F_RDLCK:
1872                 mode = LCK_PR;
1873                 break;
1874         case F_UNLCK:
1875                 /* An unlock request may or may not have any relation to
1876                  * existing locks so we may not be able to pass a lock handle
1877                  * via a normal ldlm_lock_cancel() request. The request may even
1878                  * unlock a byte range in the middle of an existing lock. In
1879                  * order to process an unlock request we need all of the same
1880                  * information that is given with a normal read or write record
1881                  * lock request. To avoid creating another ldlm unlock (cancel)
1882                  * message we'll treat a LCK_NL flock request as an unlock. */
1883                 mode = LCK_NL;
1884                 break;
1885         case F_WRLCK:
1886                 mode = LCK_PW;
1887                 break;
1888         default:
1889                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1890                 LBUG();
1891         }
1892
1893         switch (cmd) {
1894         case F_SETLKW:
1895 #ifdef F_SETLKW64
1896         case F_SETLKW64:
1897 #endif
1898                 flags = 0;
1899                 break;
1900         case F_SETLK:
1901 #ifdef F_SETLK64
1902         case F_SETLK64:
1903 #endif
1904                 flags = LDLM_FL_BLOCK_NOWAIT;
1905                 break;
1906         case F_GETLK:
1907 #ifdef F_GETLK64
1908         case F_GETLK64:
1909 #endif
1910                 flags = LDLM_FL_TEST_LOCK;
1911                 /* Save the old mode so that if the mode in the lock changes we
1912                  * can decrement the appropriate reader or writer refcount. */
1913                 file_lock->fl_type = mode;
1914                 break;
1915         default:
1916                 CERROR("unknown fcntl lock command: %d\n", cmd);
1917                 LBUG();
1918         }
1919
1920         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
1921                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1922                flags, mode, flock.l_flock.start, flock.l_flock.end);
1923
1924         obddev = sbi->ll_md_exp->exp_obd;
1925         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, obddev->obd_namespace,
1926                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1927                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1928                               NULL, 0, NULL, &lockh);
1929         RETURN(rc);
1930 }
1931
1932 int ll_have_md_lock(struct inode *inode, __u64 bits)
1933 {
1934         struct lustre_handle lockh;
1935         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
1936         struct lu_fid *fid;
1937         int flags;
1938         ENTRY;
1939
1940         if (!inode)
1941                RETURN(0);
1942
1943         fid = &ll_i2info(inode)->lli_fid;
1944         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
1945
1946         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
1947         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, 
1948                                 LCK_CR|LCK_CW|LCK_PR, &lockh)) {
1949                 RETURN(1);
1950         }
1951
1952         RETURN(0);
1953 }
1954
1955 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
1956 {
1957         struct lookup_intent oit = { .it_op = IT_GETATTR };
1958         struct inode *inode = dentry->d_inode;
1959         struct ptlrpc_request *req = NULL;
1960         struct md_op_data *op_data;
1961         struct ll_inode_info *lli;
1962         struct ll_sb_info *sbi;
1963         int rc;
1964         ENTRY;
1965
1966         if (!inode) {
1967                 CERROR("REPORT THIS LINE TO PETER\n");
1968                 RETURN(0);
1969         }
1970         sbi = ll_i2sbi(inode);
1971         lli = ll_i2info(inode);
1972
1973         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
1974                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
1975 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1976         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1977 #endif
1978
1979         OBD_ALLOC_PTR(op_data);
1980         if (op_data == NULL)
1981                 RETURN(-ENOMEM);
1982
1983         ll_prepare_md_op_data(op_data, inode, inode, NULL, 0, 0);
1984
1985         rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &oit, 0,
1986                             &req, ll_md_blocking_ast, 0);
1987         OBD_FREE_PTR(op_data);
1988
1989         if (rc < 0)
1990                 GOTO(out, rc);
1991
1992         rc = ll_revalidate_it_finish(req, 1, &oit, dentry);
1993         if (rc)
1994                 GOTO(out, rc);
1995         
1996         if (!dentry->d_inode->i_nlink) {
1997                 spin_lock(&dcache_lock);
1998                 ll_drop_dentry(dentry);
1999                 spin_unlock(&dcache_lock);
2000         }
2001         
2002         ll_lookup_finish_locks(&oit, dentry);
2003
2004         /* object is allocated, validate size */
2005         if (lli->lli_smd) {
2006                 /* ll_glimpse_size will prefer locally cached writes if they
2007                  * extend the file */
2008                 rc = ll_glimpse_size(inode, 0);
2009         }
2010         EXIT;
2011 out:
2012         if (req)
2013                 ptlrpc_req_finished(req);
2014         return rc;
2015 }
2016
2017 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2018 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2019                   struct lookup_intent *it, struct kstat *stat)
2020 {
2021         struct inode *inode = de->d_inode;
2022         int res = 0;
2023
2024         res = ll_inode_revalidate_it(de, it);
2025         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
2026
2027         if (res)
2028                 return res;
2029
2030         stat->dev = inode->i_sb->s_dev;
2031         stat->ino = inode->i_ino;
2032         stat->mode = inode->i_mode;
2033         stat->nlink = inode->i_nlink;
2034         stat->uid = inode->i_uid;
2035         stat->gid = inode->i_gid;
2036         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2037         stat->atime = inode->i_atime;
2038         stat->mtime = inode->i_mtime;
2039         stat->ctime = inode->i_ctime;
2040         stat->blksize = inode->i_blksize;
2041
2042         ll_inode_size_lock(inode, 0);
2043         stat->size = inode->i_size;
2044         stat->blocks = inode->i_blocks;
2045         ll_inode_size_unlock(inode, 0);
2046
2047         return 0;
2048 }
2049 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2050 {
2051         struct lookup_intent it = { .it_op = IT_GETATTR };
2052
2053         return ll_getattr_it(mnt, de, &it, stat);
2054 }
2055 #endif
2056
2057 static
2058 int lustre_check_acl(struct inode *inode, int mask)
2059 {
2060 #ifdef CONFIG_FS_POSIX_ACL
2061         struct ll_inode_info *lli = ll_i2info(inode);
2062         struct posix_acl *acl;
2063         int rc;
2064         ENTRY;
2065
2066         spin_lock(&lli->lli_lock);
2067         acl = posix_acl_dup(lli->lli_posix_acl);
2068         spin_unlock(&lli->lli_lock);
2069
2070         if (!acl)
2071                 RETURN(-EAGAIN);
2072
2073         rc = posix_acl_permission(inode, acl, mask);
2074         posix_acl_release(acl);
2075
2076         RETURN(rc);
2077 #else
2078         return -EAGAIN;
2079 #endif
2080 }
2081
2082 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2083 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2084 {
2085         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2086                inode->i_ino, inode->i_generation, inode, mask);
2087         return generic_permission(inode, mask, lustre_check_acl);
2088 }
2089 #else
2090 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2091 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2092 #else
2093 int ll_inode_permission(struct inode *inode, int mask)
2094 #endif
2095 {
2096         int mode = inode->i_mode;
2097         int rc;
2098
2099         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2100                inode->i_ino, inode->i_generation, inode, mask);
2101
2102         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2103             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2104                 return -EROFS;
2105         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2106                 return -EACCES;
2107         if (current->fsuid == inode->i_uid) {
2108                 mode >>= 6;
2109         } else if (1) {
2110                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2111                         goto check_groups;
2112                 rc = lustre_check_acl(inode, mask);
2113                 if (rc == -EAGAIN)
2114                         goto check_groups;
2115                 if (rc == -EACCES)
2116                         goto check_capabilities;
2117                 return rc;
2118         } else {
2119 check_groups:
2120                 if (in_group_p(inode->i_gid))
2121                         mode >>= 3;
2122         }
2123         if ((mode & mask & S_IRWXO) == mask)
2124                 return 0;
2125
2126 check_capabilities:
2127         if (!(mask & MAY_EXEC) ||
2128             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2129                 if (capable(CAP_DAC_OVERRIDE))
2130                         return 0;
2131
2132         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2133             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2134                 return 0;
2135         return -EACCES;
2136 }
2137 #endif
2138
2139 struct file_operations ll_file_operations = {
2140         .read           = ll_file_read,
2141         .write          = ll_file_write,
2142         .ioctl          = ll_file_ioctl,
2143         .open           = ll_file_open,
2144         .release        = ll_file_release,
2145         .mmap           = ll_file_mmap,
2146         .llseek         = ll_file_seek,
2147 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2148         .sendfile       = ll_file_sendfile,
2149 #endif
2150         .fsync          = ll_fsync,
2151         /* .lock           = ll_file_flock */
2152 };
2153
2154 struct file_operations ll_file_operations_flock = {
2155         .read           = ll_file_read,
2156         .write          = ll_file_write,
2157         .ioctl          = ll_file_ioctl,
2158         .open           = ll_file_open,
2159         .release        = ll_file_release,
2160         .mmap           = ll_file_mmap,
2161         .llseek         = ll_file_seek,
2162 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2163         .sendfile       = ll_file_sendfile,
2164 #endif
2165         .fsync          = ll_fsync,
2166         .lock           = ll_file_flock
2167 };
2168
2169
2170 struct inode_operations ll_file_inode_operations = {
2171         .setattr_raw    = ll_setattr_raw,
2172         .setattr        = ll_setattr,
2173         .truncate       = ll_truncate,
2174 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2175         .getattr_it     = ll_getattr_it,
2176 #else
2177         .revalidate_it  = ll_inode_revalidate_it,
2178 #endif
2179         .permission     = ll_inode_permission,
2180         .setxattr       = ll_setxattr,
2181         .getxattr       = ll_getxattr,
2182         .listxattr      = ll_listxattr,
2183         .removexattr    = ll_removexattr,
2184 };
2185