Whamcloud - gitweb
update cmm-mdc code
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35
36 /* also used by llite/special.c:ll_special_open() */
37 struct ll_file_data *ll_file_data_get(void)
38 {
39         struct ll_file_data *fd;
40
41         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
42         return fd;
43 }
44
45 static void ll_file_data_put(struct ll_file_data *fd)
46 {
47         if (fd != NULL)
48                 OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
49 }
50
51 static int ll_close_inode_openhandle(struct obd_export *md_exp,
52                                      struct inode *inode,
53                                      struct obd_client_handle *och)
54 {
55         struct md_op_data op_data = { { 0 } };
56         struct ptlrpc_request *req = NULL;
57         int rc;
58
59         op_data.fid1 = ll_i2info(inode)->lli_fid;
60         op_data.valid = OBD_MD_FLTYPE | OBD_MD_FLMODE |
61                         OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
62                         OBD_MD_FLATIME | OBD_MD_FLMTIME |
63                         OBD_MD_FLCTIME;
64
65         op_data.atime = LTIME_S(inode->i_atime);
66         op_data.mtime = LTIME_S(inode->i_mtime);
67         op_data.ctime = LTIME_S(inode->i_ctime);
68         op_data.size = inode->i_size;
69         op_data.blocks = inode->i_blocks;
70         op_data.flags = inode->i_flags;
71
72         if (0 /* ll_is_inode_dirty(inode) */) {
73                 op_data.flags = MDS_BFLAG_UNCOMMITTED_WRITES;
74                 op_data.valid |= OBD_MD_FLFLAGS;
75         }
76         
77         rc = md_close(md_exp, &op_data, och, &req);
78         if (rc == EAGAIN) {
79                 /* We are the last writer, so the MDS has instructed us to get
80                  * the file size and any write cookies, then close again. */
81                 //ll_queue_done_writing(inode);
82                 rc = 0;
83         } else if (rc) {
84                 CERROR("inode %lu mdc close failed: rc = %d\n",
85                        inode->i_ino, rc);
86         }
87
88         if (rc == 0) {
89                 rc = ll_objects_destroy(req, inode);
90                 if (rc)
91                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
92                                inode->i_ino, rc);
93         }
94
95         md_clear_open_replay_data(md_exp, och);
96         ptlrpc_req_finished(req); /* This is close request */
97
98         RETURN(rc);
99 }
100
101 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
102                 struct file *file)
103 {
104         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
105         struct obd_client_handle *och = &fd->fd_mds_och;
106         int rc;
107         ENTRY;
108
109         /* clear group lock, if present */
110         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
111                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
112                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
113                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
114                                       &fd->fd_cwlockh);
115         }
116         
117         rc = ll_close_inode_openhandle(md_exp, inode, och);
118         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
119         LUSTRE_FPRIVATE(file) = NULL;
120         ll_file_data_put(fd);
121
122         RETURN(rc);
123 }
124
125 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
126
127 /* While this returns an error code, fput() the caller does not, so we need
128  * to make every effort to clean up all of our state here.  Also, applications
129  * rarely check close errors and even if an error is returned they will not
130  * re-try the close call.
131  */
132 int ll_file_release(struct inode *inode, struct file *file)
133 {
134         struct ll_file_data *fd;
135         struct ll_sb_info *sbi = ll_i2sbi(inode);
136         struct ll_inode_info *lli = ll_i2info(inode);
137         struct lov_stripe_md *lsm = lli->lli_smd;
138         int rc;
139
140         ENTRY;
141         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
142                inode->i_generation, inode);
143
144         /* don't do anything for / */
145         if (inode->i_sb->s_root == file->f_dentry)
146                 RETURN(0);
147
148         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
149         fd = LUSTRE_FPRIVATE(file);
150         LASSERT(fd != NULL);
151
152         if (lsm)
153                 lov_test_and_clear_async_rc(lsm);
154         lli->lli_async_rc = 0;
155
156         rc = ll_md_close(sbi->ll_md_exp, inode, file);
157         RETURN(rc);
158 }
159
160 static int ll_intent_file_open(struct file *file, void *lmm,
161                                int lmmsize, struct lookup_intent *itp)
162 {
163         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
164         struct lustre_handle lockh;
165         struct md_op_data op_data = { { 0 } };
166         struct dentry *parent = file->f_dentry->d_parent;
167         const char *name = file->f_dentry->d_name.name;
168         const int len = file->f_dentry->d_name.len;
169         int rc;
170
171         if (!parent)
172                 RETURN(-ENOENT);
173
174         ll_prepare_md_op_data(&op_data, parent->d_inode, NULL, 
175                               name, len, O_RDWR);
176
177         rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_PW, &op_data,
178                         &lockh, lmm, lmmsize, ldlm_completion_ast,
179                         ll_md_blocking_ast, NULL, 0);
180         if (rc < 0) {
181                 CERROR("lock enqueue: err: %d\n", rc);
182                 GOTO(out, rc);
183         }
184
185         rc = ll_prep_inode(&file->f_dentry->d_inode,
186                            (struct ptlrpc_request *)itp->d.lustre.it_data, 1,
187                             NULL);
188 out:
189         RETURN(rc);
190 }
191
192 static void ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli, 
193                         struct lookup_intent *it, struct obd_client_handle *och)
194 {
195         struct ptlrpc_request *req = it->d.lustre.it_data;
196         struct mds_body *body;
197
198         LASSERT(och);
199
200         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
201         LASSERT(body != NULL);                  /* reply already checked out */
202         LASSERT_REPSWABBED(req, 1);             /* and swabbed in md_enqueue */
203
204         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
205         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
206         lli->lli_io_epoch = body->io_epoch;
207
208         md_set_open_replay_data(md_exp, och, it->d.lustre.it_data);
209 }
210
211 int ll_local_open(struct file *file, struct lookup_intent *it,
212                   struct ll_file_data *fd)
213 {
214         struct inode *inode = file->f_dentry->d_inode;
215         ENTRY;
216
217         LASSERT(!LUSTRE_FPRIVATE(file));
218
219         LASSERT(fd != NULL);
220
221         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
222                     ll_i2info(inode), it, &fd->fd_mds_och);
223                     
224         LUSTRE_FPRIVATE(file) = fd;
225         ll_readahead_init(inode, &fd->fd_ras);
226
227         RETURN(0);
228 }
229
230 /* Open a file, and (for the very first open) create objects on the OSTs at
231  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
232  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
233  * lli_open_sem to ensure no other process will create objects, send the
234  * stripe MD to the MDS, or try to destroy the objects if that fails.
235  *
236  * If we already have the stripe MD locally then we don't request it in
237  * md_open(), by passing a lmm_size = 0.
238  *
239  * It is up to the application to ensure no other processes open this file
240  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
241  * used.  We might be able to avoid races of that sort by getting lli_open_sem
242  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
243  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
244  */
245 int ll_file_open(struct inode *inode, struct file *file)
246 {
247         struct ll_inode_info *lli = ll_i2info(inode);
248         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
249                                           .it_flags = file->f_flags };
250         struct lov_stripe_md *lsm;
251         struct ptlrpc_request *req;
252         struct ll_file_data *fd;
253         int rc = 0;
254         ENTRY;
255
256         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
257                inode->i_generation, inode, file->f_flags);
258
259         /* don't do anything for / */
260         if (inode->i_sb->s_root == file->f_dentry)
261                 RETURN(0);
262
263         it = file->f_it;
264
265         fd = ll_file_data_get();
266         if (fd == NULL)
267                 RETURN(-ENOMEM);
268
269         if (!it || !it->d.lustre.it_disposition) {
270                 /* Convert f_flags into access mode. We cannot use file->f_mode,
271                  * because everything but O_ACCMODE mask was stripped from
272                  * there */
273                 if ((oit.it_flags + 1) & O_ACCMODE)
274                         oit.it_flags++;
275                 if (oit.it_flags & O_TRUNC)
276                         oit.it_flags |= FMODE_WRITE;
277
278                 if (oit.it_flags & O_CREAT)
279                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
280
281                 /* We do not want O_EXCL here, presumably we opened the file
282                  * already? XXX - NFS implications? */
283                 oit.it_flags &= ~O_EXCL;
284
285                 it = &oit;
286                 rc = ll_intent_file_open(file, NULL, 0, it);
287                 if (rc) {
288                         ll_file_data_put(fd);
289                         GOTO(out, rc);
290                 }
291         }
292
293         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
294         rc = it_open_error(DISP_OPEN_OPEN, it);
295         /* md_intent_lock() didn't get a request ref if there was an open
296          * error, so don't do cleanup on the request here (bug 3430) */
297         if (rc) {
298                 ll_file_data_put(fd);
299                 RETURN(rc);
300         }
301
302         rc = ll_local_open(file, it, fd);
303         LASSERTF(rc == 0, "rc = %d\n", rc);
304
305         if (!S_ISREG(inode->i_mode))
306                 GOTO(out, rc);
307
308         lsm = lli->lli_smd;
309         if (lsm == NULL) {
310                 if (file->f_flags & O_LOV_DELAY_CREATE ||
311                     !(file->f_mode & FMODE_WRITE)) {
312                         CDEBUG(D_INODE, "object creation was delayed\n");
313                         GOTO(out, rc);
314                 }
315         }
316         file->f_flags &= ~O_LOV_DELAY_CREATE;
317         GOTO(out, rc);
318  out:
319         req = it->d.lustre.it_data;
320         ptlrpc_req_finished(req);
321         if (rc == 0)
322                 ll_open_complete(inode);
323         return rc;
324 }
325
326 /* Fills the obdo with the attributes for the inode defined by lsm */
327 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
328                    struct obdo *oa)
329 {
330         struct ptlrpc_request_set *set;
331         int rc;
332         ENTRY;
333
334         LASSERT(lsm != NULL);
335
336         memset(oa, 0, sizeof *oa);
337         oa->o_id = lsm->lsm_object_id;
338         oa->o_mode = S_IFREG;
339         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
340                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
341                 OBD_MD_FLCTIME;
342
343         set = ptlrpc_prep_set();
344         if (set == NULL) {
345                 rc = -ENOMEM;
346         } else {
347                 rc = obd_getattr_async(exp, oa, lsm, set);
348                 if (rc == 0)
349                         rc = ptlrpc_set_wait(set);
350                 ptlrpc_set_destroy(set);
351         }
352         if (rc)
353                 RETURN(rc);
354
355         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
356                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
357         RETURN(0);
358 }
359
360 static inline void ll_remove_suid(struct inode *inode)
361 {
362         unsigned int mode;
363
364         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
365         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
366
367         /* was any of the uid bits set? */
368         mode &= inode->i_mode;
369         if (mode && !capable(CAP_FSETID)) {
370                 inode->i_mode &= ~mode;
371                 // XXX careful here - we cannot change the size
372         }
373 }
374
375 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
376 {
377         struct ll_inode_info *lli = ll_i2info(inode);
378         struct lov_stripe_md *lsm = lli->lli_smd;
379         struct obd_export *exp = ll_i2dtexp(inode);
380         struct {
381                 char name[16];
382                 struct ldlm_lock *lock;
383                 struct lov_stripe_md *lsm;
384         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
385         __u32 stripe, vallen = sizeof(stripe);
386         int rc;
387         ENTRY;
388
389         if (lsm->lsm_stripe_count == 1)
390                 GOTO(check, stripe = 0);
391
392         /* get our offset in the lov */
393         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
394         if (rc != 0) {
395                 CERROR("obd_get_info: rc = %d\n", rc);
396                 RETURN(rc);
397         }
398         LASSERT(stripe < lsm->lsm_stripe_count);
399
400 check:
401         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
402             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[1]){
403                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
404                            lsm->lsm_oinfo[stripe].loi_id,
405                            lsm->lsm_oinfo[stripe].loi_gr);
406                 RETURN(-ELDLM_NO_LOCK_DATA);
407         }
408
409         RETURN(stripe);
410 }
411
412 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
413  * we get a lock cancellation for each stripe, so we have to map the obd's
414  * region back onto the stripes in the file that it held.
415  *
416  * No one can dirty the extent until we've finished our work and they can
417  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
418  * but other kernel actors could have pages locked.
419  *
420  * Called with the DLM lock held. */
421 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
422                               struct ldlm_lock *lock, __u32 stripe)
423 {
424         ldlm_policy_data_t tmpex;
425         unsigned long start, end, count, skip, i, j;
426         struct page *page;
427         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
428         struct lustre_handle lockh;
429         ENTRY;
430
431         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
432         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
433                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
434                inode->i_size);
435
436         /* our locks are page granular thanks to osc_enqueue, we invalidate the
437          * whole page. */
438         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
439         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
440
441         count = ~0;
442         skip = 0;
443         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
444         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
445         if (lsm->lsm_stripe_count > 1) {
446                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
447                 skip = (lsm->lsm_stripe_count - 1) * count;
448                 start += start/count * skip + stripe * count;
449                 if (end != ~0)
450                         end += end/count * skip + stripe * count;
451         }
452         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
453                 end = ~0;
454
455         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
456         if (i < end)
457                 end = i;
458
459         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
460                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
461                count, skip, end, discard ? " (DISCARDING)" : "");
462
463         /* walk through the vmas on the inode and tear down mmaped pages that
464          * intersect with the lock.  this stops immediately if there are no
465          * mmap()ed regions of the file.  This is not efficient at all and
466          * should be short lived. We'll associate mmap()ed pages with the lock
467          * and will be able to find them directly */
468         for (i = start; i <= end; i += (j + skip)) {
469                 j = min(count - (i % count), end - i + 1);
470                 LASSERT(j > 0);
471                 LASSERT(inode->i_mapping);
472                 if (ll_teardown_mmaps(inode->i_mapping,
473                                       (__u64)i << PAGE_CACHE_SHIFT,
474                                       ((__u64)(i+j) << PAGE_CACHE_SHIFT) - 1) )
475                         break;
476         }
477
478         /* this is the simplistic implementation of page eviction at
479          * cancelation.  It is careful to get races with other page
480          * lockers handled correctly.  fixes from bug 20 will make it
481          * more efficient by associating locks with pages and with
482          * batching writeback under the lock explicitly. */
483         for (i = start, j = start % count; i <= end;
484              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
485                 if (j == count) {
486                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
487                         i += skip;
488                         j = 0;
489                         if (i > end)
490                                 break;
491                 }
492                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
493                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
494                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
495                          start, i, end);
496
497                 if (!mapping_has_pages(inode->i_mapping)) {
498                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
499                         break;
500                 }
501
502                 cond_resched();
503
504                 page = find_get_page(inode->i_mapping, i);
505                 if (page == NULL)
506                         continue;
507                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
508                                i, tmpex.l_extent.start);
509                 lock_page(page);
510
511                 /* page->mapping to check with racing against teardown */
512                 if (!discard && clear_page_dirty_for_io(page)) {
513                         rc = ll_call_writepage(inode, page);
514                         if (rc != 0)
515                                 CERROR("writepage of page %p failed: %d\n",
516                                        page, rc);
517                         /* either waiting for io to complete or reacquiring
518                          * the lock that the failed writepage released */
519                         lock_page(page);
520                 }
521
522                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
523                 /* check to see if another DLM lock covers this page */
524                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
525                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
526                                       LDLM_FL_TEST_LOCK,
527                                       &lock->l_resource->lr_name, LDLM_EXTENT,
528                                       &tmpex, LCK_PR | LCK_PW, &lockh);
529                 if (rc2 == 0 && page->mapping != NULL) {
530                         struct ll_async_page *llap = llap_cast_private(page);
531                         // checking again to account for writeback's lock_page()
532                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
533                         if (llap)
534                                 ll_ra_accounting(llap, inode->i_mapping);
535                         ll_truncate_complete_page(page);
536                 }
537                 unlock_page(page);
538                 page_cache_release(page);
539         }
540         LASSERTF(tmpex.l_extent.start <=
541                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
542                   lock->l_policy_data.l_extent.end + 1),
543                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
544                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
545                  start, i, end);
546         EXIT;
547 }
548
549 static int ll_extent_lock_callback(struct ldlm_lock *lock,
550                                    struct ldlm_lock_desc *new, void *data,
551                                    int flag)
552 {
553         struct lustre_handle lockh = { 0 };
554         int rc;
555         ENTRY;
556
557         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
558                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
559                 LBUG();
560         }
561
562         switch (flag) {
563         case LDLM_CB_BLOCKING:
564                 ldlm_lock2handle(lock, &lockh);
565                 rc = ldlm_cli_cancel(&lockh);
566                 if (rc != ELDLM_OK)
567                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
568                 break;
569         case LDLM_CB_CANCELING: {
570                 struct inode *inode;
571                 struct ll_inode_info *lli;
572                 struct lov_stripe_md *lsm;
573                 int stripe;
574                 __u64 kms;
575
576                 /* This lock wasn't granted, don't try to evict pages */
577                 if (lock->l_req_mode != lock->l_granted_mode)
578                         RETURN(0);
579
580                 inode = ll_inode_from_lock(lock);
581                 if (inode == NULL)
582                         RETURN(0);
583                 lli = ll_i2info(inode);
584                 if (lli == NULL)
585                         goto iput;
586                 if (lli->lli_smd == NULL)
587                         goto iput;
588                 lsm = lli->lli_smd;
589
590                 stripe = ll_lock_to_stripe_offset(inode, lock);
591                 if (stripe < 0)
592                         goto iput;
593
594                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
595
596                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
597                 lov_stripe_lock(lsm);
598                 kms = ldlm_extent_shift_kms(lock,
599                                             lsm->lsm_oinfo[stripe].loi_kms);
600
601                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
602                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
603                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
604                 lsm->lsm_oinfo[stripe].loi_kms = kms;
605                 lov_stripe_unlock(lsm);
606                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
607                 //ll_try_done_writing(inode);
608         iput:
609                 iput(inode);
610                 break;
611         }
612         default:
613                 LBUG();
614         }
615
616         RETURN(0);
617 }
618
619 #if 0
620 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
621 {
622         /* XXX ALLOCATE - 160 bytes */
623         struct inode *inode = ll_inode_from_lock(lock);
624         struct ll_inode_info *lli = ll_i2info(inode);
625         struct lustre_handle lockh = { 0 };
626         struct ost_lvb *lvb;
627         int stripe;
628         ENTRY;
629
630         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
631                      LDLM_FL_BLOCK_CONV)) {
632                 LBUG(); /* not expecting any blocked async locks yet */
633                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
634                            "lock, returning");
635                 ldlm_lock_dump(D_OTHER, lock, 0);
636                 ldlm_reprocess_all(lock->l_resource);
637                 RETURN(0);
638         }
639
640         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
641
642         stripe = ll_lock_to_stripe_offset(inode, lock);
643         if (stripe < 0)
644                 goto iput;
645
646         if (lock->l_lvb_len) {
647                 struct lov_stripe_md *lsm = lli->lli_smd;
648                 __u64 kms;
649                 lvb = lock->l_lvb_data;
650                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
651
652                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
653                 LOCK_INODE_MUTEX(inode);
654                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
655                 kms = ldlm_extent_shift_kms(NULL, kms);
656                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
657                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
658                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
659                 lsm->lsm_oinfo[stripe].loi_kms = kms;
660                 UNLOCK_INODE_MUTEX(inode);
661                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
662         }
663
664 iput:
665         iput(inode);
666         wake_up(&lock->l_waitq);
667
668         ldlm_lock2handle(lock, &lockh);
669         ldlm_lock_decref(&lockh, LCK_PR);
670         RETURN(0);
671 }
672 #endif
673
674 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
675 {
676         struct ptlrpc_request *req = reqp;
677         struct inode *inode = ll_inode_from_lock(lock);
678         struct ll_inode_info *lli;
679         struct lov_stripe_md *lsm;
680         struct ost_lvb *lvb;
681         int rc, size = sizeof(*lvb), stripe;
682         ENTRY;
683
684         if (inode == NULL)
685                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
686         lli = ll_i2info(inode);
687         if (lli == NULL)
688                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
689         lsm = lli->lli_smd;
690         if (lsm == NULL)
691                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
692
693         /* First, find out which stripe index this lock corresponds to. */
694         stripe = ll_lock_to_stripe_offset(inode, lock);
695         if (stripe < 0)
696                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
697
698         rc = lustre_pack_reply(req, 1, &size, NULL);
699         if (rc) {
700                 CERROR("lustre_pack_reply: %d\n", rc);
701                 GOTO(iput, rc);
702         }
703
704         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
705         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
706         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
707         lvb->lvb_atime = LTIME_S(inode->i_atime);
708         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
709
710         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
711                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
712                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
713                    lvb->lvb_atime, lvb->lvb_ctime);
714         GOTO(iput, 0);
715  iput:
716         iput(inode);
717
718  out:
719         /* These errors are normal races, so we don't want to fill the console
720          * with messages by calling ptlrpc_error() */
721         if (rc == -ELDLM_NO_LOCK_DATA)
722                 lustre_pack_reply(req, 0, NULL, NULL);
723
724         req->rq_status = rc;
725         return rc;
726 }
727
728 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
729  * file (because it prefers KMS over RSS when larger) */
730 int ll_glimpse_size(struct inode *inode, int ast_flags)
731 {
732         struct ll_inode_info *lli = ll_i2info(inode);
733         struct ll_sb_info *sbi = ll_i2sbi(inode);
734         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
735         struct lustre_handle lockh = { 0 };
736         struct ost_lvb lvb;
737         int rc;
738         ENTRY;
739
740         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
741
742         ast_flags |= LDLM_FL_HAS_INTENT;
743
744         /* NOTE: this looks like DLM lock request, but it may not be one. Due
745          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
746          *       won't revoke any conflicting DLM locks held. Instead,
747          *       ll_glimpse_callback() will be called on each client
748          *       holding a DLM lock against this file, and resulting size
749          *       will be returned for each stripe. DLM lock on [0, EOF] is
750          *       acquired only if there were no conflicting locks. */
751         rc = obd_enqueue(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, &policy,
752                          LCK_PR, &ast_flags, ll_extent_lock_callback,
753                          ldlm_completion_ast, ll_glimpse_callback, inode,
754                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
755         if (rc == -ENOENT)
756                 RETURN(rc);
757         if (rc != 0) {
758                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
759                 RETURN(rc > 0 ? -EIO : rc);
760         }
761
762         ll_inode_size_lock(inode, 1);
763         inode_init_lvb(inode, &lvb);
764         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
765         inode->i_size = lvb.lvb_size;
766         inode->i_blocks = lvb.lvb_blocks;
767         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
768         LTIME_S(inode->i_atime) = lvb.lvb_atime;
769         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
770         ll_inode_size_unlock(inode, 1);
771
772         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
773                inode->i_size, inode->i_blocks);
774
775         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
776
777         RETURN(rc);
778 }
779
780 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
781                    struct lov_stripe_md *lsm, int mode,
782                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
783                    int ast_flags)
784 {
785         struct ll_sb_info *sbi = ll_i2sbi(inode);
786         struct ost_lvb lvb;
787         int rc;
788         ENTRY;
789
790         LASSERT(!lustre_handle_is_used(lockh));
791         LASSERT(lsm != NULL);
792
793         /* don't drop the mmapped file to LRU */
794         if (mapping_mapped(inode->i_mapping))
795                 ast_flags |= LDLM_FL_NO_LRU;
796
797         /* XXX phil: can we do this?  won't it screw the file size up? */
798         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
799             (sbi->ll_flags & LL_SBI_NOLCK))
800                 RETURN(0);
801
802         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
803                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
804
805         rc = obd_enqueue(sbi->ll_dt_exp, lsm, LDLM_EXTENT, policy, mode,
806                          &ast_flags, ll_extent_lock_callback,
807                          ldlm_completion_ast, ll_glimpse_callback, inode,
808                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
809         if (rc > 0)
810                 rc = -EIO;
811
812         ll_inode_size_lock(inode, 1);
813         inode_init_lvb(inode, &lvb);
814         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
815
816         if (policy->l_extent.start == 0 &&
817             policy->l_extent.end == OBD_OBJECT_EOF) {
818                 /* vmtruncate()->ll_truncate() first sets the i_size and then
819                  * the kms under both a DLM lock and the
820                  * ll_inode_size_lock().  If we don't get the
821                  * ll_inode_size_lock() here we can match the DLM lock and
822                  * reset i_size from the kms before the truncating path has
823                  * updated the kms.  generic_file_write can then trust the
824                  * stale i_size when doing appending writes and effectively
825                  * cancel the result of the truncate.  Getting the
826                  * ll_inode_size_lock() after the enqueue maintains the DLM
827                  * -> ll_inode_size_lock() acquiring order. */
828                 inode->i_size = lvb.lvb_size;
829         }
830
831         if (rc == 0) {
832                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
833                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
834                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
835         }
836         ll_inode_size_unlock(inode, 1);
837
838         RETURN(rc);
839 }
840
841 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
842                      struct lov_stripe_md *lsm, int mode,
843                      struct lustre_handle *lockh)
844 {
845         struct ll_sb_info *sbi = ll_i2sbi(inode);
846         int rc;
847         ENTRY;
848
849         /* XXX phil: can we do this?  won't it screw the file size up? */
850         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
851             (sbi->ll_flags & LL_SBI_NOLCK))
852                 RETURN(0);
853
854         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
855
856         RETURN(rc);
857 }
858
859 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
860                             loff_t *ppos)
861 {
862         struct inode *inode = file->f_dentry->d_inode;
863         struct ll_inode_info *lli = ll_i2info(inode);
864         struct lov_stripe_md *lsm = lli->lli_smd;
865         struct ll_lock_tree tree;
866         struct ll_lock_tree_node *node;
867         struct ost_lvb lvb;
868         struct ll_ra_read bead;
869         int rc;
870         ssize_t retval;
871         __u64 kms;
872         ENTRY;
873         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
874                inode->i_ino, inode->i_generation, inode, count, *ppos);
875
876         /* "If nbyte is 0, read() will return 0 and have no other results."
877          *                      -- Single Unix Spec */
878         if (count == 0)
879                 RETURN(0);
880
881         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
882                             count);
883
884         if (!lsm) {
885                 /* Read on file with no objects should return zero-filled
886                  * buffers up to file size (we can get non-zero sizes with
887                  * mknod + truncate, then opening file for read. This is a
888                  * common pattern in NFS case, it seems). Bug 6243 */
889                 int notzeroed;
890                 /* Since there are no objects on OSTs, we have nothing to get
891                  * lock on and so we are forced to access inode->i_size
892                  * unguarded */
893
894                 /* Read beyond end of file */
895                 if (*ppos >= inode->i_size)
896                         RETURN(0);
897
898                 if (count > inode->i_size - *ppos)
899                         count = inode->i_size - *ppos;
900                 /* Make sure to correctly adjust the file pos pointer for
901                  * EFAULT case */
902                 notzeroed = clear_user(buf, count);
903                 count -= notzeroed;
904                 *ppos += count;
905                 if (!count)
906                         RETURN(-EFAULT);
907                 RETURN(count);
908         }
909
910         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
911         tree.lt_fd = LUSTRE_FPRIVATE(file);
912         rc = ll_tree_lock(&tree, node, buf, count,
913                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
914         if (rc != 0)
915                 RETURN(rc);
916
917         ll_inode_size_lock(inode, 1);
918         /*
919          * Consistency guarantees: following possibilities exist for the
920          * relation between region being read and real file size at this
921          * moment:
922          *
923          *  (A): the region is completely inside of the file;
924          *
925          *  (B-x): x bytes of region are inside of the file, the rest is
926          *  outside;
927          *
928          *  (C): the region is completely outside of the file.
929          *
930          * This classification is stable under DLM lock acquired by
931          * ll_tree_lock() above, because to change class, other client has to
932          * take DLM lock conflicting with our lock. Also, any updates to
933          * ->i_size by other threads on this client are serialized by
934          * ll_inode_size_lock(). This guarantees that short reads are handled
935          * correctly in the face of concurrent writes and truncates.
936          */
937         inode_init_lvb(inode, &lvb);
938         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
939         kms = lvb.lvb_size;
940         if (*ppos + count - 1 > kms) {
941                 /* A glimpse is necessary to determine whether we return a
942                  * short read (B) or some zeroes at the end of the buffer (C) */
943                 ll_inode_size_unlock(inode, 1);
944                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
945                 if (retval)
946                         goto out;
947         } else {
948                 /* region is within kms and, hence, within real file size (A) */
949                 inode->i_size = kms;
950                 ll_inode_size_unlock(inode, 1);
951         }
952
953         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
954                inode->i_ino, count, *ppos, inode->i_size);
955
956         /* turn off the kernel's read-ahead */
957 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
958         file->f_ramax = 0;
959 #else
960         file->f_ra.ra_pages = 0;
961 #endif
962         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
963         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
964         ll_ra_read_in(file, &bead);
965         /* BUG: 5972 */
966         file_accessed(file);
967         retval = generic_file_read(file, buf, count, ppos);
968         ll_ra_read_ex(file, &bead);
969
970  out:
971         ll_tree_unlock(&tree);
972         RETURN(retval);
973 }
974
975 /*
976  * Write to a file (through the page cache).
977  */
978 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
979                              loff_t *ppos)
980 {
981         struct inode *inode = file->f_dentry->d_inode;
982         struct ll_lock_tree tree;
983         struct ll_lock_tree_node *node;
984         loff_t maxbytes = ll_file_maxbytes(inode);
985         ssize_t retval;
986         int rc;
987         ENTRY;
988         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
989                inode->i_ino, inode->i_generation, inode, count, *ppos);
990
991         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
992
993         /* POSIX, but surprised the VFS doesn't check this already */
994         if (count == 0)
995                 RETURN(0);
996
997         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
998          * called on the file, don't fail the below assertion (bug 2388). */
999         if (file->f_flags & O_LOV_DELAY_CREATE &&
1000             ll_i2info(inode)->lli_smd == NULL)
1001                 RETURN(-EBADF);
1002
1003         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1004
1005         if (file->f_flags & O_APPEND)
1006                 node = ll_node_from_inode(inode, 0, OBD_OBJECT_EOF, LCK_PW);
1007         else
1008                 node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
1009                                           LCK_PW);
1010
1011         if (IS_ERR(node))
1012                 RETURN(PTR_ERR(node));
1013
1014         tree.lt_fd = LUSTRE_FPRIVATE(file);
1015         rc = ll_tree_lock(&tree, node, buf, count,
1016                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1017         if (rc != 0)
1018                 RETURN(rc);
1019
1020         /* this is ok, g_f_w will overwrite this under i_mutex if it races
1021          * with a local truncate, it just makes our maxbyte checking easier */
1022         if (file->f_flags & O_APPEND)
1023                 *ppos = inode->i_size;
1024
1025         if (*ppos >= maxbytes) {
1026                 if (count || *ppos > maxbytes) {
1027                         send_sig(SIGXFSZ, current, 0);
1028                         GOTO(out, retval = -EFBIG);
1029                 }
1030         }
1031         if (*ppos + count > maxbytes)
1032                 count = maxbytes - *ppos;
1033
1034         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1035                inode->i_ino, count, *ppos);
1036
1037         /* generic_file_write handles O_APPEND after getting i_mutex */
1038         retval = generic_file_write(file, buf, count, ppos);
1039
1040 out:
1041         ll_tree_unlock(&tree);
1042         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1043                             retval > 0 ? retval : 0);
1044         RETURN(retval);
1045 }
1046
1047 /*
1048  * Send file content (through pagecache) somewhere with helper
1049  */
1050 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1051 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1052                                 read_actor_t actor, void *target)
1053 {
1054         struct inode *inode = in_file->f_dentry->d_inode;
1055         struct ll_inode_info *lli = ll_i2info(inode);
1056         struct lov_stripe_md *lsm = lli->lli_smd;
1057         struct ll_lock_tree tree;
1058         struct ll_lock_tree_node *node;
1059         struct ost_lvb lvb;
1060         struct ll_ra_read bead;
1061         int rc;
1062         ssize_t retval;
1063         __u64 kms;
1064         ENTRY;
1065         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1066                inode->i_ino, inode->i_generation, inode, count, *ppos);
1067
1068         /* "If nbyte is 0, read() will return 0 and have no other results."
1069          *                      -- Single Unix Spec */
1070         if (count == 0)
1071                 RETURN(0);
1072
1073         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1074                             count);
1075
1076         /* File with no objects, nothing to lock */
1077         if (!lsm)
1078                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1079
1080         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1081         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1082         rc = ll_tree_lock(&tree, node, NULL, count,
1083                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1084         if (rc != 0)
1085                 RETURN(rc);
1086
1087         ll_inode_size_lock(inode, 1);
1088         /*
1089          * Consistency guarantees: following possibilities exist for the
1090          * relation between region being read and real file size at this
1091          * moment:
1092          *
1093          *  (A): the region is completely inside of the file;
1094          *
1095          *  (B-x): x bytes of region are inside of the file, the rest is
1096          *  outside;
1097          *
1098          *  (C): the region is completely outside of the file.
1099          *
1100          * This classification is stable under DLM lock acquired by
1101          * ll_tree_lock() above, because to change class, other client has to
1102          * take DLM lock conflicting with our lock. Also, any updates to
1103          * ->i_size by other threads on this client are serialized by
1104          * ll_inode_size_lock(). This guarantees that short reads are handled
1105          * correctly in the face of concurrent writes and truncates.
1106          */
1107         inode_init_lvb(inode, &lvb);
1108         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1109         kms = lvb.lvb_size;
1110         if (*ppos + count - 1 > kms) {
1111                 /* A glimpse is necessary to determine whether we return a
1112                  * short read (B) or some zeroes at the end of the buffer (C) */
1113                 ll_inode_size_unlock(inode, 1);
1114                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1115                 if (retval)
1116                         goto out;
1117         } else {
1118                 /* region is within kms and, hence, within real file size (A) */
1119                 inode->i_size = kms;
1120                 ll_inode_size_unlock(inode, 1);
1121         }
1122
1123         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1124                inode->i_ino, count, *ppos, inode->i_size);
1125
1126         /* turn off the kernel's read-ahead */
1127         in_file->f_ra.ra_pages = 0;
1128
1129         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1130         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1131         ll_ra_read_in(in_file, &bead);
1132         /* BUG: 5972 */
1133         file_accessed(in_file);
1134         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1135         ll_ra_read_ex(in_file, &bead);
1136
1137  out:
1138         ll_tree_unlock(&tree);
1139         RETURN(retval);
1140 }
1141 #endif
1142
1143 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1144                                unsigned long arg)
1145 {
1146         struct ll_inode_info *lli = ll_i2info(inode);
1147         struct obd_export *exp = ll_i2dtexp(inode);
1148         struct ll_recreate_obj ucreatp;
1149         struct obd_trans_info oti = { 0 };
1150         struct obdo *oa = NULL;
1151         int lsm_size;
1152         int rc = 0;
1153         struct lov_stripe_md *lsm, *lsm2;
1154         ENTRY;
1155
1156         if (!capable (CAP_SYS_ADMIN))
1157                 RETURN(-EPERM);
1158
1159         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1160                             sizeof(struct ll_recreate_obj));
1161         if (rc) {
1162                 RETURN(-EFAULT);
1163         }
1164         oa = obdo_alloc();
1165         if (oa == NULL)
1166                 RETURN(-ENOMEM);
1167
1168         down(&lli->lli_open_sem);
1169         lsm = lli->lli_smd;
1170         if (lsm == NULL)
1171                 GOTO(out, rc = -ENOENT);
1172         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1173                    (lsm->lsm_stripe_count));
1174
1175         OBD_ALLOC(lsm2, lsm_size);
1176         if (lsm2 == NULL)
1177                 GOTO(out, rc = -ENOMEM);
1178
1179         oa->o_id = ucreatp.lrc_id;
1180         oa->o_nlink = ucreatp.lrc_ost_idx;
1181         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1182         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1183         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1184                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1185
1186         oti.oti_objid = NULL;
1187         memcpy(lsm2, lsm, lsm_size);
1188         rc = obd_create(exp, oa, &lsm2, &oti);
1189
1190         OBD_FREE(lsm2, lsm_size);
1191         GOTO(out, rc);
1192 out:
1193         up(&lli->lli_open_sem);
1194         obdo_free(oa);
1195         return rc;
1196 }
1197
1198 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1199                                     int flags, struct lov_user_md *lum,
1200                                     int lum_size)
1201 {
1202         struct ll_inode_info *lli = ll_i2info(inode);
1203         struct file *f = NULL;
1204         struct obd_export *dt_exp = ll_i2dtexp(inode);
1205         struct obd_export *md_exp = ll_i2mdexp(inode);
1206         struct lov_stripe_md *lsm;
1207         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1208         struct ptlrpc_request *req = NULL;
1209         struct ll_file_data *fd;
1210         int rc = 0;
1211         struct lustre_md md;
1212         ENTRY;
1213
1214         down(&lli->lli_open_sem);
1215         lsm = lli->lli_smd;
1216         if (lsm) {
1217                 up(&lli->lli_open_sem);
1218                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1219                        inode->i_ino);
1220                 RETURN(-EEXIST);
1221         }
1222
1223         fd = ll_file_data_get();
1224         if (fd == NULL)
1225                 GOTO(out, -ENOMEM);
1226
1227         f = get_empty_filp();
1228         if (!f)
1229                 GOTO(out, -ENOMEM);
1230
1231         f->f_dentry = dget(file->f_dentry);
1232         f->f_vfsmnt = mntget(file->f_vfsmnt);
1233
1234         rc = ll_intent_file_open(f, lum, lum_size, &oit);
1235         if (rc)
1236                 GOTO(out, rc);
1237         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1238                 GOTO(out, -ENOENT);
1239         req = oit.d.lustre.it_data;
1240         rc = oit.d.lustre.it_status;
1241
1242         if (rc < 0)
1243                 GOTO(out, rc);
1244
1245         rc = md_get_lustre_md(md_exp, req, 1, dt_exp, &md);
1246         if (rc)
1247                 GOTO(out, rc);
1248         ll_update_inode(f->f_dentry->d_inode, &md);
1249
1250         rc = ll_local_open(f, &oit, fd);
1251         if (rc)
1252                 GOTO(out, rc);
1253         fd = NULL;
1254         ll_intent_release(&oit);
1255
1256         rc = ll_file_release(f->f_dentry->d_inode, f);
1257
1258  out:
1259         if (f)
1260                 fput(f);
1261         ll_file_data_put(fd);
1262         up(&lli->lli_open_sem);
1263         if (req != NULL)
1264                 ptlrpc_req_finished(req);
1265         RETURN(rc);
1266 }
1267
1268 static int ll_lov_setea(struct inode *inode, struct file *file,
1269                             unsigned long arg)
1270 {
1271         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1272         struct lov_user_md  *lump;
1273         int lum_size = sizeof(struct lov_user_md) +
1274                        sizeof(struct lov_user_ost_data);
1275         int rc;
1276         ENTRY;
1277
1278         if (!capable (CAP_SYS_ADMIN))
1279                 RETURN(-EPERM);
1280
1281         OBD_ALLOC(lump, lum_size);
1282         if (lump == NULL) {
1283                 RETURN(-ENOMEM);
1284         }
1285         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1286         if (rc) {
1287                 OBD_FREE(lump, lum_size);
1288                 RETURN(-EFAULT);
1289         }
1290
1291         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1292
1293         OBD_FREE(lump, lum_size);
1294         RETURN(rc);
1295 }
1296
1297 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1298                             unsigned long arg)
1299 {
1300         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1301         int rc;
1302         int flags = FMODE_WRITE;
1303         ENTRY;
1304
1305         /* Bug 1152: copy properly when this is no longer true */
1306         LASSERT(sizeof(lum) == sizeof(*lump));
1307         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1308         rc = copy_from_user(&lum, lump, sizeof(lum));
1309         if (rc)
1310                 RETURN(-EFAULT);
1311
1312         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1313         if (rc == 0) {
1314                  put_user(0, &lump->lmm_stripe_count);
1315                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1316                                     0, ll_i2info(inode)->lli_smd, lump);
1317         }
1318         RETURN(rc);
1319 }
1320
1321 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1322 {
1323         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1324
1325         if (!lsm)
1326                 RETURN(-ENODATA);
1327
1328         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1329                             (void *)arg);
1330 }
1331
1332 static int ll_get_grouplock(struct inode *inode, struct file *file,
1333                             unsigned long arg)
1334 {
1335         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1336         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1337                                                     .end = OBD_OBJECT_EOF}};
1338         struct lustre_handle lockh = { 0 };
1339         struct ll_inode_info *lli = ll_i2info(inode);
1340         struct lov_stripe_md *lsm = lli->lli_smd;
1341         int flags = 0, rc;
1342         ENTRY;
1343
1344         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1345                 RETURN(-EINVAL);
1346         }
1347
1348         policy.l_extent.gid = arg;
1349         if (file->f_flags & O_NONBLOCK)
1350                 flags = LDLM_FL_BLOCK_NOWAIT;
1351
1352         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1353         if (rc)
1354                 RETURN(rc);
1355
1356         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1357         fd->fd_gid = arg;
1358         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1359
1360         RETURN(0);
1361 }
1362
1363 static int ll_put_grouplock(struct inode *inode, struct file *file,
1364                             unsigned long arg)
1365 {
1366         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1367         struct ll_inode_info *lli = ll_i2info(inode);
1368         struct lov_stripe_md *lsm = lli->lli_smd;
1369         int rc;
1370         ENTRY;
1371
1372         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1373                 /* Ugh, it's already unlocked. */
1374                 RETURN(-EINVAL);
1375         }
1376
1377         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1378                 RETURN(-EINVAL);
1379
1380         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1381
1382         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1383         if (rc)
1384                 RETURN(rc);
1385
1386         fd->fd_gid = 0;
1387         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1388
1389         RETURN(0);
1390 }
1391
1392 static int join_sanity_check(struct inode *head, struct inode *tail)
1393 {
1394         ENTRY;
1395         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1396                 CERROR("server do not support join \n");
1397                 RETURN(-EINVAL);
1398         }
1399         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1400                 CERROR("tail ino %lu and ino head %lu must be regular\n",
1401                        head->i_ino, tail->i_ino);
1402                 RETURN(-EINVAL);
1403         }
1404         if (head->i_ino == tail->i_ino) {
1405                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
1406                 RETURN(-EINVAL);
1407         }
1408         if (head->i_size % JOIN_FILE_ALIGN) {
1409                 CERROR("hsize" LPU64 " must be times of 64K\n",
1410                         head->i_size);
1411                 RETURN(-EINVAL);
1412         }
1413         RETURN(0);
1414 }
1415
1416 static int join_file(struct inode *head_inode, struct file *head_filp,
1417                      struct file *tail_filp)
1418 {
1419         struct inode *tail_inode, *tail_parent;
1420         struct dentry *tail_dentry = tail_filp->f_dentry;
1421         struct lookup_intent oit = {.it_op = IT_OPEN,
1422                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1423         struct ptlrpc_request *req = NULL;
1424         struct ll_file_data *fd;
1425         struct lustre_handle lockh;
1426         struct md_op_data *op_data;
1427         __u32  hsize = head_inode->i_size >> 32;
1428         __u32  tsize = head_inode->i_size;
1429         struct file *f;
1430         int    rc;
1431         ENTRY;
1432
1433         tail_dentry = tail_filp->f_dentry;
1434         tail_inode = tail_dentry->d_inode;
1435         tail_parent = tail_dentry->d_parent->d_inode;
1436
1437         fd = ll_file_data_get();
1438         if (fd == NULL)
1439                 RETURN(-ENOMEM);
1440
1441         OBD_ALLOC_PTR(op_data);
1442         if (op_data == NULL) {
1443                 ll_file_data_put(fd);
1444                 RETURN(-ENOMEM);
1445         }
1446
1447         f = get_empty_filp();
1448         if (f == NULL)
1449                 GOTO(out, rc = -ENOMEM);
1450
1451         f->f_dentry = dget(head_filp->f_dentry);
1452         f->f_vfsmnt = mntget(head_filp->f_vfsmnt);
1453
1454         ll_prepare_md_op_data(op_data, head_inode, tail_parent,
1455                               tail_dentry->d_name.name,
1456                               tail_dentry->d_name.len, 0);
1457         
1458         rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_PW,
1459                         op_data, &lockh, &tsize, 0, ldlm_completion_ast,
1460                         ll_md_blocking_ast, &hsize, 0);
1461
1462         if (rc < 0)
1463                 GOTO(out, rc);
1464
1465         req = oit.d.lustre.it_data;
1466         rc = oit.d.lustre.it_status;
1467
1468         if (rc < 0)
1469                 GOTO(out, rc);
1470
1471         rc = ll_local_open(f, &oit, fd);
1472         LASSERTF(rc == 0, "rc = %d\n", rc);
1473
1474         fd = NULL;
1475         ll_intent_release(&oit);
1476
1477         rc = ll_file_release(f->f_dentry->d_inode, f);
1478 out:
1479         if (op_data)
1480                 OBD_FREE_PTR(op_data);
1481         if (f)
1482                 fput(f);
1483         ll_file_data_put(fd);
1484         ptlrpc_req_finished(req);
1485         RETURN(rc);
1486 }
1487
1488 static int ll_file_join(struct inode *head, struct file *filp,
1489                         char *filename_tail)
1490 {
1491         struct inode *tail = NULL, *first = NULL, *second = NULL;
1492         struct dentry *tail_dentry;
1493         struct file *tail_filp, *first_filp, *second_filp;
1494         struct ll_lock_tree first_tree, second_tree;
1495         struct ll_lock_tree_node *first_node, *second_node;
1496         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
1497         int rc = 0, cleanup_phase = 0;
1498         ENTRY;
1499
1500         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
1501                head->i_ino, head->i_generation, head, filename_tail);
1502
1503         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
1504         if (IS_ERR(tail_filp)) {
1505                 CERROR("Can not open tail file %s", filename_tail);
1506                 rc = PTR_ERR(tail_filp);
1507                 GOTO(cleanup, rc);
1508         }
1509         tail = igrab(tail_filp->f_dentry->d_inode);
1510
1511         tlli = ll_i2info(tail);
1512         tail_dentry = tail_filp->f_dentry;
1513         LASSERT(tail_dentry);
1514         cleanup_phase = 1;
1515
1516         /*reorder the inode for lock sequence*/
1517         first = head->i_ino > tail->i_ino ? head : tail;
1518         second = head->i_ino > tail->i_ino ? tail : head;
1519         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
1520         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
1521
1522         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
1523                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
1524         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
1525         if (IS_ERR(first_node)){
1526                 rc = PTR_ERR(first_node);
1527                 GOTO(cleanup, rc);
1528         }
1529         first_tree.lt_fd = first_filp->private_data;
1530         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
1531         if (rc != 0)
1532                 GOTO(cleanup, rc);
1533         cleanup_phase = 2;
1534
1535         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
1536         if (IS_ERR(second_node)){
1537                 rc = PTR_ERR(second_node);
1538                 GOTO(cleanup, rc);
1539         }
1540         second_tree.lt_fd = second_filp->private_data;
1541         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
1542         if (rc != 0)
1543                 GOTO(cleanup, rc);
1544         cleanup_phase = 3;
1545
1546         rc = join_sanity_check(head, tail);
1547         if (rc)
1548                 GOTO(cleanup, rc);
1549
1550         rc = join_file(head, filp, tail_filp);
1551         if (rc)
1552                 GOTO(cleanup, rc);
1553 cleanup:
1554         switch (cleanup_phase) {
1555         case 3:
1556                 ll_tree_unlock(&second_tree);
1557                 obd_cancel_unused(ll_i2dtexp(second),
1558                                   ll_i2info(second)->lli_smd, 0, NULL);
1559         case 2:
1560                 ll_tree_unlock(&first_tree);
1561                 obd_cancel_unused(ll_i2dtexp(first),
1562                                   ll_i2info(first)->lli_smd, 0, NULL);
1563         case 1:
1564                 filp_close(tail_filp, 0);
1565                 if (tail)
1566                         iput(tail);
1567                 if (head && rc == 0) {
1568                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
1569                                        &hlli->lli_smd);
1570                         hlli->lli_smd = NULL;
1571                 }
1572         case 0:
1573                 break;
1574         default:
1575                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1576                 LBUG();
1577         }
1578         RETURN(rc);
1579 }
1580
1581 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1582 {
1583         struct inode *inode = dentry->d_inode;
1584         struct obd_client_handle *och;
1585         int rc;
1586         ENTRY;
1587
1588         LASSERT(inode);
1589
1590         /* Root ? Do nothing. */
1591         if (dentry->d_inode->i_sb->s_root == dentry)
1592                 RETURN(0);
1593
1594         /* No open handle to close? Move away */
1595         if (!it_disposition(it, DISP_OPEN_OPEN))
1596                 RETURN(0);
1597
1598         OBD_ALLOC(och, sizeof(*och));
1599         if (!och)
1600                 GOTO(out, rc = -ENOMEM);
1601
1602         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1603                     ll_i2info(inode), it, och);
1604
1605         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, 
1606                                        inode, och);
1607
1608         OBD_FREE(och, sizeof(*och));
1609  out:
1610         /* this one is in place of ll_file_open */
1611         ptlrpc_req_finished(it->d.lustre.it_data);
1612         RETURN(rc);
1613 }
1614
1615 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1616                   unsigned long arg)
1617 {
1618         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1619         int flags;
1620         ENTRY;
1621
1622         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1623                inode->i_generation, inode, cmd);
1624
1625         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1626         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1627                 RETURN(-ENOTTY);
1628
1629         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1630         switch(cmd) {
1631         case LL_IOC_GETFLAGS:
1632                 /* Get the current value of the file flags */
1633                 return put_user(fd->fd_flags, (int *)arg);
1634         case LL_IOC_SETFLAGS:
1635         case LL_IOC_CLRFLAGS:
1636                 /* Set or clear specific file flags */
1637                 /* XXX This probably needs checks to ensure the flags are
1638                  *     not abused, and to handle any flag side effects.
1639                  */
1640                 if (get_user(flags, (int *) arg))
1641                         RETURN(-EFAULT);
1642
1643                 if (cmd == LL_IOC_SETFLAGS) {
1644                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1645                             !(file->f_flags & O_DIRECT)) {
1646                                 CERROR("%s: unable to disable locking on "
1647                                        "non-O_DIRECT file\n", current->comm);
1648                                 RETURN(-EINVAL);
1649                         }
1650
1651                         fd->fd_flags |= flags;
1652                 } else {
1653                         fd->fd_flags &= ~flags;
1654                 }
1655                 RETURN(0);
1656         case LL_IOC_LOV_SETSTRIPE:
1657                 RETURN(ll_lov_setstripe(inode, file, arg));
1658         case LL_IOC_LOV_SETEA:
1659                 RETURN(ll_lov_setea(inode, file, arg));
1660         case LL_IOC_LOV_GETSTRIPE:
1661                 RETURN(ll_lov_getstripe(inode, arg));
1662         case LL_IOC_RECREATE_OBJ:
1663                 RETURN(ll_lov_recreate_obj(inode, file, arg));
1664         case EXT3_IOC_GETFLAGS:
1665         case EXT3_IOC_SETFLAGS:
1666                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1667         case EXT3_IOC_GETVERSION_OLD:
1668         case EXT3_IOC_GETVERSION:
1669                 RETURN(put_user(inode->i_generation, (int *)arg));
1670         case LL_IOC_JOIN: {
1671                 char *ftail;
1672                 int rc;
1673
1674                 ftail = getname((const char *)arg);
1675                 if (IS_ERR(ftail))
1676                         RETURN(PTR_ERR(ftail));
1677                 rc = ll_file_join(inode, file, ftail);
1678                 putname(ftail);
1679                 RETURN(rc);
1680         }
1681         case LL_IOC_GROUP_LOCK:
1682                 RETURN(ll_get_grouplock(inode, file, arg));
1683         case LL_IOC_GROUP_UNLOCK:
1684                 RETURN(ll_put_grouplock(inode, file, arg));
1685         case LL_IOC_OBD_STATFS:
1686                 RETURN(ll_obd_statfs(inode, (void *)arg));
1687
1688         /* We need to special case any other ioctls we want to handle,
1689          * to send them to the MDS/OST as appropriate and to properly
1690          * network encode the arg field.
1691         case EXT3_IOC_SETVERSION_OLD:
1692         case EXT3_IOC_SETVERSION:
1693         */
1694         default:
1695                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1696                                      (void *)arg));
1697         }
1698 }
1699
1700 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1701 {
1702         struct inode *inode = file->f_dentry->d_inode;
1703         struct ll_inode_info *lli = ll_i2info(inode);
1704         struct lov_stripe_md *lsm = lli->lli_smd;
1705         loff_t retval;
1706         ENTRY;
1707         retval = offset + ((origin == 2) ? inode->i_size :
1708                            (origin == 1) ? file->f_pos : 0);
1709         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
1710                inode->i_ino, inode->i_generation, inode, retval, retval,
1711                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
1712
1713         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1714         if (origin == 2) { /* SEEK_END */
1715                 int nonblock = 0, rc;
1716
1717                 if (file->f_flags & O_NONBLOCK)
1718                         nonblock = LDLM_FL_BLOCK_NOWAIT;
1719
1720                 if (lsm != NULL) {
1721                         rc = ll_glimpse_size(inode, nonblock);
1722                         if (rc != 0)
1723                                 RETURN(rc);
1724                 }
1725
1726                 ll_inode_size_lock(inode, 0);
1727                 offset += inode->i_size;
1728                 ll_inode_size_unlock(inode, 0);
1729         } else if (origin == 1) { /* SEEK_CUR */
1730                 offset += file->f_pos;
1731         }
1732
1733         retval = -EINVAL;
1734         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1735                 if (offset != file->f_pos) {
1736                         file->f_pos = offset;
1737 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1738                         file->f_reada = 0;
1739                         file->f_version = ++event;
1740 #endif
1741                 }
1742                 retval = offset;
1743         }
1744
1745         RETURN(retval);
1746 }
1747
1748 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1749 {
1750         struct inode *inode = dentry->d_inode;
1751         struct ll_inode_info *lli = ll_i2info(inode);
1752         struct lov_stripe_md *lsm = lli->lli_smd;
1753         struct ptlrpc_request *req;
1754         int rc, err;
1755         ENTRY;
1756         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1757                inode->i_generation, inode);
1758
1759         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1760
1761         /* fsync's caller has already called _fdata{sync,write}, we want
1762          * that IO to finish before calling the osc and mdc sync methods */
1763         rc = filemap_fdatawait(inode->i_mapping);
1764
1765         /* catch async errors that were recorded back when async writeback
1766          * failed for pages in this mapping. */
1767         err = lli->lli_async_rc;
1768         lli->lli_async_rc = 0;
1769         if (rc == 0)
1770                 rc = err;
1771         if (lsm) {
1772                 err = lov_test_and_clear_async_rc(lsm);
1773                 if (rc == 0)
1774                         rc = err;
1775         }
1776
1777         err = md_sync(ll_i2sbi(inode)->ll_md_exp,
1778                       ll_inode2fid(inode), &req);
1779         if (!rc)
1780                 rc = err;
1781         if (!err)
1782                 ptlrpc_req_finished(req);
1783
1784         if (data && lsm) {
1785                 struct obdo *oa = obdo_alloc();
1786
1787                 if (!oa)
1788                         RETURN(rc ? rc : -ENOMEM);
1789
1790                 oa->o_id = lsm->lsm_object_id;
1791                 oa->o_valid = OBD_MD_FLID;
1792                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1793                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1794
1795                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
1796                                0, OBD_OBJECT_EOF);
1797                 if (!rc)
1798                         rc = err;
1799                 obdo_free(oa);
1800         }
1801
1802         RETURN(rc);
1803 }
1804
1805 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1806 {
1807         struct inode *inode = file->f_dentry->d_inode;
1808         struct ll_sb_info *sbi = ll_i2sbi(inode);
1809         struct obd_device *obddev;
1810         struct ldlm_res_id res_id =
1811                 { .name = { fid_seq(ll_inode2fid(inode)),
1812                             fid_oid(ll_inode2fid(inode)),
1813                             fid_ver(ll_inode2fid(inode)),
1814                             LDLM_FLOCK} };
1815         struct lustre_handle lockh = {0};
1816         ldlm_policy_data_t flock;
1817         ldlm_mode_t mode = 0;
1818         int flags = 0;
1819         int rc;
1820         ENTRY;
1821
1822         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1823                inode->i_ino, file_lock);
1824
1825         flock.l_flock.pid = file_lock->fl_pid;
1826         flock.l_flock.start = file_lock->fl_start;
1827         flock.l_flock.end = file_lock->fl_end;
1828
1829         switch (file_lock->fl_type) {
1830         case F_RDLCK:
1831                 mode = LCK_PR;
1832                 break;
1833         case F_UNLCK:
1834                 /* An unlock request may or may not have any relation to
1835                  * existing locks so we may not be able to pass a lock handle
1836                  * via a normal ldlm_lock_cancel() request. The request may even
1837                  * unlock a byte range in the middle of an existing lock. In
1838                  * order to process an unlock request we need all of the same
1839                  * information that is given with a normal read or write record
1840                  * lock request. To avoid creating another ldlm unlock (cancel)
1841                  * message we'll treat a LCK_NL flock request as an unlock. */
1842                 mode = LCK_NL;
1843                 break;
1844         case F_WRLCK:
1845                 mode = LCK_PW;
1846                 break;
1847         default:
1848                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1849                 LBUG();
1850         }
1851
1852         switch (cmd) {
1853         case F_SETLKW:
1854 #ifdef F_SETLKW64
1855         case F_SETLKW64:
1856 #endif
1857                 flags = 0;
1858                 break;
1859         case F_SETLK:
1860 #ifdef F_SETLK64
1861         case F_SETLK64:
1862 #endif
1863                 flags = LDLM_FL_BLOCK_NOWAIT;
1864                 break;
1865         case F_GETLK:
1866 #ifdef F_GETLK64
1867         case F_GETLK64:
1868 #endif
1869                 flags = LDLM_FL_TEST_LOCK;
1870                 /* Save the old mode so that if the mode in the lock changes we
1871                  * can decrement the appropriate reader or writer refcount. */
1872                 file_lock->fl_type = mode;
1873                 break;
1874         default:
1875                 CERROR("unknown fcntl lock command: %d\n", cmd);
1876                 LBUG();
1877         }
1878
1879         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
1880                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1881                flags, mode, flock.l_flock.start, flock.l_flock.end);
1882
1883         obddev = sbi->ll_md_exp->exp_obd;
1884         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, obddev->obd_namespace,
1885                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1886                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1887                               NULL, 0, NULL, &lockh);
1888         RETURN(rc);
1889 }
1890
1891 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
1892 {
1893         struct lookup_intent oit = { .it_op = IT_GETATTR };
1894         struct md_op_data op_data = { { 0 } };
1895         struct inode *inode = dentry->d_inode;
1896         struct ptlrpc_request *req = NULL;
1897         struct ll_inode_info *lli;
1898         struct ll_sb_info *sbi;
1899         int rc;
1900         ENTRY;
1901
1902         if (!inode) {
1903                 CERROR("REPORT THIS LINE TO PETER\n");
1904                 RETURN(0);
1905         }
1906         sbi = ll_i2sbi(inode);
1907         lli = ll_i2info(inode);
1908         
1909         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
1910                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
1911 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1912         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1913 #endif
1914
1915         ll_prepare_md_op_data(&op_data, inode, inode, NULL, 0, 0);
1916         
1917         rc = md_intent_lock(sbi->ll_md_exp, &op_data, NULL, 0, &oit, 0,
1918                             &req, ll_md_blocking_ast, 0);
1919
1920         if (rc < 0)
1921                 GOTO(out, rc);
1922
1923         rc = ll_revalidate_it_finish(req, 1, &oit, dentry);
1924         if (rc)
1925                 GOTO(out, rc);
1926
1927         ll_lookup_finish_locks(&oit, dentry);
1928         
1929         /* object is allocated, validate size */
1930         if (lli->lli_smd) {
1931                 /* ll_glimpse_size will prefer locally cached writes if they
1932                  * extend the file */
1933                 rc = ll_glimpse_size(inode, 0);
1934         }
1935         EXIT;
1936 out:
1937         if (req)
1938                 ptlrpc_req_finished(req);
1939         return rc;
1940 }
1941
1942 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1943 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
1944                   struct lookup_intent *it, struct kstat *stat)
1945 {
1946         struct inode *inode = de->d_inode;
1947         int res = 0;
1948
1949         res = ll_inode_revalidate_it(de, it);
1950         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
1951
1952         if (res)
1953                 return res;
1954
1955         stat->dev = inode->i_sb->s_dev;
1956         stat->ino = inode->i_ino;
1957         stat->mode = inode->i_mode;
1958         stat->nlink = inode->i_nlink;
1959         stat->uid = inode->i_uid;
1960         stat->gid = inode->i_gid;
1961         stat->rdev = kdev_t_to_nr(inode->i_rdev);
1962         stat->atime = inode->i_atime;
1963         stat->mtime = inode->i_mtime;
1964         stat->ctime = inode->i_ctime;
1965         stat->blksize = inode->i_blksize;
1966
1967         ll_inode_size_lock(inode, 0);
1968         stat->size = inode->i_size;
1969         stat->blocks = inode->i_blocks;
1970         ll_inode_size_unlock(inode, 0);
1971
1972         return 0;
1973 }
1974 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
1975 {
1976         struct lookup_intent it = { .it_op = IT_GETATTR };
1977
1978         return ll_getattr_it(mnt, de, &it, stat);
1979 }
1980 #endif
1981
1982 static
1983 int lustre_check_acl(struct inode *inode, int mask)
1984 {
1985 #ifdef CONFIG_FS_POSIX_ACL
1986         struct ll_inode_info *lli = ll_i2info(inode);
1987         struct posix_acl *acl;
1988         int rc;
1989         ENTRY;
1990
1991         spin_lock(&lli->lli_lock);
1992         acl = posix_acl_dup(lli->lli_posix_acl);
1993         spin_unlock(&lli->lli_lock);
1994
1995         if (!acl)
1996                 RETURN(-EAGAIN);
1997
1998         rc = posix_acl_permission(inode, acl, mask);
1999         posix_acl_release(acl);
2000
2001         RETURN(rc);
2002 #else
2003         return -EAGAIN;
2004 #endif
2005 }
2006
2007 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2008 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2009 {
2010         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2011                inode->i_ino, inode->i_generation, inode, mask);
2012         return generic_permission(inode, mask, lustre_check_acl);
2013 }
2014 #else
2015 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2016 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2017 #else
2018 int ll_inode_permission(struct inode *inode, int mask)
2019 #endif
2020 {
2021         int mode = inode->i_mode;
2022         int rc;
2023
2024         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2025                inode->i_ino, inode->i_generation, inode, mask);
2026
2027         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2028             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2029                 return -EROFS;
2030         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2031                 return -EACCES;
2032         if (current->fsuid == inode->i_uid) {
2033                 mode >>= 6;
2034         } else if (1) {
2035                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2036                         goto check_groups;
2037                 rc = lustre_check_acl(inode, mask);
2038                 if (rc == -EAGAIN)
2039                         goto check_groups;
2040                 if (rc == -EACCES)
2041                         goto check_capabilities;
2042                 return rc;
2043         } else {
2044 check_groups:
2045                 if (in_group_p(inode->i_gid))
2046                         mode >>= 3;
2047         }
2048         if ((mode & mask & S_IRWXO) == mask)
2049                 return 0;
2050
2051 check_capabilities:
2052         if (!(mask & MAY_EXEC) ||
2053             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2054                 if (capable(CAP_DAC_OVERRIDE))
2055                         return 0;
2056
2057         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2058             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2059                 return 0;
2060         return -EACCES;
2061 }
2062 #endif
2063
2064 struct file_operations ll_file_operations = {
2065         .read           = ll_file_read,
2066         .write          = ll_file_write,
2067         .ioctl          = ll_file_ioctl,
2068         .open           = ll_file_open,
2069         .release        = ll_file_release,
2070         .mmap           = ll_file_mmap,
2071         .llseek         = ll_file_seek,
2072 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2073         .sendfile       = ll_file_sendfile,
2074 #endif
2075         .fsync          = ll_fsync,
2076         /* .lock           = ll_file_flock */
2077 };
2078
2079 struct file_operations ll_file_operations_flock = {
2080         .read           = ll_file_read,
2081         .write          = ll_file_write,
2082         .ioctl          = ll_file_ioctl,
2083         .open           = ll_file_open,
2084         .release        = ll_file_release,
2085         .mmap           = ll_file_mmap,
2086         .llseek         = ll_file_seek,
2087 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2088         .sendfile       = ll_file_sendfile,
2089 #endif
2090         .fsync          = ll_fsync,
2091         .lock           = ll_file_flock
2092 };
2093
2094
2095 struct inode_operations ll_file_inode_operations = {
2096         .setattr_raw    = ll_setattr_raw,
2097         .setattr        = ll_setattr,
2098         .truncate       = ll_truncate,
2099 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2100         .getattr_it     = ll_getattr_it,
2101 #else
2102         .revalidate_it  = ll_inode_revalidate_it,
2103 #endif
2104         .permission     = ll_inode_permission,
2105         .setxattr       = ll_setxattr,
2106         .getxattr       = ll_getxattr,
2107         .listxattr      = ll_listxattr,
2108         .removexattr    = ll_removexattr,
2109 };
2110