Whamcloud - gitweb
land lustre part of b_hd_sec on HEAD.
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <linux/lustre_dlm.h>
27 #include <linux/lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #include <linux/lustre_acl.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35 #include <linux/obd_lov.h>
36
37 #define XATTR_NAME_MAX  255
38 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
39                 struct file *file)
40 {
41         struct ll_file_data *fd = file->private_data;
42         struct ptlrpc_request *req = NULL;
43         struct obd_client_handle *och = &fd->fd_mds_och;
44         struct obdo *obdo = NULL;
45         int rc;
46         ENTRY;
47
48         /* clear group lock, if present */
49         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
50                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
51                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
52                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
53                                       &fd->fd_cwlockh);
54         }
55
56         obdo = obdo_alloc();
57         if (obdo == NULL)
58                 RETURN(-ENOMEM);
59
60         obdo->o_id = inode->i_ino;
61         obdo->o_valid = OBD_MD_FLID;
62         obdo_from_inode(obdo, inode, (OBD_MD_FLTYPE | OBD_MD_FLMODE |
63                                       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
64                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
65                                       OBD_MD_FLCTIME));
66         if (0 /* ll_is_inode_dirty(inode) */) {
67                 obdo->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
68                 obdo->o_valid |= OBD_MD_FLFLAGS;
69         }
70         obdo->o_mds = id_group(&ll_i2info(inode)->lli_id);
71         rc = md_close(md_exp, obdo, och, &req);
72         obdo_free(obdo);
73
74         if (rc == EAGAIN) {
75                 /* We are the last writer, so the MDS has instructed us to get
76                  * the file size and any write cookies, then close again. */
77                 //ll_queue_done_writing(inode);
78                 rc = 0;
79         } else if (rc) {
80                 CERROR("inode %lu mdc close failed: rc = %d\n",
81                        inode->i_ino, rc);
82         }
83         if (rc == 0) {
84                 rc = ll_objects_destroy(req, file->f_dentry->d_inode, 1);
85                 if (rc)
86                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
87                                inode->i_ino, rc);
88         }
89
90         mdc_clear_open_replay_data(md_exp, och);
91         ptlrpc_req_finished(req);
92         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
93         file->private_data = NULL;
94         OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof(*fd));
95         RETURN(rc);
96 }
97
98 /* While this returns an error code, fput() the caller does not, so we need
99  * to make every effort to clean up all of our state here.  Also, applications
100  * rarely check close errors and even if an error is returned they will not
101  * re-try the close call.
102  */
103 int ll_file_release(struct inode *inode, struct file *file)
104 {
105         struct ll_file_data *fd;
106         struct ll_sb_info *sbi = ll_i2sbi(inode);
107         int rc;
108
109         ENTRY;
110         CDEBUG(D_VFSTRACE, "VFS Op:inode="DLID4"(%p)\n",
111                OLID4(&ll_i2info(inode)->lli_id), inode);
112
113         /* don't do anything for / */
114         if (inode->i_sb->s_root == file->f_dentry)
115                 RETURN(0);
116
117         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
118         fd = (struct ll_file_data *)file->private_data;
119         LASSERT(fd != NULL);
120
121         rc = ll_md_close(sbi->ll_md_exp, inode, file);
122         RETURN(rc);
123 }
124
125 static int ll_intent_file_open(struct file *file, void *lmm,
126                                int lmmsize, struct lookup_intent *itp)
127 {
128         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
129         struct dentry *parent = file->f_dentry->d_parent;
130         const char *name = file->f_dentry->d_name.name;
131         const int len = file->f_dentry->d_name.len;
132         struct lustre_handle lockh;
133         struct mdc_op_data *op_data;
134         int rc;
135
136         if (!parent)
137                 RETURN(-ENOENT);
138
139         OBD_ALLOC(op_data, sizeof(*op_data));
140         if (op_data == NULL)
141                 RETURN(-ENOMEM);
142         ll_prepare_mdc_data(op_data, parent->d_inode, NULL, name, len, O_RDWR);
143
144         rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_PR, op_data,
145                         &lockh, lmm, lmmsize, ldlm_completion_ast,
146                         ll_mdc_blocking_ast, NULL);
147         OBD_FREE(op_data, sizeof(*op_data));
148         if (rc == 0) {
149                 if (LUSTRE_IT(itp)->it_lock_mode)
150                         memcpy(&LUSTRE_IT(itp)->it_lock_handle,
151                                &lockh, sizeof(lockh));
152
153         } else if (rc < 0) {
154                 CERROR("lock enqueue: err: %d\n", rc);
155         }
156         
157         RETURN(rc);
158 }
159
160 int ll_local_open(struct file *file, struct lookup_intent *it)
161 {
162         struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
163         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
164         struct obd_export *md_exp = ll_i2mdexp(file->f_dentry->d_inode);
165         struct ll_file_data *fd;
166         struct mds_body *body;
167         ENTRY;
168
169         body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
170         LASSERT (body != NULL);                 /* reply already checked out */
171         LASSERT_REPSWABBED (req, 1);            /* and swabbed down */
172
173         LASSERTF(file->private_data == NULL, "file %*s/%*s ino %lu/%u (%o)\n",
174                  file->f_dentry->d_name.len, file->f_dentry->d_name.name,
175                  file->f_dentry->d_parent->d_name.len,
176                  file->f_dentry->d_parent->d_name.name,
177                  file->f_dentry->d_inode->i_ino,
178                  file->f_dentry->d_inode->i_generation,
179                  file->f_dentry->d_inode->i_mode);
180
181
182         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
183         
184         /* We can't handle this well without reorganizing ll_file_open and
185          * ll_md_close(), so don't even try right now. */
186         LASSERT(fd != NULL);
187
188         memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
189         fd->fd_mds_och.och_magic = OBD_CLIENT_HANDLE_MAGIC;
190         file->private_data = fd;
191         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
192
193         lli->lli_io_epoch = body->io_epoch;
194
195         mdc_set_open_replay_data(md_exp, &fd->fd_mds_och, LUSTRE_IT(it)->it_data);
196         
197         RETURN(0);
198 }
199
200 /* Open a file, and (for the very first open) create objects on the OSTs at
201  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
202  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
203  * lli_open_sem to ensure no other process will create objects, send the
204  * stripe MD to the MDS, or try to destroy the objects if that fails.
205  *
206  * If we already have the stripe MD locally then we don't request it in
207  * mdc_open(), by passing a lmm_size = 0.
208  *
209  * It is up to the application to ensure no other processes open this file
210  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
211  * used.  We might be able to avoid races of that sort by getting lli_open_sem
212  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
213  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
214  */
215 int ll_file_open(struct inode *inode, struct file *file)
216 {
217         struct ll_inode_info *lli = ll_i2info(inode);
218         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
219                                           .it_flags = file->f_flags };
220         struct lov_stripe_md *lsm;
221         struct ptlrpc_request *req;
222         int rc = 0;
223         ENTRY;
224
225         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
226                inode->i_generation, inode, file->f_flags);
227
228         /* don't do anything for / */
229         if (inode->i_sb->s_root == file->f_dentry)
230                 RETURN(0);
231
232         it = file->f_it;
233
234         if (!it || !LUSTRE_IT(it) || !LUSTRE_IT(it)->it_disposition) {
235                 it = &oit;
236                 rc = ll_intent_alloc(it);
237                 if (rc)
238                         GOTO(out, rc);
239                 rc = ll_intent_file_open(file, NULL, 0, it);
240                 if (rc)
241                         GOTO(out, rc);
242         }
243
244
245         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
246         /* mdc_intent_lock() didn't get a request ref if there was an open
247          * error, so don't do cleanup on the request here (bug 3430) */
248         rc = it_open_error(DISP_OPEN_OPEN, it);
249         if (rc)
250                 RETURN(rc);
251
252         rc = ll_local_open(file, it);
253
254         LASSERTF(rc == 0, "rc = %d\n", rc);
255
256         if (!S_ISREG(inode->i_mode))
257                 GOTO(out, rc);
258
259         lsm = lli->lli_smd;
260         if (lsm == NULL) {
261                 if (file->f_flags & O_LOV_DELAY_CREATE ||
262                     !(file->f_mode & FMODE_WRITE)) {
263                         CDEBUG(D_INODE, "object creation was delayed\n");
264                         GOTO(out, rc);
265                 }
266         }
267         file->f_flags &= ~O_LOV_DELAY_CREATE;
268         GOTO(out, rc);
269  out:
270         req = LUSTRE_IT(it)->it_data;
271         ll_intent_release(it);
272
273         ptlrpc_req_finished(req);
274         if (rc == 0)
275                 ll_open_complete(inode);
276         return rc;
277 }
278
279 /* Fills the obdo with the attributes for the inode defined by lsm */
280 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
281                    struct obdo *oa)
282 {
283         struct ptlrpc_request_set *set;
284         int rc;
285         ENTRY;
286
287         LASSERT(lsm != NULL);
288
289         memset(oa, 0, sizeof *oa);
290         oa->o_id = lsm->lsm_object_id;
291         oa->o_gr = lsm->lsm_object_gr;
292         oa->o_mode = S_IFREG;
293         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
294                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
295                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
296
297         set = ptlrpc_prep_set();
298         if (set == NULL) {
299                 rc = -ENOMEM;
300         } else {
301                 rc = obd_getattr_async(exp, oa, lsm, set);
302                 if (rc == 0)
303                         rc = ptlrpc_set_wait(set);
304                 ptlrpc_set_destroy(set);
305         }
306         if (rc)
307                 RETURN(rc);
308
309         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
310                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
311         RETURN(0);
312 }
313
314 static inline void ll_remove_suid(struct inode *inode)
315 {
316         unsigned int mode;
317
318         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
319         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
320
321         /* was any of the uid bits set? */
322         mode &= inode->i_mode;
323         if (mode && !capable(CAP_FSETID)) {
324                 inode->i_mode &= ~mode;
325                 // XXX careful here - we cannot change the size
326         }
327 }
328
329 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
330 {
331         struct ll_inode_info *lli = ll_i2info(inode);
332         struct lov_stripe_md *lsm = lli->lli_smd;
333         struct obd_export *exp = ll_i2dtexp(inode);
334         struct {
335                 char name[16];
336                 struct ldlm_lock *lock;
337                 struct lov_stripe_md *lsm;
338         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
339         __u32 stripe, vallen = sizeof(stripe);
340         int rc;
341         ENTRY;
342
343         if (lsm->lsm_stripe_count == 1)
344                 GOTO(check, stripe = 0);
345
346         /* get our offset in the lov */
347         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
348         if (rc != 0) {
349                 CERROR("obd_get_info: rc = %d\n", rc);
350                 RETURN(rc);
351         }
352         LASSERT(stripe < lsm->lsm_stripe_count);
353         EXIT;
354 check:
355         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
356             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[2]){
357                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64
358                            " inode=%lu/%u (%p)\n",
359                            lsm->lsm_oinfo[stripe].loi_id,
360                            lsm->lsm_oinfo[stripe].loi_gr,
361                            inode->i_ino, inode->i_generation, inode);
362                 return -ELDLM_NO_LOCK_DATA;
363         }
364
365         return stripe;
366 }
367
368 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
369  * we get a lock cancellation for each stripe, so we have to map the obd's
370  * region back onto the stripes in the file that it held.
371  *
372  * No one can dirty the extent until we've finished our work and they can
373  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
374  * but other kernel actors could have pages locked.
375  *
376  * Called with the DLM lock held. */
377 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
378                               struct ldlm_lock *lock, __u32 stripe)
379 {
380         ldlm_policy_data_t tmpex;
381         unsigned long start, end, count, skip, i, j;
382         struct page *page;
383         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
384         struct lustre_handle lockh;
385         ENTRY;
386
387         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
388         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
389                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
390                inode->i_size);
391
392         /* our locks are page granular thanks to osc_enqueue, we invalidate the
393          * whole page. */
394         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
395         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
396
397         count = ~0;
398         skip = 0;
399         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
400         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
401         if (lsm->lsm_stripe_count > 1) {
402                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
403                 skip = (lsm->lsm_stripe_count - 1) * count;
404                 start += start/count * skip + stripe * count;
405                 if (end != ~0)
406                         end += end/count * skip + stripe * count;
407         }
408         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
409                 end = ~0;
410
411         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
412         if (i < end)
413                 end = i;
414
415         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
416                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
417                count, skip, end, discard ? " (DISCARDING)" : "");
418         
419         /* walk through the vmas on the inode and tear down mmaped pages that
420          * intersect with the lock.  this stops immediately if there are no
421          * mmap()ed regions of the file.  This is not efficient at all and
422          * should be short lived. We'll associate mmap()ed pages with the lock
423          * and will be able to find them directly */
424         
425         for (i = start; i <= end; i += (j + skip)) {
426                 j = min(count - (i % count), end - i + 1);
427                 LASSERT(inode->i_mapping);
428                 if (ll_teardown_mmaps(inode->i_mapping, i << PAGE_CACHE_SHIFT,
429                                       ((i+j) << PAGE_CACHE_SHIFT) - 1) )
430                         break;
431         }
432
433         /* this is the simplistic implementation of page eviction at
434          * cancelation.  It is careful to get races with other page
435          * lockers handled correctly.  fixes from bug 20 will make it
436          * more efficient by associating locks with pages and with
437          * batching writeback under the lock explicitly. */
438         for (i = start, j = start % count; i <= end;
439              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
440                 if (j == count) {
441                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
442                         i += skip;
443                         j = 0;
444                         if (i > end)
445                                 break;
446                 }
447                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
448                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
449                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
450                          start, i, end);
451
452                 if (!mapping_has_pages(inode->i_mapping)) {
453                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
454                         break;
455                 }
456
457                 cond_resched();
458
459                 page = find_get_page(inode->i_mapping, i);
460                 if (page == NULL)
461                         continue;
462                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
463                                i, tmpex.l_extent.start);
464                 lock_page(page);
465
466                 /* page->mapping to check with racing against teardown */
467                 if (!discard && clear_page_dirty_for_io(page)) {
468                         rc = ll_call_writepage(inode, page);
469                         if (rc != 0)
470                                 CERROR("writepage of page %p failed: %d\n",
471                                        page, rc);
472                         /* either waiting for io to complete or reacquiring
473                          * the lock that the failed writepage released */
474                         lock_page(page);
475                 }
476
477                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
478                 /* check to see if another DLM lock covers this page */
479                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
480                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
481                                       LDLM_FL_TEST_LOCK,
482                                       &lock->l_resource->lr_name, LDLM_EXTENT,
483                                       &tmpex, LCK_PR | LCK_PW, &lockh);
484                 if (rc2 == 0 && page->mapping != NULL) {
485                         // checking again to account for writeback's lock_page()
486                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
487                         ll_ra_accounting(page, inode->i_mapping);
488                         ll_truncate_complete_page(page);
489                 }
490                 unlock_page(page);
491                 page_cache_release(page);
492         }
493         LASSERTF(tmpex.l_extent.start <=
494                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
495                   lock->l_policy_data.l_extent.end + 1),
496                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
497                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
498                  start, i, end);
499         EXIT;
500 }
501
502 static int ll_extent_lock_callback(struct ldlm_lock *lock,
503                                    struct ldlm_lock_desc *new, void *data,
504                                    int flag)
505 {
506         struct lustre_handle lockh = { 0 };
507         int rc;
508         ENTRY;
509
510         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
511                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
512                 LBUG();
513         }
514
515         switch (flag) {
516         case LDLM_CB_BLOCKING:
517                 ldlm_lock2handle(lock, &lockh);
518                 rc = ldlm_cli_cancel(&lockh);
519                 if (rc != ELDLM_OK)
520                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
521                 break;
522         case LDLM_CB_CANCELING: {
523                 struct inode *inode;
524                 struct ll_inode_info *lli;
525                 struct lov_stripe_md *lsm;
526                 __u32 stripe;
527                 __u64 kms;
528
529                 /* This lock wasn't granted, don't try to evict pages */
530                 if (lock->l_req_mode != lock->l_granted_mode)
531                         RETURN(0);
532
533                 inode = ll_inode_from_lock(lock);
534                 if (inode == NULL)
535                         RETURN(0);
536                 lli = ll_i2info(inode);
537                 if (lli == NULL)
538                         goto iput;
539                 if (lli->lli_smd == NULL)
540                         goto iput;
541                 lsm = lli->lli_smd;
542
543                 stripe = ll_lock_to_stripe_offset(inode, lock);
544                 if (stripe < 0)
545                         goto iput;
546                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
547
548                 /* grabbing the i_sem will wait for write() to complete.  ns
549                  * lock hold times should be very short as ast processing
550                  * requires them and has a short timeout.  so, i_sem before ns
551                  * lock.*/
552
553                 down(&inode->i_sem);
554                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
555                 kms = ldlm_extent_shift_kms(lock,
556                                             lsm->lsm_oinfo[stripe].loi_kms);
557                 
558                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
559                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
560                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
561                 lsm->lsm_oinfo[stripe].loi_kms = kms;
562                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
563                 up(&inode->i_sem);
564                 //ll_try_done_writing(inode);
565         iput:
566                 iput(inode);
567                 break;
568         }
569         default:
570                 LBUG();
571         }
572
573         RETURN(0);
574 }
575
576 #if 0
577 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
578 {
579         /* XXX ALLOCATE - 160 bytes */
580         struct inode *inode = ll_inode_from_lock(lock);
581         struct ll_inode_info *lli = ll_i2info(inode);
582         struct lustre_handle lockh = { 0 };
583         struct ost_lvb *lvb;
584         __u32 stripe;
585         ENTRY;
586
587         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
588                      LDLM_FL_BLOCK_CONV)) {
589                 LBUG(); /* not expecting any blocked async locks yet */
590                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
591                            "lock, returning");
592                 ldlm_lock_dump(D_OTHER, lock, 0);
593                 ldlm_reprocess_all(lock->l_resource);
594                 RETURN(0);
595         }
596
597         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
598
599         stripe = ll_lock_to_stripe_offset(inode, lock);
600         if (stripe < 0)
601                 goto iput;
602
603         if (lock->l_lvb_len) {
604                 struct lov_stripe_md *lsm = lli->lli_smd;
605                 __u64 kms;
606                 lvb = lock->l_lvb_data;
607                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
608
609                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
610                 down(&inode->i_sem);
611                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
612                 kms = ldlm_extent_shift_kms(NULL, kms);
613                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
614                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
615                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
616                 lsm->lsm_oinfo[stripe].loi_kms = kms;
617                 up(&inode->i_sem);
618                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
619         }
620
621 iput:
622         iput(inode);
623         wake_up(&lock->l_waitq);
624
625         ldlm_lock2handle(lock, &lockh);
626         ldlm_lock_decref(&lockh, LCK_PR);
627         RETURN(0);
628 }
629 #endif
630
631 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
632 {
633         struct ptlrpc_request *req = reqp;
634         struct inode *inode = ll_inode_from_lock(lock);
635         struct ll_inode_info *lli;
636         struct ost_lvb *lvb;
637         struct lov_stripe_md *lsm;
638         int rc, size = sizeof(*lvb), stripe;
639         ENTRY;
640
641         if (inode == NULL)
642                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
643         lli = ll_i2info(inode);
644         if (lli == NULL)
645                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
646
647         lsm = lli->lli_smd;
648         if (lsm == NULL)
649                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
650
651         /* First, find out which stripe index this lock corresponds to. */
652         stripe = ll_lock_to_stripe_offset(inode, lock);
653         if (stripe < 0)
654                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
655
656         rc = lustre_pack_reply(req, 1, &size, NULL);
657         if (rc) {
658                 CERROR("lustre_pack_reply: %d\n", rc);
659                 GOTO(iput, rc);
660         }
661
662         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
663         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
664
665         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
666                    inode->i_size, stripe, lvb->lvb_size);
667         GOTO(iput, 0);
668  iput:
669         iput(inode);
670
671  out:
672         /* These errors are normal races, so we don't want to fill the console
673          * with messages by calling ptlrpc_error() */
674         if (rc == -ELDLM_NO_LOCK_DATA)
675                 lustre_pack_reply(req, 0, NULL, NULL);
676
677         req->rq_status = rc;
678         return rc;
679 }
680
681 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
682 __u64 lov_merge_blocks(struct lov_stripe_md *lsm);
683 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
684
685 /* NB: lov_merge_size will prefer locally cached writes if they extend the
686  * file (because it prefers KMS over RSS when larger) */
687 int ll_glimpse_size(struct inode *inode)
688 {
689         struct ll_inode_info *lli = ll_i2info(inode);
690         struct ll_sb_info *sbi = ll_i2sbi(inode);
691         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
692         struct lustre_handle lockh = { 0 };
693         int rc, flags = LDLM_FL_HAS_INTENT;
694         ENTRY;
695
696         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
697
698         rc = obd_enqueue(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, &policy,
699                          LCK_PR, &flags, ll_extent_lock_callback,
700                          ldlm_completion_ast, ll_glimpse_callback, inode,
701                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
702         if (rc == -ENOENT)
703                 RETURN(rc);
704
705         if (rc != 0) {
706                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
707                 RETURN(rc > 0 ? -EIO : rc);
708         }
709
710         inode->i_size = lov_merge_size(lli->lli_smd, 0);
711         inode->i_blocks = lov_merge_blocks(lli->lli_smd);
712         //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
713
714         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64", blocks: "LPU64"\n",
715                (__u64)inode->i_size, (__u64)inode->i_blocks);
716         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
717         RETURN(rc);
718 }
719
720 void ll_stime_record(struct ll_sb_info *sbi, struct timeval *start,
721                     struct obd_service_time *stime)
722 {
723         struct timeval stop;
724         do_gettimeofday(&stop);
725                                                                                                                                                                                                      
726         spin_lock(&sbi->ll_lock);
727         lprocfs_stime_record(stime, &stop, start);
728         spin_unlock(&sbi->ll_lock);
729 }
730
731 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
732                    struct lov_stripe_md *lsm, int mode,
733                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
734                    int ast_flags, struct obd_service_time *stime)
735 {
736         struct ll_sb_info *sbi = ll_i2sbi(inode);
737         struct timeval start;
738         int rc;
739         ENTRY;
740
741         LASSERT(lockh->cookie == 0);
742
743         /* XXX phil: can we do this?  won't it screw the file size up? */
744         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
745             (sbi->ll_flags & LL_SBI_NOLCK))
746                 RETURN(0);
747
748         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
749                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
750
751         do_gettimeofday(&start);
752         rc = obd_enqueue(sbi->ll_dt_exp, lsm, LDLM_EXTENT, policy, mode,
753                          &ast_flags, ll_extent_lock_callback,
754                          ldlm_completion_ast, ll_glimpse_callback, inode,
755                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
756         if (rc > 0)
757                 rc = -EIO;
758         
759         ll_stime_record(sbi, &start, stime);
760
761         if (policy->l_extent.start == 0 &&
762             policy->l_extent.end == OBD_OBJECT_EOF) {
763                 /* vmtruncate()->ll_truncate() first sets the i_size and then
764                  * the kms under both a DLM lock and the i_sem.  If we don't
765                  * get the i_sem here we can match the DLM lock and reset
766                  * i_size from the kms before the truncating path has updated
767                  * the kms.  generic_file_write can then trust the stale i_size
768                  * when doing appending writes and effectively cancel the
769                  * result of the truncate.  Getting the i_sem after the enqueue
770                  * maintains the DLM -> i_sem acquiry order. */
771                 down(&inode->i_sem);
772                 inode->i_size = lov_merge_size(lsm, 1);
773                 up(&inode->i_sem);
774         }
775         //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
776
777         RETURN(rc);
778 }
779
780 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
781                      struct lov_stripe_md *lsm, int mode,
782                      struct lustre_handle *lockh)
783 {
784         struct ll_sb_info *sbi = ll_i2sbi(inode);
785         int rc;
786         ENTRY;
787
788         /* XXX phil: can we do this?  won't it screw the file size up? */
789         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
790             (sbi->ll_flags & LL_SBI_NOLCK))
791                 RETURN(0);
792
793         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
794
795         RETURN(rc);
796 }
797
798 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
799                             loff_t *ppos)
800 {
801         struct inode *inode = file->f_dentry->d_inode;
802         struct ll_inode_info *lli = ll_i2info(inode);
803         struct lov_stripe_md *lsm = lli->lli_smd;
804         struct ll_lock_tree tree;
805         struct ll_lock_tree_node *node;
806         int rc;
807         ssize_t retval;
808         __u64 kms;
809         ENTRY;
810         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
811                inode->i_ino, inode->i_generation, inode, count, *ppos);
812
813         /* "If nbyte is 0, read() will return 0 and have no other results."
814          *                      -- Single Unix Spec */
815         if (count == 0)
816                 RETURN(0);
817
818         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
819                             count);
820
821         if (!lsm)
822                 RETURN(0);
823
824         node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
825                                   LCK_PR);
826
827         tree.lt_fd = file->private_data;
828
829         rc = ll_tree_lock(&tree, node, inode, buf, count,
830                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
831         if (rc != 0)
832                 RETURN(rc);
833
834         kms = lov_merge_size(lsm, 1);
835         if (*ppos + count - 1 > kms) {
836                 /* A glimpse is necessary to determine whether we return a short
837                  * read or some zeroes at the end of the buffer */
838                 retval = ll_glimpse_size(inode);
839                 if (retval)
840                         goto out;
841         } else {
842                 inode->i_size = kms;
843         }
844
845         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
846                inode->i_ino, count, *ppos, inode->i_size);
847
848         /* turn off the kernel's read-ahead */
849 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
850         file->f_ramax = 0;
851 #else
852         file->f_ra.ra_pages = 0;
853 #endif
854         retval = generic_file_read(file, buf, count, ppos);
855
856  out:
857         ll_tree_unlock(&tree, inode);
858         RETURN(retval);
859 }
860
861 /*
862  * Write to a file (through the page cache).
863  */
864 static ssize_t ll_file_write(struct file *file, const char *buf,
865                              size_t count, loff_t *ppos)
866 {
867         struct inode *inode = file->f_dentry->d_inode;
868         loff_t maxbytes = ll_file_maxbytes(inode);
869         struct ll_lock_tree tree;
870         struct ll_lock_tree_node *node;
871         ssize_t retval;
872         int rc;
873
874         ENTRY;
875         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
876                inode->i_ino, inode->i_generation, inode, count, *ppos);
877
878         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
879
880         /* POSIX, but surprised the VFS doesn't check this already */
881         if (count == 0)
882                 RETURN(0);
883
884         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
885          * called on the file, don't fail the below assertion (bug 2388). */
886         if (file->f_flags & O_LOV_DELAY_CREATE &&
887             ll_i2info(inode)->lli_smd == NULL)
888                 RETURN(-EBADF);
889
890         LASSERT(ll_i2info(inode)->lli_smd != NULL);
891         
892         if (file->f_flags & O_APPEND)
893                 node = ll_node_from_inode(inode, 0, OBD_OBJECT_EOF, LCK_PW);
894         else
895                 node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
896                                           LCK_PW);
897
898         if (IS_ERR(node))
899                 RETURN(PTR_ERR(node));
900
901         tree.lt_fd = file->private_data;
902
903         rc = ll_tree_lock(&tree, node, inode, buf, count,
904                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
905         if (rc != 0)
906                 RETURN(rc);
907
908         /* this is ok, g_f_w will overwrite this under i_sem if it races
909          * with a local truncate, it just makes our maxbyte checking easier */
910         if (file->f_flags & O_APPEND)
911                 *ppos = inode->i_size;
912
913         if (*ppos >= maxbytes) {
914                 if (count || *ppos > maxbytes) {
915                         send_sig(SIGXFSZ, current, 0);
916                         GOTO(out, retval = -EFBIG);
917                 }
918         }
919         if (*ppos + count > maxbytes)
920                 count = maxbytes - *ppos;
921
922         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
923                inode->i_ino, count, *ppos);
924
925         /* generic_file_write handles O_APPEND after getting i_sem */
926         retval = generic_file_write(file, buf, count, ppos);
927         EXIT;
928 out:
929         ll_tree_unlock(&tree, inode);
930         /* serialize with mmap/munmap/mremap */
931         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
932                             retval > 0 ? retval : 0);
933         return retval;
934 }
935
936 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
937                                unsigned long arg)
938 {
939         struct ll_inode_info *lli = ll_i2info(inode);
940         struct obd_export *exp = ll_i2dtexp(inode);
941         struct ll_recreate_obj ucreatp;
942         struct obd_trans_info oti = { 0 };
943         struct obdo *oa = NULL;
944         int lsm_size;
945         int rc = 0;
946         struct lov_stripe_md *lsm, *lsm2;
947         ENTRY;
948
949         if (!capable (CAP_SYS_ADMIN))
950                 RETURN(-EPERM);
951
952         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
953                             sizeof(struct ll_recreate_obj));
954         if (rc) {
955                 RETURN(-EFAULT);
956         }
957         oa = obdo_alloc();
958         if (oa == NULL) 
959                 RETURN(-ENOMEM);
960
961         down(&lli->lli_open_sem);
962         lsm = lli->lli_smd;
963         if (lsm == NULL)
964                 GOTO(out, rc = -ENOENT);
965         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
966                    (lsm->lsm_stripe_count));
967
968         OBD_ALLOC(lsm2, lsm_size);
969         if (lsm2 == NULL)
970                 GOTO(out, rc = -ENOMEM);
971
972         oa->o_id = ucreatp.lrc_id;
973         oa->o_nlink = ucreatp.lrc_ost_idx;
974         oa->o_gr = ucreatp.lrc_group;
975         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
976         oa->o_flags |= OBD_FL_RECREATE_OBJS;
977         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
978                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
979
980         oti.oti_objid = NULL;
981         memcpy(lsm2, lsm, lsm_size);
982         rc = obd_create(exp, oa, &lsm2, &oti);
983
984         OBD_FREE(lsm2, lsm_size);
985         GOTO(out, rc);
986 out:
987         up(&lli->lli_open_sem);
988         obdo_free(oa);
989         return rc;
990 }
991
992 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
993                                     int flags, struct lov_user_md *lum,
994                                     int lum_size)
995 {
996         struct ll_inode_info *lli = ll_i2info(inode);
997         struct file *f;
998         struct obd_export *exp = ll_i2dtexp(inode);
999         struct lov_stripe_md *lsm;
1000         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1001         struct ptlrpc_request *req = NULL;
1002         int rc = 0;
1003         struct lustre_md md;
1004         ENTRY;
1005
1006         down(&lli->lli_open_sem);
1007         lsm = lli->lli_smd;
1008         if (lsm) {
1009                 up(&lli->lli_open_sem);
1010                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1011                        inode->i_ino);
1012                 RETURN(-EEXIST);
1013         }
1014
1015         f = get_empty_filp();
1016         if (!f)
1017                 GOTO(out, -ENOMEM);
1018
1019         f->f_dentry = file->f_dentry;
1020         f->f_vfsmnt = file->f_vfsmnt;
1021
1022         rc = ll_intent_alloc(&oit);
1023         if (rc)
1024                 GOTO(out, rc);
1025
1026         rc = ll_intent_file_open(f, lum, lum_size, &oit);
1027         if (rc)
1028                 GOTO(out, rc);
1029         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1030                 GOTO(out, -ENOENT);
1031         
1032         req = LUSTRE_IT(&oit)->it_data;
1033         rc = LUSTRE_IT(&oit)->it_status;
1034
1035         if (rc < 0)
1036                 GOTO(out, rc);
1037
1038         rc = mdc_req2lustre_md(ll_i2mdexp(inode), req, 1, exp, &md);
1039         if (rc)
1040                 GOTO(out, rc);
1041         ll_update_inode(f->f_dentry->d_inode, &md);
1042
1043         rc = ll_local_open(f, &oit);
1044         if (rc)
1045                 GOTO(out, rc);
1046         ll_intent_release(&oit);
1047
1048         rc = ll_file_release(f->f_dentry->d_inode, f);
1049         EXIT;
1050  out:
1051         ll_intent_release(&oit);
1052         if (f)
1053                 put_filp(f);
1054         up(&lli->lli_open_sem);
1055         if (req != NULL)
1056                 ptlrpc_req_finished(req);
1057         return rc;
1058 }
1059
1060 static int ll_lov_setea(struct inode *inode, struct file *file,
1061                             unsigned long arg)
1062 {
1063         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1064         struct lov_user_md  *lump;
1065         int lum_size = sizeof(struct lov_user_md) +
1066                        sizeof(struct lov_user_ost_data);
1067         int rc;
1068         ENTRY;
1069
1070         if (!capable (CAP_SYS_ADMIN))
1071                 RETURN(-EPERM);
1072
1073         OBD_ALLOC(lump, lum_size);
1074         if (lump == NULL) {
1075                 RETURN(-ENOMEM);
1076         }
1077         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1078         if (rc) {
1079                 OBD_FREE(lump, lum_size);
1080                 RETURN(-EFAULT);
1081         }
1082
1083         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1084
1085         OBD_FREE(lump, lum_size);
1086         RETURN(rc);
1087 }
1088
1089 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1090                             unsigned long arg)
1091 {
1092         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1093         int rc;
1094         int flags = FMODE_WRITE;
1095         ENTRY;
1096
1097         /* Bug 1152: copy properly when this is no longer true */
1098         LASSERT(sizeof(lum) == sizeof(*lump));
1099         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1100         rc = copy_from_user(&lum, lump, sizeof(lum));
1101         if (rc)
1102                 RETURN(-EFAULT);
1103
1104         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1105         if (rc == 0) {
1106                  put_user(0, &lump->lmm_stripe_count);
1107                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1108                                     0, ll_i2info(inode)->lli_smd, lump);
1109         }
1110         RETURN(rc);
1111 }
1112
1113 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1114 {
1115         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1116
1117         if (!lsm)
1118                 RETURN(-ENODATA);
1119
1120         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1121                             (void *)arg);
1122 }
1123
1124 static int ll_get_grouplock(struct inode *inode, struct file *file,
1125                          unsigned long arg)
1126 {
1127         struct ll_file_data *fd = file->private_data;
1128         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1129                                                     .end = OBD_OBJECT_EOF}};
1130         struct lustre_handle lockh = { 0 };
1131         struct ll_inode_info *lli = ll_i2info(inode);
1132         struct lov_stripe_md *lsm = lli->lli_smd;
1133         int flags = 0, rc;
1134         ENTRY;
1135
1136         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1137                 RETURN(-EINVAL);
1138         }
1139
1140         policy.l_extent.gid = arg;
1141         if (file->f_flags & O_NONBLOCK)
1142                 flags = LDLM_FL_BLOCK_NOWAIT;
1143
1144         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags,
1145                             &ll_i2sbi(inode)->ll_grouplock_stime);
1146         if (rc != 0)
1147                 RETURN(rc);
1148
1149         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1150         fd->fd_gid = arg;
1151         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1152
1153         RETURN(0);
1154 }
1155
1156 static int ll_put_grouplock(struct inode *inode, struct file *file,
1157                          unsigned long arg)
1158 {
1159         struct ll_file_data *fd = file->private_data;
1160         struct ll_inode_info *lli = ll_i2info(inode);
1161         struct lov_stripe_md *lsm = lli->lli_smd;
1162         int rc;
1163         ENTRY;
1164
1165         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1166                 /* Ugh, it's already unlocked. */
1167                 RETURN(-EINVAL);
1168         }
1169
1170         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1171                 RETURN(-EINVAL);
1172
1173         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1174
1175         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1176         if (rc)
1177                 RETURN(rc);
1178
1179         fd->fd_gid = 0;
1180         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1181
1182         RETURN(0);
1183 }
1184
1185 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1186                   unsigned long arg)
1187 {
1188         struct ll_file_data *fd = file->private_data;
1189         struct ll_sb_info *sbi = ll_i2sbi(inode);
1190         int flags;
1191         ENTRY;
1192
1193         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1194                inode->i_generation, inode, cmd);
1195
1196         if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
1197                 RETURN(-ENOTTY);
1198
1199         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1200         switch(cmd) {
1201         case LL_IOC_GETFLAGS:
1202                 /* Get the current value of the file flags */
1203                 return put_user(fd->fd_flags, (int *)arg);
1204         case LL_IOC_SETFLAGS:
1205         case LL_IOC_CLRFLAGS:
1206                 /* Set or clear specific file flags */
1207                 /* XXX This probably needs checks to ensure the flags are
1208                  *     not abused, and to handle any flag side effects.
1209                  */
1210                 if (get_user(flags, (int *) arg))
1211                         RETURN(-EFAULT);
1212
1213                 if (cmd == LL_IOC_SETFLAGS)
1214                         fd->fd_flags |= flags;
1215                 else
1216                         fd->fd_flags &= ~flags;
1217                 RETURN(0);
1218         case LL_IOC_LOV_SETSTRIPE:
1219                 RETURN(ll_lov_setstripe(inode, file, arg));
1220         case LL_IOC_LOV_SETEA:
1221                 RETURN(ll_lov_setea(inode, file, arg));
1222         case IOC_MDC_SHOWFID: {
1223                 struct lustre_id *idp = (struct lustre_id *)arg;
1224                 struct lustre_id id;
1225                 char *filename;
1226                 int rc;
1227
1228                 filename = getname((const char *)arg);
1229                 if (IS_ERR(filename))
1230                         RETURN(PTR_ERR(filename));
1231
1232                 ll_inode2id(&id, inode);
1233
1234                 rc = ll_get_fid(sbi->ll_md_exp, &id, filename, &id);
1235                 if (rc < 0)
1236                         GOTO(out_filename, rc);
1237
1238                 rc = copy_to_user(idp, &id, sizeof(*idp));
1239                 if (rc)
1240                         GOTO(out_filename, rc = -EFAULT);
1241
1242                 EXIT;
1243         out_filename:
1244                 putname(filename);
1245                 return rc;
1246         }
1247         case LL_IOC_LOV_GETSTRIPE:
1248                 RETURN(ll_lov_getstripe(inode, arg));
1249         case LL_IOC_RECREATE_OBJ:
1250                 RETURN(ll_lov_recreate_obj(inode, file, arg));
1251         case EXT3_IOC_GETFLAGS:
1252         case EXT3_IOC_SETFLAGS:
1253                 RETURN( ll_iocontrol(inode, file, cmd, arg) );
1254         case LL_IOC_GROUP_LOCK:
1255                 RETURN(ll_get_grouplock(inode, file, arg));
1256         case LL_IOC_GROUP_UNLOCK:
1257                 RETURN(ll_put_grouplock(inode, file, arg));
1258         case EXT3_IOC_GETVERSION_OLD:
1259         case EXT3_IOC_GETVERSION:
1260                 return put_user(inode->i_generation, (int *) arg);
1261         /* We need to special case any other ioctls we want to handle,
1262          * to send them to the MDS/OST as appropriate and to properly
1263          * network encode the arg field.
1264         case EXT2_IOC_GETVERSION_OLD:
1265         case EXT2_IOC_GETVERSION_NEW:
1266         case EXT2_IOC_SETVERSION_OLD:
1267         case EXT2_IOC_SETVERSION_NEW:
1268         case EXT3_IOC_SETVERSION_OLD:
1269         case EXT3_IOC_SETVERSION:
1270         */
1271         default:
1272                 RETURN( obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1273                                       (void *)arg) );
1274         }
1275 }
1276
1277 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1278 {
1279         struct inode *inode = file->f_dentry->d_inode;
1280         struct ll_file_data *fd = file->private_data;
1281         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1282         struct lustre_handle lockh = {0};
1283         loff_t retval;
1284         ENTRY;
1285         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),to=%llu\n", inode->i_ino,
1286                inode->i_generation, inode,
1287                offset + ((origin==2) ? inode->i_size : file->f_pos));
1288
1289         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1290         if (origin == 2) { /* SEEK_END */
1291                 ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
1292                 int nonblock = 0, rc;
1293
1294                 if (file->f_flags & O_NONBLOCK)
1295                         nonblock = LDLM_FL_BLOCK_NOWAIT;
1296
1297                 rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,
1298                                     nonblock, &ll_i2sbi(inode)->ll_seek_stime);
1299                 if (rc != 0)
1300                         RETURN(rc);
1301
1302                 offset += inode->i_size;
1303         } else if (origin == 1) { /* SEEK_CUR */
1304                 offset += file->f_pos;
1305         }
1306
1307         retval = -EINVAL;
1308         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1309                 if (offset != file->f_pos) {
1310                         file->f_pos = offset;
1311 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1312                         file->f_reada = 0;
1313                         file->f_version = ++event;
1314 #endif
1315                 }
1316                 retval = offset;
1317         }
1318
1319         if (origin == 2)
1320                 ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
1321         RETURN(retval);
1322 }
1323
1324 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1325 {
1326         struct inode *inode = dentry->d_inode;
1327         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1328         struct lustre_id id;
1329         struct ptlrpc_request *req;
1330         int rc, err;
1331         ENTRY;
1332         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1333                inode->i_generation, inode);
1334
1335         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1336
1337         /* fsync's caller has already called _fdata{sync,write}, we want
1338          * that IO to finish before calling the osc and mdc sync methods */
1339         rc = filemap_fdatawait(inode->i_mapping);
1340
1341         ll_inode2id(&id, inode);
1342         err = md_sync(ll_i2sbi(inode)->ll_md_exp, &id, &req);
1343         if (!rc)
1344                 rc = err;
1345         if (!err)
1346                 ptlrpc_req_finished(req);
1347
1348         if (data && lsm) {
1349                 struct obdo *oa = obdo_alloc();
1350
1351                 if (!oa)
1352                         RETURN(rc ? rc : -ENOMEM);
1353
1354                 oa->o_id = lsm->lsm_object_id;
1355                 oa->o_gr = lsm->lsm_object_gr;
1356                 oa->o_valid = OBD_MD_FLID;
1357                 obdo_from_inode(oa, inode, (OBD_MD_FLTYPE | OBD_MD_FLATIME |
1358                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1359                                             OBD_MD_FLGROUP));
1360
1361                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
1362                                0, OBD_OBJECT_EOF);
1363                 if (!rc)
1364                         rc = err;
1365                 obdo_free(oa);
1366         }
1367
1368         RETURN(rc);
1369 }
1370
1371 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1372 {
1373         struct inode *inode = file->f_dentry->d_inode;
1374         struct ll_inode_info *li = ll_i2info(inode);
1375         struct ll_sb_info *sbi = ll_i2sbi(inode);
1376         struct obd_device *obddev;
1377         struct ldlm_res_id res_id =
1378                 { .name = {id_fid(&li->lli_id), id_group(&li->lli_id), LDLM_FLOCK} };
1379         struct lustre_handle lockh = {0};
1380         ldlm_policy_data_t flock;
1381         ldlm_mode_t mode = 0;
1382         int flags = 0;
1383         int rc;
1384         ENTRY;
1385
1386         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1387                inode->i_ino, file_lock);
1388
1389         flock.l_flock.pid = file_lock->fl_pid;
1390         flock.l_flock.start = file_lock->fl_start;
1391         flock.l_flock.end = file_lock->fl_end;
1392
1393         switch (file_lock->fl_type) {
1394         case F_RDLCK:
1395                 mode = LCK_PR;
1396                 break;
1397         case F_UNLCK:
1398                 /* An unlock request may or may not have any relation to
1399                  * existing locks so we may not be able to pass a lock handle
1400                  * via a normal ldlm_lock_cancel() request. The request may even
1401                  * unlock a byte range in the middle of an existing lock. In
1402                  * order to process an unlock request we need all of the same
1403                  * information that is given with a normal read or write record
1404                  * lock request. To avoid creating another ldlm unlock (cancel)
1405                  * message we'll treat a LCK_NL flock request as an unlock. */
1406                 mode = LCK_NL;
1407                 break;
1408         case F_WRLCK:
1409                 mode = LCK_PW;
1410                 break;
1411         default:
1412                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1413                 LBUG();
1414         }
1415
1416         switch (cmd) {
1417         case F_SETLKW:
1418 #ifdef F_SETLKW64
1419         case F_SETLKW64:
1420 #endif
1421                 flags = 0;
1422                 break;
1423         case F_SETLK:
1424 #ifdef F_SETLK64
1425         case F_SETLK64:
1426 #endif
1427                 flags = LDLM_FL_BLOCK_NOWAIT;
1428                 break;
1429         case F_GETLK:
1430 #ifdef F_GETLK64
1431         case F_GETLK64:
1432 #endif
1433                 flags = LDLM_FL_TEST_LOCK;
1434                 /* Save the old mode so that if the mode in the lock changes we
1435                  * can decrement the appropriate reader or writer refcount. */
1436                 file_lock->fl_type = mode;
1437                 break;
1438         default:
1439                 CERROR("unknown fcntl lock command: %d\n", cmd);
1440                 LBUG();
1441         }
1442
1443         CDEBUG(D_DLMTRACE, "inode=%lu, pid="LPU64", flags=%#x, mode=%u, "
1444                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1445                flags, mode, flock.l_flock.start, flock.l_flock.end);
1446
1447         obddev = md_get_real_obd(sbi->ll_md_exp, &li->lli_id);
1448         rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
1449                               obddev->obd_namespace,
1450                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1451                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1452                               NULL, 0, NULL, &lockh);
1453         RETURN(rc);
1454 }
1455
1456 int ll_inode_revalidate_it(struct dentry *dentry)
1457 {
1458         struct lookup_intent oit = { .it_op = IT_GETATTR };
1459         struct inode *inode = dentry->d_inode;
1460         struct ptlrpc_request *req = NULL;
1461         struct ll_inode_info *lli;
1462         struct lov_stripe_md *lsm;
1463         struct ll_sb_info *sbi;
1464         struct lustre_id id;
1465         int rc;
1466         ENTRY;
1467
1468         if (!inode) {
1469                 CERROR("REPORT THIS LINE TO PETER\n");
1470                 RETURN(0);
1471         }
1472         
1473         sbi = ll_i2sbi(inode);
1474         
1475         ll_inode2id(&id, inode);
1476         lli = ll_i2info(inode);
1477         LASSERT(id_fid(&id) != 0);
1478
1479         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), name=%s(%p)\n",
1480                inode->i_ino, inode->i_generation, inode, dentry->d_name.name,
1481                dentry);
1482
1483 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1484         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1485 #endif
1486
1487         rc = ll_intent_alloc(&oit);
1488         if (rc)
1489                 RETURN(-ENOMEM);
1490
1491         rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1492                             &oit, 0, &req, ll_mdc_blocking_ast);
1493         if (rc < 0)
1494                 GOTO(out, rc);
1495
1496         rc = revalidate_it_finish(req, 1, &oit, dentry);
1497         if (rc) {
1498                 GOTO(out, rc);
1499         }
1500
1501         ll_lookup_finish_locks(&oit, dentry);
1502
1503         lsm = lli->lli_smd;
1504         if (lsm == NULL) /* object not yet allocated, don't validate size */
1505                 GOTO(out, rc = 0);
1506
1507         /*
1508          * ll_glimpse_size() will prefer locally cached writes if they extend
1509          * the file.
1510          */
1511         rc = ll_glimpse_size(inode);
1512         EXIT;
1513 out:
1514         ll_intent_release(&oit);
1515         if (req)
1516                 ptlrpc_req_finished(req);
1517         return rc;
1518 }
1519
1520 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1521 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
1522 {
1523         int res = 0;
1524         struct inode *inode = de->d_inode;
1525
1526         res = ll_inode_revalidate_it(de);
1527         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
1528
1529         if (res)
1530                 return res;
1531
1532         stat->ino = inode->i_ino;
1533         stat->mode = inode->i_mode;
1534         stat->nlink = inode->i_nlink;
1535         stat->uid = inode->i_uid;
1536         stat->gid = inode->i_gid;
1537         stat->atime = inode->i_atime;
1538         stat->mtime = inode->i_mtime;
1539         stat->ctime = inode->i_ctime;
1540         stat->size = inode->i_size;
1541         stat->blksize = inode->i_blksize;
1542         stat->blocks = inode->i_blocks;
1543         stat->rdev = kdev_t_to_nr(inode->i_rdev);
1544         stat->dev = id_group(&ll_i2info(inode)->lli_id);
1545         return 0;
1546 }
1547 #endif
1548
1549 static
1550 int ll_setxattr_internal(struct inode *inode, const char *name,
1551                          const void *value, size_t size, int flags, 
1552                          __u64 valid)
1553 {
1554         struct ll_sb_info *sbi = ll_i2sbi(inode);
1555         struct ptlrpc_request *request = NULL;
1556         struct mdc_op_data op_data;
1557         struct iattr attr;
1558         int rc = 0;
1559         ENTRY;
1560
1561         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", inode->i_ino);
1562         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_SETXATTR);
1563
1564         memset(&attr, 0x0, sizeof(attr));
1565         attr.ia_valid |= valid;
1566         attr.ia_attr_flags = flags;
1567
1568         ll_prepare_mdc_data(&op_data, inode, NULL, NULL, 0, 0);
1569
1570         rc = md_setattr(sbi->ll_md_exp, &op_data, &attr,
1571                         (void*) name, strnlen(name, XATTR_NAME_MAX)+1, 
1572                         (void*) value, size, &request);
1573         if (rc) {
1574                 CERROR("md_setattr fails: rc = %d\n", rc);
1575                 GOTO(out, rc);
1576         }
1577
1578  out:
1579         ptlrpc_req_finished(request);
1580         RETURN(rc);
1581 }
1582
1583 int ll_setxattr(struct dentry *dentry, const char *name, const void *value,
1584                 size_t size, int flags)
1585 {
1586         int rc, error;
1587         struct posix_acl *acl;
1588         struct ll_inode_info *lli;
1589         ENTRY;
1590
1591         rc = ll_setxattr_internal(dentry->d_inode, name, value, size, 
1592                                   flags, ATTR_EA);
1593         
1594         /* update inode's acl info */
1595         if (rc == 0 && strcmp(name, XATTR_NAME_ACL_ACCESS) == 0) {
1596                 if (value) {
1597                         acl = posix_acl_from_xattr(value, size);
1598                         if (IS_ERR(acl)) {
1599                                 CERROR("convert from xattr to acl error: %ld",
1600                                         PTR_ERR(acl));
1601                                 GOTO(out, rc);
1602                         } else if (acl) {
1603                                 error = posix_acl_valid(acl);
1604                                 if (error) {
1605                                         CERROR("acl valid error: %d", error);
1606                                         posix_acl_release(acl);
1607                                         GOTO(out, rc);
1608                                 }
1609                         }
1610                 } else {
1611                         acl = NULL;
1612                 }
1613                                         
1614                 lli = ll_i2info(dentry->d_inode);
1615                 spin_lock(&lli->lli_lock);
1616                 if (lli->lli_acl_access != NULL)
1617                         posix_acl_release(lli->lli_acl_access);
1618                 lli->lli_acl_access = acl;
1619                 spin_unlock(&lli->lli_lock);
1620         }
1621         EXIT;
1622 out:
1623         return(rc);
1624 }
1625
1626 int ll_removexattr(struct dentry *dentry, const char *name)
1627 {
1628         return ll_setxattr_internal(dentry->d_inode, name, NULL, 0, 0,
1629                                     ATTR_EA_RM);
1630 }
1631
1632 static
1633 int ll_getxattr_internal(struct inode *inode, const char *name, int namelen,
1634                          void *value, size_t size, __u64 valid)
1635 {
1636         struct ptlrpc_request *request = NULL;
1637         struct ll_sb_info *sbi = ll_i2sbi(inode);
1638         struct lustre_id id;
1639         struct mds_body *body;
1640         void *ea_data; 
1641         int rc, ea_size;
1642         ENTRY;
1643
1644         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETXATTR);
1645
1646         ll_inode2id(&id, inode);
1647         rc = md_getattr(sbi->ll_md_exp, &id, valid, name, namelen,
1648                          size, &request);
1649         if (rc) {
1650                 if (rc != -ENODATA && rc != -EOPNOTSUPP)
1651                         CERROR("md_getattr fails: rc = %d\n", rc);
1652                 GOTO(out, rc);
1653         }
1654
1655         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
1656         LASSERT(body != NULL);
1657         LASSERT_REPSWABBED(request, 0);
1658
1659         ea_size = body->eadatasize;
1660         LASSERT(ea_size <= request->rq_repmsg->buflens[0]);
1661
1662         if (size == 0) 
1663                 GOTO(out, rc = ea_size);
1664
1665         ea_data = lustre_msg_buf(request->rq_repmsg, 1, ea_size);
1666         LASSERT(ea_data != NULL);
1667         LASSERT_REPSWABBED(request, 1);
1668
1669         if (value)
1670                 memcpy(value, ea_data, ea_size);
1671         rc = ea_size;
1672  out:
1673         ptlrpc_req_finished(request);
1674         RETURN(rc);
1675 }
1676
1677 int ll_getxattr(struct dentry *dentry, const char *name, void *value,
1678                 size_t size)
1679 {
1680         return ll_getxattr_internal(dentry->d_inode, name, strlen(name) + 1, 
1681                                     value, size, OBD_MD_FLEA);
1682 }
1683
1684 int ll_listxattr(struct dentry *dentry, char *list, size_t size)
1685 {
1686         return ll_getxattr_internal(dentry->d_inode, NULL, 0, list, size,
1687                                     OBD_MD_FLEALIST);
1688 }
1689
1690 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
1691 {
1692         struct lookup_intent it = { .it_op = IT_GETATTR };
1693         int mode = inode->i_mode;
1694         struct dentry de;
1695         struct ll_sb_info *sbi;
1696         struct lustre_id id;
1697         struct ptlrpc_request *req = NULL;
1698         int rc;
1699         ENTRY;
1700
1701         sbi = ll_i2sbi(inode);
1702         ll_inode2id(&id, inode);
1703
1704         /* Nobody gets write access to a read-only fs */
1705         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
1706             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
1707                 return -EROFS;
1708         /* Nobody gets write access to an immutable file */
1709         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
1710                 return -EACCES;
1711         if (current->fsuid == inode->i_uid) {
1712                 mode >>= 6;
1713         } else if (1) {
1714                 struct ll_inode_info *lli = ll_i2info(inode);
1715                 struct posix_acl *acl;
1716
1717                 /* The access ACL cannot grant access if the group class
1718                    permission bits don't contain all requested permissions. */
1719                 if (((mode >> 3) & mask & S_IRWXO) != mask)
1720                         goto check_groups;
1721
1722                 if (ll_intent_alloc(&it))
1723                         return -EACCES;
1724
1725                 de.d_inode = inode;
1726                 rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1727                                     &it, 0, &req, ll_mdc_blocking_ast);
1728                 if (rc < 0) {
1729                         ll_intent_free(&it);
1730                         GOTO(out, rc);
1731                 }
1732
1733                 rc = revalidate_it_finish(req, 1, &it, &de);
1734                 if (rc) {
1735                         ll_intent_release(&it);
1736                         GOTO(out, rc);
1737                 }
1738
1739                 ll_lookup_finish_locks(&it, &de);
1740                 ll_intent_free(&it);
1741
1742                 spin_lock(&lli->lli_lock);
1743                 acl = posix_acl_dup(ll_i2info(inode)->lli_acl_access);
1744                 spin_unlock(&lli->lli_lock);
1745
1746                 if (!acl)
1747                         goto check_groups;
1748
1749                 rc = posix_acl_permission(inode, acl, mask);
1750                 posix_acl_release(acl);
1751                 if (rc == -EACCES)
1752                         goto check_capabilities;
1753                 GOTO(out, rc);
1754         } else {
1755 check_groups:
1756                 if (in_group_p(inode->i_gid))
1757                         mode >>= 3;
1758         }
1759         if ((mode & mask & S_IRWXO) == mask)
1760                 GOTO(out, rc = 0);
1761
1762 check_capabilities:
1763         rc = -EACCES; 
1764         /* Allowed to override Discretionary Access Control? */
1765         if (!(mask & MAY_EXEC) ||
1766             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
1767                 if (capable(CAP_DAC_OVERRIDE))
1768                         GOTO(out, rc = 0);
1769        /* Read and search granted if capable(CAP_DAC_READ_SEARCH) */
1770         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
1771             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
1772                 GOTO(out, rc = 0);
1773 out:
1774         if (req)
1775                 ptlrpc_req_finished(req);
1776
1777         return rc;
1778 }
1779
1780 struct file_operations ll_file_operations = {
1781         .read           = ll_file_read,
1782         .write          = ll_file_write,
1783         .ioctl          = ll_file_ioctl,
1784         .open           = ll_file_open,
1785         .release        = ll_file_release,
1786         .mmap           = ll_file_mmap,
1787         .llseek         = ll_file_seek,
1788 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
1789         .sendfile       = generic_file_sendfile,
1790 #endif
1791         .fsync          = ll_fsync,
1792         .lock           = ll_file_flock
1793 };
1794
1795 struct inode_operations ll_file_inode_operations = {
1796         .setattr        = ll_setattr,
1797         .truncate       = ll_truncate,
1798 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1799         .getattr        = ll_getattr,
1800 #else
1801         .revalidate_it  = ll_inode_revalidate_it,
1802 #endif
1803         .setxattr       = ll_setxattr,
1804         .getxattr       = ll_getxattr,
1805         .listxattr      = ll_listxattr,
1806         .removexattr    = ll_removexattr,
1807         .permission     = ll_inode_permission,
1808 };
1809