Whamcloud - gitweb
- b_size_on_mds landed on HEAD:
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <linux/lustre_dlm.h>
27 #include <linux/lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #include <linux/lustre_acl.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35 #include <linux/obd_lov.h>
36
37 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
38 __u64 lov_merge_blocks(struct lov_stripe_md *lsm);
39 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
40
41 int ll_validate_size(struct inode *inode, __u64 *size, __u64 *blocks)
42 {
43         ldlm_policy_data_t extent = { .l_extent = { 0, OBD_OBJECT_EOF } };
44         struct obd_export *exp = ll_i2sbi(inode)->ll_dt_exp;
45         struct ll_inode_info *lli = ll_i2info(inode);
46         struct lustre_handle match_lockh = {0};
47         int rc, flags;
48         ENTRY;
49
50         if (lli->lli_smd == NULL)
51                 RETURN(0);
52
53         LASSERT(size != NULL && blocks != NULL);
54
55         flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
56         rc = obd_match(exp, lli->lli_smd, LDLM_EXTENT, &extent,
57                        LCK_PR | LCK_PW, &flags, inode, &match_lockh);
58         if (rc == 0) {
59                 /* we have no all needed locks,
60                  * so we don't know actual size */
61                 GOTO(finish, rc);
62         }
63
64         /* we know actual size! */
65         down(&lli->lli_size_sem);
66         *size = lov_merge_size(lli->lli_smd, 0);
67         *blocks = lov_merge_blocks(lli->lli_smd);
68         up(&lli->lli_size_sem);
69
70 finish:
71         RETURN(rc);
72 }
73
74 int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
75                     struct obd_client_handle *och, int dirty)
76 {
77         struct ptlrpc_request *req = NULL;
78         struct obdo *obdo = NULL;
79         struct obd_device *obd;
80         int rc;
81         ENTRY;
82
83         obd = class_exp2obd(md_exp);
84         if (obd == NULL) {
85                 CERROR("Invalid MDC connection handle "LPX64"\n",
86                        md_exp->exp_handle.h_cookie);
87                 EXIT;
88                 return 0;
89         }
90
91         /*
92          * here we check if this is forced umount. If so this is called on
93          * canceling "open lock" and we do not call md_close() in this case , as
94          * it will not successful, as import is already deactivated.
95          */
96         if (obd->obd_no_recov)
97                 GOTO(out, rc = 0);
98
99         /* closing opened file */
100         obdo = obdo_alloc();
101         if (obdo == NULL)
102                 RETURN(-ENOMEM);
103
104         obdo->o_id = inode->i_ino;
105         obdo->o_generation = inode->i_generation;
106         obdo->o_valid = OBD_MD_FLID;
107         obdo_from_inode(obdo, inode, (OBD_MD_FLTYPE | OBD_MD_FLMODE |
108                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
109                                       OBD_MD_FLCTIME));
110         if (0 /* ll_is_inode_dirty(inode) */) {
111                 obdo->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
112                 obdo->o_valid |= OBD_MD_FLFLAGS;
113         }
114         obdo->o_fid = id_fid(&ll_i2info(inode)->lli_id);
115         obdo->o_mds = id_group(&ll_i2info(inode)->lli_id);
116
117
118         obdo->o_valid |= OBD_MD_FLEPOCH;
119         obdo->o_easize = ll_i2info(inode)->lli_io_epoch;
120
121         if (dirty) {
122                 /* we modified data through this handle */
123                 obdo->o_flags |= MDS_BFLAG_DIRTY_EPOCH;
124                 obdo->o_valid |= OBD_MD_FLFLAGS;
125                 if (ll_validate_size(inode, &obdo->o_size, &obdo->o_blocks))
126                         obdo->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
127         }
128
129         rc = md_close(md_exp, obdo, och, &req);
130         obdo_free(obdo);
131
132         if (rc == EAGAIN) {
133                 /*
134                  * we are the last writer, so the MDS has instructed us to get
135                  * the file size and any write cookies, then close again.
136                  */
137
138                 //ll_queue_done_writing(inode);
139                 rc = 0;
140         } else if (rc) {
141                 CERROR("inode %lu mdc close failed: rc = %d\n",
142                        (unsigned long)inode->i_ino, rc);
143         }
144
145         ptlrpc_req_finished(req);
146         EXIT;
147 out:
148         mdc_clear_open_replay_data(md_exp, och);
149         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
150         OBD_FREE(och, sizeof *och);
151         return rc;
152 }
153
154 int ll_md_real_close(struct obd_export *md_exp,
155                      struct inode *inode, int flags)
156 {
157         struct ll_inode_info *lli = ll_i2info(inode);
158         int freeing = inode->i_state & I_FREEING;
159         struct obd_client_handle **och_p;
160         struct obd_client_handle *och;
161         __u64 *och_usecount;
162         int rc = 0, dirty = 0;
163         ENTRY;
164
165         if (flags & FMODE_WRITE) {
166                 och_p = &lli->lli_mds_write_och;
167                 och_usecount = &lli->lli_open_fd_write_count;
168         } else if (flags & FMODE_EXEC) {
169                 och_p = &lli->lli_mds_exec_och;
170                 och_usecount = &lli->lli_open_fd_exec_count;
171          } else {
172                 och_p = &lli->lli_mds_read_och;
173                 och_usecount = &lli->lli_open_fd_read_count;
174         }
175
176         down(&lli->lli_och_sem);
177         if (*och_usecount) { /* There are still users of this handle, so
178                                 skip freeing it. */
179                 up(&lli->lli_och_sem);
180                 RETURN(0);
181         }
182         if (ll_is_inode_dirty(inode)) {
183                 /* the inode still has dirty pages, let's close later */
184                 CDEBUG(D_INODE, "inode %lu/%u still has dirty pages\n",
185                        inode->i_ino, inode->i_generation);
186                 LASSERT(freeing == 0);
187                 ll_queue_done_writing(inode);
188                 up(&lli->lli_och_sem);
189                 RETURN(0);
190         }
191         
192         if (LLI_DIRTY_HANDLE(inode) && (flags & FMODE_WRITE)) {
193                 clear_bit(LLI_F_DIRTY_HANDLE,  &lli->lli_flags);
194                 dirty = 1;
195         } else if (0 && !(flags & FMODE_SYNC) && !freeing) {
196                 /* in order to speed up creation rate we pass
197                  * closing to dedicated thread so we don't need
198                  * to wait for close reply here -bzzz */
199                 ll_queue_done_writing(inode);
200                 up(&lli->lli_och_sem);
201                 RETURN(0);
202         }
203
204         och = *och_p;
205         *och_p = NULL;
206
207
208         up(&lli->lli_och_sem);
209
210         /*
211          * there might be a race and somebody have freed this och
212          * already. Another way to have this twice called is if file closing
213          * will fail due to netwok problems and on umount lock will be canceled
214          * and this will be called from block_ast callack.
215         */
216         if (och && och->och_fh.cookie != DEAD_HANDLE_MAGIC)
217                 rc = ll_md_och_close(md_exp, inode, och, dirty);
218         
219         RETURN(rc);
220 }
221
222 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
223                 struct file *file)
224 {
225         struct ll_file_data *fd = file->private_data;
226         struct ll_inode_info *lli = ll_i2info(inode);
227         int rc = 0;
228         ENTRY;
229
230         /* clear group lock, if present */
231         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
232                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
233                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
234                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
235                                       &fd->fd_cwlockh);
236         }
237
238         /* Let's see if we have good enough OPEN lock on the file and if
239            we can skip talking to MDS */
240         if (file->f_dentry->d_inode) {
241                 int lockmode;
242                 struct obd_device *obddev;
243                 struct lustre_handle lockh;
244                 int flags = LDLM_FL_BLOCK_GRANTED;
245                 struct ldlm_res_id file_res_id = {.name = {id_fid(&lli->lli_id), 
246                                                            id_group(&lli->lli_id)}};
247                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
248
249                 down(&lli->lli_och_sem);
250                 if (fd->fd_omode & FMODE_WRITE) {
251                         lockmode = LCK_CW;
252                         LASSERT(lli->lli_open_fd_write_count);
253                         lli->lli_open_fd_write_count--;
254                 } else if (fd->fd_omode & FMODE_EXEC) {
255                         lockmode = LCK_PR;
256                         LASSERT(lli->lli_open_fd_exec_count);
257                         lli->lli_open_fd_exec_count--;
258                 } else {
259                         lockmode = LCK_CR;
260                         LASSERT(lli->lli_open_fd_read_count);
261                         lli->lli_open_fd_read_count--;
262                 }
263                 up(&lli->lli_och_sem);
264                 
265                 obddev = md_get_real_obd(md_exp, &lli->lli_id);
266                 if (!ldlm_lock_match(obddev->obd_namespace, flags, &file_res_id,
267                                      LDLM_IBITS, &policy, lockmode, &lockh))
268                 {
269                         rc = ll_md_real_close(md_exp, file->f_dentry->d_inode,
270                                               fd->fd_omode);
271                 } else {
272                         ldlm_lock_decref(&lockh, lockmode);
273                 }
274         }
275
276         file->private_data = NULL;
277         OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof(*fd));
278         RETURN(rc);
279 }
280
281 /* While this returns an error code, fput() the caller does not, so we need
282  * to make every effort to clean up all of our state here.  Also, applications
283  * rarely check close errors and even if an error is returned they will not
284  * re-try the close call.
285  */
286 int ll_file_release(struct inode *inode, struct file *file)
287 {
288         struct ll_file_data *fd;
289         struct ll_sb_info *sbi = ll_i2sbi(inode);
290         int rc;
291
292         ENTRY;
293         CDEBUG(D_VFSTRACE, "VFS Op:inode="DLID4"(%p)\n",
294                OLID4(&ll_i2info(inode)->lli_id), inode);
295
296         /* don't do anything for / */
297         if (inode->i_sb->s_root == file->f_dentry)
298                 RETURN(0);
299
300         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
301         fd = (struct ll_file_data *)file->private_data;
302         LASSERT(fd != NULL);
303
304         rc = ll_md_close(sbi->ll_md_exp, inode, file);
305         RETURN(rc);
306 }
307
308 static int ll_intent_file_open(struct file *file, void *lmm,
309                                int lmmsize, struct lookup_intent *itp)
310 {
311         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
312         const char *name = (char *)file->f_dentry->d_name.name;
313         struct dentry *parent = file->f_dentry->d_parent;
314         const int len = file->f_dentry->d_name.len;
315         struct lustre_handle lockh;
316         struct mdc_op_data *op_data;
317         int rc;
318
319         if (!parent)
320                 RETURN(-ENOENT);
321
322         OBD_ALLOC(op_data, sizeof(*op_data));
323         if (op_data == NULL)
324                 RETURN(-ENOMEM);
325         
326         ll_prepare_mdc_data(op_data, parent->d_inode, NULL,
327                             name, len, O_RDWR);
328
329         rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_PR, op_data,
330                         &lockh, lmm, lmmsize, ldlm_completion_ast,
331                         ll_mdc_blocking_ast, NULL);
332         OBD_FREE(op_data, sizeof(*op_data));
333         if (rc == 0) {
334                 if (LUSTRE_IT(itp)->it_lock_mode)
335                         memcpy(&LUSTRE_IT(itp)->it_lock_handle,
336                                &lockh, sizeof(lockh));
337
338         } else if (rc < 0) {
339                 CERROR("lock enqueue: err: %d\n", rc);
340         }
341         RETURN(rc);
342 }
343
344 void ll_och_fill(struct inode *inode, struct lookup_intent *it,
345                  struct obd_client_handle *och)
346 {
347         struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
348         struct ll_inode_info *lli = ll_i2info(inode);
349         struct mds_body *body;
350         LASSERT(och);
351
352         body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
353         LASSERT (body != NULL);          /* reply already checked out */
354         LASSERT_REPSWABBED (req, 1);     /* and swabbed down */
355
356         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
357         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
358         lli->lli_io_epoch = body->io_epoch;
359         mdc_set_open_replay_data(ll_i2mdexp(inode), och, 
360                                  LUSTRE_IT(it)->it_data);
361 }
362
363 int ll_local_open(struct file *file, struct lookup_intent *it,
364                   struct obd_client_handle *och)
365 {
366         struct ll_file_data *fd;
367         ENTRY;
368
369         if (och)
370                 ll_och_fill(file->f_dentry->d_inode, it, och);
371
372         LASSERTF(file->private_data == NULL, "file %.*s/%.*s ino %lu/%u (%o)\n",
373                  file->f_dentry->d_name.len, file->f_dentry->d_name.name,
374                  file->f_dentry->d_parent->d_name.len,
375                  file->f_dentry->d_parent->d_name.name,
376                  file->f_dentry->d_inode->i_ino,
377                  file->f_dentry->d_inode->i_generation,
378                  file->f_dentry->d_inode->i_mode);
379
380         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
381         
382         /* We can't handle this well without reorganizing ll_file_open and
383          * ll_md_close(), so don't even try right now. */
384         LASSERT(fd != NULL);
385
386         file->private_data = fd;
387         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
388         fd->fd_omode = it->it_flags;
389         RETURN(0);
390 }
391
392 /* Open a file, and (for the very first open) create objects on the OSTs at
393  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
394  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
395  * lli_open_sem to ensure no other process will create objects, send the
396  * stripe MD to the MDS, or try to destroy the objects if that fails.
397  *
398  * If we already have the stripe MD locally then we don't request it in
399  * mdc_open(), by passing a lmm_size = 0.
400  *
401  * It is up to the application to ensure no other processes open this file
402  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
403  * used.  We might be able to avoid races of that sort by getting lli_open_sem
404  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
405  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
406  */
407 int ll_file_open(struct inode *inode, struct file *file)
408 {
409         struct ll_inode_info *lli = ll_i2info(inode);
410         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
411                                           .it_flags = file->f_flags };
412         struct lov_stripe_md *lsm;
413         struct ptlrpc_request *req;
414         int rc = 0;
415         struct obd_client_handle **och_p = NULL;
416         __u64 *och_usecount = NULL;
417         ENTRY;
418
419         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n",
420                inode->i_ino, inode->i_generation, inode, file->f_flags);
421
422         /* don't do anything for / */
423         if (inode->i_sb->s_root == file->f_dentry)
424                 RETURN(0);
425
426         if ((file->f_flags+1) & O_ACCMODE)
427                 oit.it_flags++;
428         if (file->f_flags & O_TRUNC)
429                 oit.it_flags |= 2;
430
431         it = file->f_it;
432
433         /*
434          * sometimes LUSTRE_IT(it) may not be allocated like opening file by
435          * dentry_open() from GNS stuff.
436          */
437         if (!it || !LUSTRE_IT(it)) {
438                 it = &oit;
439                 rc = ll_intent_alloc(it);
440                 if (rc)
441                         GOTO(out, rc);
442         }
443
444         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
445         
446         /*
447          * mdc_intent_lock() didn't get a request ref if there was an open
448          * error, so don't do cleanup on the * request here (bug 3430)
449          */
450         if (LUSTRE_IT(it)->it_disposition) {
451                 rc = it_open_error(DISP_OPEN_OPEN, it);
452                 if (rc)
453                         RETURN(rc);
454         }
455
456         /* Let's see if we have file open on MDS already. */
457         if (it->it_flags & FMODE_WRITE) {
458                 och_p = &lli->lli_mds_write_och;
459                 och_usecount = &lli->lli_open_fd_write_count;
460         } else if (it->it_flags & FMODE_EXEC) {
461                 och_p = &lli->lli_mds_exec_och;
462                 och_usecount = &lli->lli_open_fd_exec_count;
463         } else {
464                 och_p = &lli->lli_mds_read_och;
465                 och_usecount = &lli->lli_open_fd_read_count;
466         }
467
468         down(&lli->lli_och_sem);
469         if (*och_p) { /* Open handle is present */
470                 if (it_disposition(it, DISP_LOOKUP_POS) && /* Positive lookup */
471                     it_disposition(it, DISP_OPEN_OPEN)) { /* & OPEN happened */
472                         struct obd_client_handle *och;
473                         /* Well, there's extra open request that we do not need,
474                            let's close it somehow*/
475                         OBD_ALLOC(och, sizeof (struct obd_client_handle));
476                         if (!och) {
477                                 up(&lli->lli_och_sem);
478                                 RETURN(-ENOMEM);
479                         }
480
481                         ll_och_fill(inode, it, och);
482                         /* ll_md_och_close() will free och */
483                         ll_md_och_close(ll_i2mdexp(inode), inode, och, 0);
484                 }
485                 (*och_usecount)++;
486                         
487                 rc = ll_local_open(file, it, NULL);
488                 if (rc)
489                         LBUG();
490         } else {
491                 LASSERT(*och_usecount == 0);
492                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
493                 if (!*och_p)
494                         GOTO(out, rc = -ENOMEM);
495                 (*och_usecount)++;
496
497                 if (!it || !LUSTRE_IT(it) || !LUSTRE_IT(it)->it_disposition) {
498                         /*
499                          * we are going to replace intent here, and that may
500                          * possibly change access mode (FMODE_EXEC can only be
501                          * set in intent), but I hope it never happens (I was
502                          * not able to trigger it yet at least) -- green
503                          */
504                         
505                         /* FIXME: FMODE_EXEC is not covered by O_ACCMODE! */
506                         LASSERT(!(it->it_flags & FMODE_EXEC));
507                         LASSERTF((it->it_flags & O_ACCMODE) ==
508                                  (oit.it_flags & O_ACCMODE), "Changing intent "
509                                  "flags %x to incompatible %x\n", it->it_flags,
510                                  oit.it_flags);
511                         it = &oit;
512                         rc = ll_intent_file_open(file, NULL, 0, it);
513                         if (rc)
514                                 GOTO(out, rc);
515                         rc = it_open_error(DISP_OPEN_OPEN, it);
516                         if (rc)
517                                 GOTO(out_och_free, rc);
518
519                         mdc_set_lock_data(NULL, &LUSTRE_IT(it)->it_lock_handle,
520                                           file->f_dentry->d_inode);
521                 }
522                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
523                 rc = ll_local_open(file, it, *och_p);
524                 LASSERTF(rc == 0, "rc = %d\n", rc);
525         }
526         up(&lli->lli_och_sem);
527         
528         /*
529          * must do this outside lli_och_sem lock to prevent deadlock where
530          * different kind of OPEN lock for this same inode gets cancelled by
531          * ldlm_cancel_lru
532          */
533
534         if (!S_ISREG(inode->i_mode))
535                 GOTO(out, rc);
536
537         lsm = lli->lli_smd;
538         if (lsm == NULL) {
539                 if (file->f_flags & O_LOV_DELAY_CREATE ||
540                     !(file->f_mode & FMODE_WRITE)) {
541                         CDEBUG(D_INODE, "object creation was delayed\n");
542                         GOTO(out, rc);
543                 }
544         }
545         file->f_flags &= ~O_LOV_DELAY_CREATE;
546         GOTO(out, rc);
547  out:
548         req = LUSTRE_IT(it)->it_data;
549         ll_intent_drop_lock(it);
550         ll_intent_release(it);
551         ptlrpc_req_finished(req);
552         if (rc == 0) {
553                 ll_open_complete(inode);
554         } else {
555 out_och_free:
556                 if (*och_p) {
557                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
558                         *och_p = NULL; /* OBD_FREE writes some magic there */
559                         (*och_usecount)--;
560                 }
561                 up(&lli->lli_och_sem);
562         }
563                 
564         return rc;
565 }
566
567 /* Fills the obdo with the attributes for the inode defined by lsm */
568 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
569                    struct obdo *oa)
570 {
571         struct ptlrpc_request_set *set;
572         int rc;
573         ENTRY;
574
575         LASSERT(lsm != NULL);
576
577         memset(oa, 0, sizeof *oa);
578         oa->o_id = lsm->lsm_object_id;
579         oa->o_gr = lsm->lsm_object_gr;
580         oa->o_mode = S_IFREG;
581         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
582                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
583                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
584
585         set = ptlrpc_prep_set();
586         if (set == NULL) {
587                 rc = -ENOMEM;
588         } else {
589                 rc = obd_getattr_async(exp, oa, lsm, set);
590                 if (rc == 0)
591                         rc = ptlrpc_set_wait(set);
592                 ptlrpc_set_destroy(set);
593         }
594         if (rc)
595                 RETURN(rc);
596
597         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
598                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
599         RETURN(0);
600 }
601
602 static inline void ll_remove_suid(struct inode *inode)
603 {
604         unsigned int mode;
605
606         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
607         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
608
609         /* was any of the uid bits set? */
610         mode &= inode->i_mode;
611         if (mode && !capable(CAP_FSETID)) {
612                 inode->i_mode &= ~mode;
613                 // XXX careful here - we cannot change the size
614         }
615 }
616
617 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
618 {
619         struct ll_inode_info *lli = ll_i2info(inode);
620         struct lov_stripe_md *lsm = lli->lli_smd;
621         struct obd_export *exp = ll_i2dtexp(inode);
622         struct {
623                 char name[16];
624                 struct ldlm_lock *lock;
625                 struct lov_stripe_md *lsm;
626         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
627         __u32 stripe, vallen = sizeof(stripe);
628         int rc;
629         ENTRY;
630
631         if (lsm->lsm_stripe_count == 1)
632                 GOTO(check, stripe = 0);
633
634         /* get our offset in the lov */
635         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
636         if (rc != 0) {
637                 CERROR("obd_get_info: rc = %d\n", rc);
638                 RETURN(rc);
639         }
640         LASSERT(stripe < lsm->lsm_stripe_count);
641         EXIT;
642 check:
643         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
644             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[2]){
645                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64
646                            " inode=%lu/%u (%p)\n",
647                            lsm->lsm_oinfo[stripe].loi_id,
648                            lsm->lsm_oinfo[stripe].loi_gr,
649                            inode->i_ino, inode->i_generation, inode);
650                 return -ELDLM_NO_LOCK_DATA;
651         }
652
653         return stripe;
654 }
655
656 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
657  * we get a lock cancellation for each stripe, so we have to map the obd's
658  * region back onto the stripes in the file that it held.
659  *
660  * No one can dirty the extent until we've finished our work and they can
661  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
662  * but other kernel actors could have pages locked.
663  *
664  * Called with the DLM lock held. */
665 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
666                               struct ldlm_lock *lock, __u32 stripe)
667 {
668         ldlm_policy_data_t tmpex;
669         unsigned long start, end, count, skip, i, j;
670         struct page *page;
671         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
672         struct lustre_handle lockh;
673         ENTRY;
674
675         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
676         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
677                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
678                inode->i_size);
679
680         /* our locks are page granular thanks to osc_enqueue, we invalidate the
681          * whole page. */
682         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
683         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
684
685         count = ~0;
686         skip = 0;
687         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
688         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
689         if (lsm->lsm_stripe_count > 1) {
690                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
691                 skip = (lsm->lsm_stripe_count - 1) * count;
692                 start += start/count * skip + stripe * count;
693                 if (end != ~0)
694                         end += end/count * skip + stripe * count;
695         }
696         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
697                 end = ~0;
698
699         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
700         if (i < end)
701                 end = i;
702
703         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
704                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
705                count, skip, end, discard ? " (DISCARDING)" : "");
706         
707         /* walk through the vmas on the inode and tear down mmaped pages that
708          * intersect with the lock.  this stops immediately if there are no
709          * mmap()ed regions of the file.  This is not efficient at all and
710          * should be short lived. We'll associate mmap()ed pages with the lock
711          * and will be able to find them directly */
712         
713         for (i = start; i <= end; i += (j + skip)) {
714                 j = min(count - (i % count), end - i + 1);
715                 LASSERT(j > 0);
716                 LASSERT(inode->i_mapping);
717                 if (ll_teardown_mmaps(inode->i_mapping, 
718                                       (__u64)i << PAGE_CACHE_SHIFT,
719                                       ((__u64)(i+j) << PAGE_CACHE_SHIFT) - 1) )
720                         break;
721         }
722
723         /* this is the simplistic implementation of page eviction at
724          * cancelation.  It is careful to get races with other page
725          * lockers handled correctly.  fixes from bug 20 will make it
726          * more efficient by associating locks with pages and with
727          * batching writeback under the lock explicitly. */
728         for (i = start, j = start % count; i <= end;
729              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
730                 if (j == count) {
731                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
732                         i += skip;
733                         j = 0;
734                         if (i > end)
735                                 break;
736                 }
737                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
738                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
739                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
740                          start, i, end);
741
742                 if (!mapping_has_pages(inode->i_mapping)) {
743                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
744                         break;
745                 }
746
747                 cond_resched();
748
749                 page = find_get_page(inode->i_mapping, i);
750                 if (page == NULL)
751                         continue;
752                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
753                                i, tmpex.l_extent.start);
754                 lock_page(page);
755
756                 /* page->mapping to check with racing against teardown */
757                 if (!discard && clear_page_dirty_for_io(page)) {
758                         rc = ll_call_writepage(inode, page);
759                         if (rc != 0)
760                                 CERROR("writepage of page %p failed: %d\n",
761                                        page, rc);
762                         /* either waiting for io to complete or reacquiring
763                          * the lock that the failed writepage released */
764                         lock_page(page);
765                 }
766
767                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
768                 /* check to see if another DLM lock covers this page */
769                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
770                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
771                                       LDLM_FL_TEST_LOCK,
772                                       &lock->l_resource->lr_name, LDLM_EXTENT,
773                                       &tmpex, LCK_PR | LCK_PW, &lockh);
774                 if (rc2 == 0 && page->mapping != NULL) {
775                         // checking again to account for writeback's lock_page()
776                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
777                         ll_ra_accounting(page, inode->i_mapping);
778                         ll_truncate_complete_page(page);
779                 }
780                 unlock_page(page);
781                 page_cache_release(page);
782         }
783         LASSERTF(tmpex.l_extent.start <=
784                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
785                   lock->l_policy_data.l_extent.end + 1),
786                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
787                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
788                  start, i, end);
789         EXIT;
790 }
791
792 static int ll_extent_lock_callback(struct ldlm_lock *lock,
793                                    struct ldlm_lock_desc *new, void *data,
794                                    int flag)
795 {
796         struct lustre_handle lockh = { 0 };
797         int rc;
798         ENTRY;
799
800         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
801                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
802                 LBUG();
803         }
804
805         switch (flag) {
806         case LDLM_CB_BLOCKING:
807                 ldlm_lock2handle(lock, &lockh);
808                 rc = ldlm_cli_cancel(&lockh);
809                 if (rc != ELDLM_OK)
810                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
811                 break;
812         case LDLM_CB_CANCELING: {
813                 struct inode *inode;
814                 struct ll_inode_info *lli;
815                 struct lov_stripe_md *lsm;
816                 __u32 stripe;
817                 __u64 kms;
818
819                 /* This lock wasn't granted, don't try to evict pages */
820                 if (lock->l_req_mode != lock->l_granted_mode)
821                         RETURN(0);
822
823                 inode = ll_inode_from_lock(lock);
824                 if (inode == NULL)
825                         RETURN(0);
826                 lli = ll_i2info(inode);
827                 if (lli == NULL)
828                         goto iput;
829                 if (lli->lli_smd == NULL)
830                         goto iput;
831                 lsm = lli->lli_smd;
832
833                 stripe = ll_lock_to_stripe_offset(inode, lock);
834                 if (stripe < 0)
835                         goto iput;
836                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
837
838                 down(&lli->lli_size_sem);
839                 lock_res_and_lock(lock);
840                 kms = ldlm_extent_shift_kms(lock,
841                                             lsm->lsm_oinfo[stripe].loi_kms);
842                 
843                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
844                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
845                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
846                 lsm->lsm_oinfo[stripe].loi_kms = kms;
847                 unlock_res_and_lock(lock);
848                 up(&lli->lli_size_sem);
849                 //ll_try_done_writing(inode);
850         iput:
851                 iput(inode);
852                 break;
853         }
854         default:
855                 LBUG();
856         }
857
858         RETURN(0);
859 }
860
861 #if 0
862 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
863 {
864         /* XXX ALLOCATE - 160 bytes */
865         struct inode *inode = ll_inode_from_lock(lock);
866         struct ll_inode_info *lli = ll_i2info(inode);
867         struct lustre_handle lockh = { 0 };
868         struct ost_lvb *lvb;
869         __u32 stripe;
870         ENTRY;
871
872         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
873                      LDLM_FL_BLOCK_CONV)) {
874                 LBUG(); /* not expecting any blocked async locks yet */
875                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
876                            "lock, returning");
877                 ldlm_lock_dump(D_OTHER, lock, 0);
878                 ldlm_reprocess_all(lock->l_resource);
879                 RETURN(0);
880         }
881
882         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
883
884         stripe = ll_lock_to_stripe_offset(inode, lock);
885         if (stripe < 0)
886                 goto iput;
887
888         if (lock->l_lvb_len) {
889                 struct lov_stripe_md *lsm = lli->lli_smd;
890                 __u64 kms;
891                 lvb = lock->l_lvb_data;
892                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
893
894                 down(&inode->i_sem);
895                 lock_res_and_lock(lock);
896                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
897                 kms = ldlm_extent_shift_kms(NULL, kms);
898                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
899                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
900                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
901                 lsm->lsm_oinfo[stripe].loi_kms = kms;
902                 unlock_res_and_lock(lock);
903                 up(&inode->i_sem);
904         }
905
906 iput:
907         iput(inode);
908         wake_up(&lock->l_waitq);
909
910         ldlm_lock2handle(lock, &lockh);
911         ldlm_lock_decref(&lockh, LCK_PR);
912         RETURN(0);
913 }
914 #endif
915
916 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
917 {
918         struct ptlrpc_request *req = reqp;
919         struct inode *inode = ll_inode_from_lock(lock);
920         struct ll_inode_info *lli;
921         struct ost_lvb *lvb;
922         struct lov_stripe_md *lsm;
923         int rc, size = sizeof(*lvb), stripe;
924         ENTRY;
925
926         if (inode == NULL)
927                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
928         lli = ll_i2info(inode);
929         if (lli == NULL)
930                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
931
932         lsm = lli->lli_smd;
933         if (lsm == NULL)
934                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
935
936         /* First, find out which stripe index this lock corresponds to. */
937         stripe = ll_lock_to_stripe_offset(inode, lock);
938         if (stripe < 0)
939                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
940
941         rc = lustre_pack_reply(req, 1, &size, NULL);
942         if (rc) {
943                 CERROR("lustre_pack_reply: %d\n", rc);
944                 GOTO(iput, rc);
945         }
946
947         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
948         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
949         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
950         lvb->lvb_atime = LTIME_S(inode->i_atime);
951         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
952
953         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
954                    inode->i_size, stripe, lvb->lvb_size);
955         GOTO(iput, 0);
956  iput:
957         iput(inode);
958
959  out:
960         /* These errors are normal races, so we don't want to fill the console
961          * with messages by calling ptlrpc_error() */
962         if (rc == -ELDLM_NO_LOCK_DATA)
963                 lustre_pack_reply(req, 0, NULL, NULL);
964
965         req->rq_status = rc;
966         return rc;
967 }
968
969 /* NB: lov_merge_size will prefer locally cached writes if they extend the
970  * file (because it prefers KMS over RSS when larger) */
971 int ll_glimpse_size(struct inode *inode)
972 {
973         struct ll_inode_info *lli = ll_i2info(inode);
974         struct ll_sb_info *sbi = ll_i2sbi(inode);
975         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
976         struct lustre_handle lockh = { 0 };
977         int rc, flags = LDLM_FL_HAS_INTENT;
978         ENTRY;
979
980         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
981
982         rc = obd_enqueue(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, &policy,
983                          LCK_PR, &flags, ll_extent_lock_callback,
984                          ldlm_completion_ast, ll_glimpse_callback, inode,
985                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
986         if (rc == -ENOENT)
987                 RETURN(rc);
988
989         if (rc != 0) {
990                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
991                 RETURN(rc > 0 ? -EIO : rc);
992         }
993
994         down(&lli->lli_size_sem);
995         inode->i_size = lov_merge_size(lli->lli_smd, 0);
996         inode->i_blocks = lov_merge_blocks(lli->lli_smd);
997         up(&lli->lli_size_sem);
998
999         LTIME_S(inode->i_mtime) = lov_merge_mtime(lli->lli_smd,
1000                                                   LTIME_S(inode->i_mtime));
1001
1002         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64", blocks: "LPU64"\n",
1003                (__u64)inode->i_size, (__u64)inode->i_blocks);
1004         
1005         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1006         RETURN(rc);
1007 }
1008
1009 void ll_stime_record(struct ll_sb_info *sbi, struct timeval *start,
1010                     struct obd_service_time *stime)
1011 {
1012         struct timeval stop;
1013         do_gettimeofday(&stop);
1014                                                                                                                                                                                                      
1015         spin_lock(&sbi->ll_lock);
1016         lprocfs_stime_record(stime, &stop, start);
1017         spin_unlock(&sbi->ll_lock);
1018 }
1019
1020 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1021                    struct lov_stripe_md *lsm, int mode,
1022                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1023                    int ast_flags, struct obd_service_time *stime)
1024 {
1025         struct ll_inode_info *lli = ll_i2info(inode);
1026         struct ll_sb_info *sbi = ll_i2sbi(inode);
1027         struct timeval start;
1028         int rc;
1029         ENTRY;
1030
1031         LASSERT(lockh->cookie == 0);
1032
1033         /* XXX phil: can we do this?  won't it screw the file size up? */
1034         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1035             (sbi->ll_flags & LL_SBI_NOLCK))
1036                 RETURN(0);
1037
1038         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1039                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1040
1041         do_gettimeofday(&start);
1042         rc = obd_enqueue(sbi->ll_dt_exp, lsm, LDLM_EXTENT, policy, mode,
1043                          &ast_flags, ll_extent_lock_callback,
1044                          ldlm_completion_ast, ll_glimpse_callback, inode,
1045                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
1046         if (rc > 0)
1047                 rc = -EIO;
1048         
1049         ll_stime_record(sbi, &start, stime);
1050
1051         if (policy->l_extent.start == 0 &&
1052             policy->l_extent.end == OBD_OBJECT_EOF) {
1053                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1054                  * the kms under both a DLM lock and the i_sem.  If we don't
1055                  * get the i_sem here we can match the DLM lock and reset
1056                  * i_size from the kms before the truncating path has updated
1057                  * the kms.  generic_file_write can then trust the stale i_size
1058                  * when doing appending writes and effectively cancel the
1059                  * result of the truncate.  Getting the i_sem after the enqueue
1060                  * maintains the DLM -> i_sem acquiry order. */
1061                 down(&lli->lli_size_sem);
1062                 inode->i_size = lov_merge_size(lsm, 1);
1063                 up(&lli->lli_size_sem);
1064         }
1065         
1066         if (rc == 0) {
1067                 LTIME_S(inode->i_mtime) =
1068                         lov_merge_mtime(lsm, LTIME_S(inode->i_mtime));
1069         }
1070
1071         RETURN(rc);
1072 }
1073
1074 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1075                      struct lov_stripe_md *lsm, int mode,
1076                      struct lustre_handle *lockh)
1077 {
1078         struct ll_sb_info *sbi = ll_i2sbi(inode);
1079         int rc;
1080         ENTRY;
1081
1082         /* XXX phil: can we do this?  won't it screw the file size up? */
1083         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1084             (sbi->ll_flags & LL_SBI_NOLCK))
1085                 RETURN(0);
1086
1087         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1088
1089         RETURN(rc);
1090 }
1091
1092 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1093                             loff_t *ppos)
1094 {
1095         struct inode *inode = file->f_dentry->d_inode;
1096         struct ll_inode_info *lli = ll_i2info(inode);
1097         struct lov_stripe_md *lsm = lli->lli_smd;
1098         struct ll_lock_tree tree;
1099         struct ll_lock_tree_node *node;
1100         int rc;
1101         ssize_t retval;
1102         __u64 kms;
1103         ENTRY;
1104         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1105                inode->i_ino, inode->i_generation, inode, count, *ppos);
1106
1107         /* "If nbyte is 0, read() will return 0 and have no other results."
1108          *                      -- Single Unix Spec */
1109         if (count == 0)
1110                 RETURN(0);
1111
1112         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1113                             count);
1114
1115         if (!lsm)
1116                 RETURN(0);
1117
1118         node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
1119                                   LCK_PR);
1120
1121         tree.lt_fd = file->private_data;
1122
1123         rc = ll_tree_lock(&tree, node, inode, buf, count,
1124                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1125         if (rc != 0)
1126                 RETURN(rc);
1127
1128         down(&lli->lli_size_sem);
1129         kms = lov_merge_size(lsm, 1);
1130         if (*ppos + count - 1 > kms) {
1131                 /* A glimpse is necessary to determine whether we return a short
1132                  * read or some zeroes at the end of the buffer */
1133                 up(&lli->lli_size_sem);
1134                 retval = ll_glimpse_size(inode);
1135                 if (retval)
1136                         goto out;
1137         } else {
1138                 inode->i_size = kms;
1139                 up(&lli->lli_size_sem);
1140         }
1141
1142         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1143                inode->i_ino, count, *ppos, inode->i_size);
1144
1145         /* turn off the kernel's read-ahead */
1146 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1147         file->f_ramax = 0;
1148 #else
1149         file->f_ra.ra_pages = 0;
1150 #endif
1151         retval = generic_file_read(file, buf, count, ppos);
1152
1153  out:
1154         ll_tree_unlock(&tree, inode);
1155         RETURN(retval);
1156 }
1157
1158 /*
1159  * Write to a file (through the page cache).
1160  */
1161 static ssize_t ll_file_write(struct file *file, const char *buf,
1162                              size_t count, loff_t *ppos)
1163 {
1164         struct inode *inode = file->f_dentry->d_inode;
1165         loff_t maxbytes = ll_file_maxbytes(inode);
1166         struct ll_lock_tree tree;
1167         struct ll_lock_tree_node *node;
1168         ssize_t retval;
1169         int rc;
1170
1171         ENTRY;
1172         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1173                inode->i_ino, inode->i_generation, inode, count, *ppos);
1174
1175         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1176
1177         /* POSIX, but surprised the VFS doesn't check this already */
1178         if (count == 0)
1179                 RETURN(0);
1180
1181         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1182          * called on the file, don't fail the below assertion (bug 2388). */
1183         if (file->f_flags & O_LOV_DELAY_CREATE &&
1184             ll_i2info(inode)->lli_smd == NULL)
1185                 RETURN(-EBADF);
1186
1187         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1188         
1189         if (file->f_flags & O_APPEND)
1190                 node = ll_node_from_inode(inode, 0, OBD_OBJECT_EOF, LCK_PW);
1191         else
1192                 node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
1193                                           LCK_PW);
1194
1195         if (IS_ERR(node))
1196                 RETURN(PTR_ERR(node));
1197
1198         tree.lt_fd = file->private_data;
1199
1200         rc = ll_tree_lock(&tree, node, inode, buf, count,
1201                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1202         if (rc != 0)
1203                 RETURN(rc);
1204
1205         /* this is ok, g_f_w will overwrite this under i_sem if it races
1206          * with a local truncate, it just makes our maxbyte checking easier */
1207         if (file->f_flags & O_APPEND)
1208                 *ppos = inode->i_size;
1209
1210         if (*ppos >= maxbytes) {
1211                 if (count || *ppos > maxbytes) {
1212                         send_sig(SIGXFSZ, current, 0);
1213                         GOTO(out, retval = -EFBIG);
1214                 }
1215         }
1216         if (*ppos + count > maxbytes)
1217                 count = maxbytes - *ppos;
1218
1219         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1220                inode->i_ino, count, *ppos);
1221
1222         /* mark open handle dirty */
1223         set_bit(LLI_F_DIRTY_HANDLE, &(ll_i2info(inode)->lli_flags));
1224
1225         /* generic_file_write handles O_APPEND after getting i_sem */
1226         retval = generic_file_write(file, buf, count, ppos);
1227         EXIT;
1228 out:
1229         ll_tree_unlock(&tree, inode);
1230         /* serialize with mmap/munmap/mremap */
1231         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1232                             retval > 0 ? retval : 0);
1233         return retval;
1234 }
1235
1236 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1237                                     int flags, struct lov_user_md *lum,
1238                                     int lum_size)
1239 {
1240         struct ll_inode_info *lli = ll_i2info(inode);
1241         struct file *f;
1242         struct obd_export *exp = ll_i2dtexp(inode);
1243         struct lov_stripe_md *lsm;
1244         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1245         struct ptlrpc_request *req = NULL;
1246         int rc = 0;
1247         struct lustre_md md;
1248         struct obd_client_handle *och;
1249         ENTRY;
1250
1251         
1252         if ((file->f_flags+1) & O_ACCMODE)
1253                 oit.it_flags++;
1254         if (file->f_flags & O_TRUNC)
1255                 oit.it_flags |= 2;
1256
1257         down(&lli->lli_open_sem);
1258         lsm = lli->lli_smd;
1259         if (lsm) {
1260                 up(&lli->lli_open_sem);
1261                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1262                        inode->i_ino);
1263                 RETURN(-EEXIST);
1264         }
1265
1266         f = get_empty_filp();
1267         if (!f)
1268                 GOTO(out, -ENOMEM);
1269
1270         f->f_dentry = file->f_dentry;
1271         f->f_vfsmnt = file->f_vfsmnt;
1272         f->f_flags = flags;
1273
1274         rc = ll_intent_alloc(&oit);
1275         if (rc)
1276                 GOTO(out, rc);
1277
1278         rc = ll_intent_file_open(f, lum, lum_size, &oit);
1279         if (rc)
1280                 GOTO(out, rc);
1281         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1282                 GOTO(out, -ENOENT);
1283         
1284         req = LUSTRE_IT(&oit)->it_data;
1285         rc = LUSTRE_IT(&oit)->it_status;
1286
1287         if (rc < 0)
1288                 GOTO(out, rc);
1289
1290         rc = mdc_req2lustre_md(ll_i2mdexp(inode), req, 1, exp, &md);
1291         if (rc)
1292                 GOTO(out, rc);
1293         ll_update_inode(f->f_dentry->d_inode, &md);
1294
1295         OBD_ALLOC(och, sizeof(struct obd_client_handle));
1296         rc = ll_local_open(f, &oit, och);
1297         if (rc) { /* Actually ll_local_open cannot fail! */
1298                 GOTO(out, rc);
1299         }
1300         if (LUSTRE_IT(&oit)->it_lock_mode) {
1301                 ldlm_lock_decref_and_cancel((struct lustre_handle *)
1302                                             &LUSTRE_IT(&oit)->it_lock_handle,
1303                                             LUSTRE_IT(&oit)->it_lock_mode);
1304                 LUSTRE_IT(&oit)->it_lock_mode = 0;
1305         }
1306
1307         ll_intent_release(&oit);
1308
1309         /* ll_file_release will decrease the count, but won't free anything
1310            because we have at least one more reference coming from actual open
1311          */
1312         down(&lli->lli_och_sem);
1313         lli->lli_open_fd_write_count++;
1314         up(&lli->lli_och_sem);
1315         rc = ll_file_release(f->f_dentry->d_inode, f);
1316         
1317         /* Now also destroy our supplemental och */
1318         ll_md_och_close(ll_i2mdexp(inode), f->f_dentry->d_inode, och, 0);
1319         EXIT;
1320  out:
1321         ll_intent_release(&oit);
1322         if (f)
1323                 put_filp(f);
1324         up(&lli->lli_open_sem);
1325         if (req != NULL)
1326                 ptlrpc_req_finished(req);
1327         return rc;
1328 }
1329
1330 static int ll_lov_setea(struct inode *inode, struct file *file,
1331                         unsigned long arg)
1332 {
1333         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1334         struct lov_user_md  *lump;
1335         int lum_size = sizeof(struct lov_user_md) +
1336                 sizeof(struct lov_user_ost_data);
1337         int rc;
1338         ENTRY;
1339
1340         if (!capable (CAP_SYS_ADMIN))
1341                 RETURN(-EPERM);
1342
1343         OBD_ALLOC(lump, lum_size);
1344         if (lump == NULL) {
1345                 RETURN(-ENOMEM);
1346         }
1347         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1348         if (rc) {
1349                 OBD_FREE(lump, lum_size);
1350                 RETURN(-EFAULT);
1351         }
1352
1353         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1354
1355         OBD_FREE(lump, lum_size);
1356         RETURN(rc);
1357 }
1358
1359 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1360                             unsigned long arg)
1361 {
1362         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1363         int rc;
1364         int flags = FMODE_WRITE;
1365         ENTRY;
1366
1367         /* Bug 1152: copy properly when this is no longer true */
1368         LASSERT(sizeof(lum) == sizeof(*lump));
1369         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1370         rc = copy_from_user(&lum, lump, sizeof(lum));
1371         if (rc)
1372                 RETURN(-EFAULT);
1373
1374         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1375         if (rc == 0) {
1376                  put_user(0, &lump->lmm_stripe_count);
1377                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1378                                     0, ll_i2info(inode)->lli_smd, lump);
1379         }
1380         RETURN(rc);
1381 }
1382
1383 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1384 {
1385         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1386
1387         if (!lsm)
1388                 RETURN(-ENODATA);
1389
1390         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1391                              (void *)arg);
1392 }
1393
1394 static int ll_get_grouplock(struct inode *inode, struct file *file,
1395                          unsigned long arg)
1396 {
1397         struct ll_file_data *fd = file->private_data;
1398         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1399                                                     .end = OBD_OBJECT_EOF}};
1400         struct lustre_handle lockh = { 0 };
1401         struct ll_inode_info *lli = ll_i2info(inode);
1402         struct lov_stripe_md *lsm = lli->lli_smd;
1403         int flags = 0, rc;
1404         ENTRY;
1405
1406         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1407                 RETURN(-EINVAL);
1408         }
1409
1410         policy.l_extent.gid = arg;
1411         if (file->f_flags & O_NONBLOCK)
1412                 flags = LDLM_FL_BLOCK_NOWAIT;
1413
1414         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags,
1415                             &ll_i2sbi(inode)->ll_grouplock_stime);
1416         if (rc != 0)
1417                 RETURN(rc);
1418
1419         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1420         fd->fd_gid = arg;
1421         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1422
1423         RETURN(0);
1424 }
1425
1426 static int ll_put_grouplock(struct inode *inode, struct file *file,
1427                          unsigned long arg)
1428 {
1429         struct ll_file_data *fd = file->private_data;
1430         struct ll_inode_info *lli = ll_i2info(inode);
1431         struct lov_stripe_md *lsm = lli->lli_smd;
1432         int rc;
1433         ENTRY;
1434
1435         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1436                 /* Ugh, it's already unlocked. */
1437                 RETURN(-EINVAL);
1438         }
1439
1440         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1441                 RETURN(-EINVAL);
1442
1443         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1444
1445         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1446         if (rc)
1447                 RETURN(rc);
1448
1449         fd->fd_gid = 0;
1450         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1451
1452         RETURN(0);
1453 }
1454
1455 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1456                   unsigned long arg)
1457 {
1458         struct ll_file_data *fd = file->private_data;
1459         struct ll_sb_info *sbi = ll_i2sbi(inode);
1460         int flags;
1461         ENTRY;
1462
1463         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1464                inode->i_generation, inode, cmd);
1465
1466         if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
1467                 RETURN(-ENOTTY);
1468
1469         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1470         switch(cmd) {
1471         case LL_IOC_GETFLAGS:
1472                 /* Get the current value of the file flags */
1473                 return put_user(fd->fd_flags, (int *)arg);
1474         case LL_IOC_SETFLAGS:
1475         case LL_IOC_CLRFLAGS:
1476                 /* Set or clear specific file flags */
1477                 /* XXX This probably needs checks to ensure the flags are
1478                  *     not abused, and to handle any flag side effects.
1479                  */
1480                 if (get_user(flags, (int *) arg))
1481                         RETURN(-EFAULT);
1482
1483                 if (cmd == LL_IOC_SETFLAGS)
1484                         fd->fd_flags |= flags;
1485                 else
1486                         fd->fd_flags &= ~flags;
1487                 RETURN(0);
1488         case LL_IOC_LOV_SETSTRIPE:
1489                 RETURN(ll_lov_setstripe(inode, file, arg));
1490         case LL_IOC_LOV_SETEA:
1491                 RETURN(ll_lov_setea(inode, file, arg));
1492         case IOC_MDC_SHOWFID: {
1493                 struct lustre_id *idp = (struct lustre_id *)arg;
1494                 struct lustre_id id;
1495                 char *filename;
1496                 int rc;
1497
1498                 filename = getname((const char *)arg);
1499                 if (IS_ERR(filename))
1500                         RETURN(PTR_ERR(filename));
1501
1502                 ll_inode2id(&id, inode);
1503
1504                 rc = ll_get_fid(sbi->ll_md_exp, &id, filename, &id);
1505                 if (rc < 0)
1506                         GOTO(out_filename, rc);
1507
1508                 rc = copy_to_user(idp, &id, sizeof(*idp));
1509                 if (rc)
1510                         GOTO(out_filename, rc = -EFAULT);
1511
1512                 EXIT;
1513         out_filename:
1514                 putname(filename);
1515                 return rc;
1516         }
1517         case LL_IOC_LOV_GETSTRIPE:
1518                 RETURN(ll_lov_getstripe(inode, arg));
1519         case EXT3_IOC_GETFLAGS:
1520         case EXT3_IOC_SETFLAGS:
1521                 RETURN( ll_iocontrol(inode, file, cmd, arg) );
1522         case LL_IOC_GROUP_LOCK:
1523                 RETURN(ll_get_grouplock(inode, file, arg));
1524         case LL_IOC_GROUP_UNLOCK:
1525                 RETURN(ll_put_grouplock(inode, file, arg));
1526         case EXT3_IOC_GETVERSION_OLD:
1527         case EXT3_IOC_GETVERSION:
1528                 return put_user(inode->i_generation, (int *) arg);
1529         /* We need to special case any other ioctls we want to handle,
1530          * to send them to the MDS/OST as appropriate and to properly
1531          * network encode the arg field.
1532         case EXT2_IOC_GETVERSION_OLD:
1533         case EXT2_IOC_GETVERSION_NEW:
1534         case EXT2_IOC_SETVERSION_OLD:
1535         case EXT2_IOC_SETVERSION_NEW:
1536         case EXT3_IOC_SETVERSION_OLD:
1537         case EXT3_IOC_SETVERSION:
1538         */
1539         case LL_IOC_FLUSH_CRED:
1540                 RETURN(ll_flush_cred(inode));
1541         default:
1542                 RETURN( obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1543                                       (void *)arg) );
1544         }
1545 }
1546
1547 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1548 {
1549         struct inode *inode = file->f_dentry->d_inode;
1550         struct ll_file_data *fd = file->private_data;
1551         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1552         struct lustre_handle lockh = {0};
1553         loff_t retval;
1554         ENTRY;
1555         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),to=%llu\n", inode->i_ino,
1556                inode->i_generation, inode,
1557                offset + ((origin==2) ? inode->i_size : file->f_pos));
1558
1559         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1560         if (origin == 2) { /* SEEK_END */
1561                 ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
1562                 struct ll_inode_info *lli = ll_i2info(inode);
1563                 int nonblock = 0, rc;
1564
1565                 if (file->f_flags & O_NONBLOCK)
1566                         nonblock = LDLM_FL_BLOCK_NOWAIT;
1567
1568                 rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,
1569                                     nonblock, &ll_i2sbi(inode)->ll_seek_stime);
1570                 if (rc != 0)
1571                         RETURN(rc);
1572
1573                 down(&lli->lli_size_sem);
1574                 offset += inode->i_size;
1575                 up(&lli->lli_size_sem);
1576         } else if (origin == 1) { /* SEEK_CUR */
1577                 offset += file->f_pos;
1578         }
1579
1580         retval = -EINVAL;
1581         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1582                 if (offset != file->f_pos) {
1583                         file->f_pos = offset;
1584 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1585                         file->f_reada = 0;
1586                         file->f_version = ++event;
1587 #endif
1588                 }
1589                 retval = offset;
1590         }
1591
1592         if (origin == 2)
1593                 ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
1594         RETURN(retval);
1595 }
1596
1597 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1598 {
1599         struct inode *inode = dentry->d_inode;
1600         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1601         struct lustre_id id;
1602         struct ptlrpc_request *req;
1603         int rc, err;
1604         ENTRY;
1605         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1606                inode->i_generation, inode);
1607
1608         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1609
1610         /* fsync's caller has already called _fdata{sync,write}, we want
1611          * that IO to finish before calling the osc and mdc sync methods */
1612         rc = filemap_fdatawait(inode->i_mapping);
1613
1614         ll_inode2id(&id, inode);
1615         err = md_sync(ll_i2sbi(inode)->ll_md_exp, &id, &req);
1616         if (!rc)
1617                 rc = err;
1618         if (!err)
1619                 ptlrpc_req_finished(req);
1620
1621         if (data && lsm) {
1622                 struct obdo *oa = obdo_alloc();
1623
1624                 if (!oa)
1625                         RETURN(rc ? rc : -ENOMEM);
1626
1627                 oa->o_id = lsm->lsm_object_id;
1628                 oa->o_gr = lsm->lsm_object_gr;
1629                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1630
1631                 obdo_from_inode(oa, inode, (OBD_MD_FLTYPE | OBD_MD_FLATIME |
1632                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1633                                             OBD_MD_FLGROUP));
1634
1635                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
1636                                0, OBD_OBJECT_EOF);
1637                 if (!rc)
1638                         rc = err;
1639                 obdo_free(oa);
1640         }
1641
1642         RETURN(rc);
1643 }
1644
1645 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1646 {
1647         struct inode *inode = file->f_dentry->d_inode;
1648         struct ll_inode_info *li = ll_i2info(inode);
1649         struct ll_sb_info *sbi = ll_i2sbi(inode);
1650         struct obd_device *obddev;
1651         struct ldlm_res_id res_id =
1652                 { .name = {id_fid(&li->lli_id), id_group(&li->lli_id), LDLM_FLOCK} };
1653         struct lustre_handle lockh = {0};
1654         ldlm_policy_data_t flock;
1655         ldlm_mode_t mode = 0;
1656         int flags = 0;
1657         int rc;
1658         ENTRY;
1659
1660         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1661                inode->i_ino, file_lock);
1662
1663         flock.l_flock.pid = file_lock->fl_pid;
1664         flock.l_flock.start = file_lock->fl_start;
1665         flock.l_flock.end = file_lock->fl_end;
1666
1667         switch (file_lock->fl_type) {
1668         case F_RDLCK:
1669                 mode = LCK_PR;
1670                 break;
1671         case F_UNLCK:
1672                 /* An unlock request may or may not have any relation to
1673                  * existing locks so we may not be able to pass a lock handle
1674                  * via a normal ldlm_lock_cancel() request. The request may even
1675                  * unlock a byte range in the middle of an existing lock. In
1676                  * order to process an unlock request we need all of the same
1677                  * information that is given with a normal read or write record
1678                  * lock request. To avoid creating another ldlm unlock (cancel)
1679                  * message we'll treat a LCK_NL flock request as an unlock. */
1680                 mode = LCK_NL;
1681                 break;
1682         case F_WRLCK:
1683                 mode = LCK_PW;
1684                 break;
1685         default:
1686                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1687                 LBUG();
1688         }
1689
1690         switch (cmd) {
1691         case F_SETLKW:
1692 #ifdef F_SETLKW64
1693         case F_SETLKW64:
1694 #endif
1695                 flags = 0;
1696                 break;
1697         case F_SETLK:
1698 #ifdef F_SETLK64
1699         case F_SETLK64:
1700 #endif
1701                 flags = LDLM_FL_BLOCK_NOWAIT;
1702                 break;
1703         case F_GETLK:
1704 #ifdef F_GETLK64
1705         case F_GETLK64:
1706 #endif
1707                 flags = LDLM_FL_TEST_LOCK;
1708                 /* Save the old mode so that if the mode in the lock changes we
1709                  * can decrement the appropriate reader or writer refcount. */
1710                 file_lock->fl_type = mode;
1711                 break;
1712         default:
1713                 CERROR("unknown fcntl lock command: %d\n", cmd);
1714                 LBUG();
1715         }
1716
1717         CDEBUG(D_DLMTRACE, "inode=%lu, pid="LPU64", flags=%#x, mode=%u, "
1718                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1719                flags, mode, flock.l_flock.start, flock.l_flock.end);
1720
1721         obddev = md_get_real_obd(sbi->ll_md_exp, &li->lli_id);
1722         rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
1723                               obddev->obd_namespace,
1724                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1725                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1726                               NULL, 0, NULL, &lockh);
1727         RETURN(rc);
1728 }
1729
1730 int ll_inode_revalidate_it(struct dentry *dentry)
1731 {
1732         struct lookup_intent oit = { .it_op = IT_GETATTR };
1733         struct inode *inode = dentry->d_inode;
1734         struct ptlrpc_request *req = NULL;
1735         struct ll_inode_info *lli;
1736         struct lov_stripe_md *lsm;
1737         struct ll_sb_info *sbi;
1738         struct lustre_id id;
1739         int rc;
1740         ENTRY;
1741
1742         if (!inode) {
1743                 CERROR("REPORT THIS LINE TO PETER\n");
1744                 RETURN(0);
1745         }
1746         
1747         sbi = ll_i2sbi(inode);
1748         
1749         ll_inode2id(&id, inode);
1750         lli = ll_i2info(inode);
1751         LASSERT(id_fid(&id) != 0);
1752
1753         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), name=%s(%p)\n",
1754                inode->i_ino, inode->i_generation, inode, dentry->d_name.name,
1755                dentry);
1756
1757 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1758         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1759 #endif
1760
1761         rc = ll_intent_alloc(&oit);
1762         if (rc)
1763                 RETURN(-ENOMEM);
1764
1765         rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1766                             &oit, 0, &req, ll_mdc_blocking_ast);
1767         if (rc < 0)
1768                 GOTO(out, rc);
1769
1770         rc = revalidate_it_finish(req, 1, &oit, dentry);
1771         if (rc) {
1772                 GOTO(out, rc);
1773         }
1774
1775         ll_lookup_finish_locks(&oit, dentry);
1776
1777         if (!LLI_HAVE_FLSIZE(inode)) {
1778                 /* if object not yet allocated, don't validate size */
1779                 lsm = lli->lli_smd;
1780                 if (lsm != NULL) {
1781                         /* ll_glimpse_size() will prefer locally cached
1782                          * writes if they extend the file */
1783                         rc = ll_glimpse_size(inode);
1784                 }
1785         }
1786         EXIT;
1787 out:
1788         ll_intent_release(&oit);
1789         if (req)
1790                 ptlrpc_req_finished(req);
1791         return rc;
1792 }
1793
1794 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1795 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
1796 {
1797         int res = 0;
1798         struct inode *inode = de->d_inode;
1799         struct ll_inode_info *lli = ll_i2info(inode);
1800
1801         res = ll_inode_revalidate_it(de);
1802         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
1803
1804         if (res)
1805                 return res;
1806
1807         stat->ino = inode->i_ino;
1808         stat->mode = inode->i_mode;
1809         stat->nlink = inode->i_nlink;
1810         stat->uid = inode->i_uid;
1811         stat->gid = inode->i_gid;
1812         stat->atime = inode->i_atime;
1813         stat->mtime = inode->i_mtime;
1814         stat->ctime = inode->i_ctime;
1815         stat->blksize = inode->i_blksize;
1816
1817         down(&lli->lli_size_sem);
1818         stat->size = inode->i_size;
1819         stat->blocks = inode->i_blocks;
1820         up(&lli->lli_size_sem);
1821         
1822         stat->rdev = kdev_t_to_nr(inode->i_rdev);
1823         stat->dev = id_group(&ll_i2info(inode)->lli_id);
1824         return 0;
1825 }
1826 #endif
1827
1828 static
1829 int ll_setxattr_internal(struct inode *inode, const char *name,
1830                          const void *value, size_t size, int flags, 
1831                          __u64 valid)
1832 {
1833         struct ll_sb_info *sbi = ll_i2sbi(inode);
1834         struct ptlrpc_request *request = NULL;
1835         struct mdc_op_data op_data;
1836         struct iattr attr;
1837         int rc = 0;
1838         ENTRY;
1839
1840         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", inode->i_ino);
1841         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_SETXATTR);
1842
1843         if (sbi->ll_remote && !strcmp(name, XATTR_NAME_ACL_ACCESS))
1844                 RETURN(-EOPNOTSUPP);
1845
1846         memset(&attr, 0x0, sizeof(attr));
1847         attr.ia_valid |= valid;
1848         attr.ia_attr_flags = flags;
1849
1850         ll_prepare_mdc_data(&op_data, inode, NULL, NULL, 0, 0);
1851
1852         rc = md_setattr(sbi->ll_md_exp, &op_data, &attr,
1853                         (void*) name, strnlen(name, XATTR_NAME_MAX) + 1,
1854                         (void*) value, size, &request);
1855         if (rc)
1856                 GOTO(out, rc);
1857
1858  out:
1859         ptlrpc_req_finished(request);
1860         RETURN(rc);
1861 }
1862
1863 int ll_setxattr(struct dentry *dentry, const char *name, const void *value,
1864                 size_t size, int flags)
1865 {
1866         int rc, error;
1867         struct posix_acl *acl;
1868         struct ll_inode_info *lli;
1869         ENTRY;
1870
1871         rc = ll_setxattr_internal(dentry->d_inode, name, value, size, 
1872                                   flags, ATTR_EA);
1873
1874         /* update inode's acl info */
1875         if (rc == 0 && strcmp(name, XATTR_NAME_ACL_ACCESS) == 0) {
1876                 if (value) {
1877                         acl = posix_acl_from_xattr(value, size);
1878                         if (IS_ERR(acl)) {
1879                                 CERROR("convert from xattr to acl error: %ld",
1880                                         PTR_ERR(acl));
1881                                 GOTO(out, rc);
1882                         } else if (acl) {
1883                                 error = posix_acl_valid(acl);
1884                                 if (error) {
1885                                         CERROR("acl valid error: %d", error);
1886                                         posix_acl_release(acl);
1887                                         GOTO(out, rc);
1888                                 }
1889                         }
1890                 } else {
1891                         acl = NULL;
1892                 }
1893                                         
1894                 lli = ll_i2info(dentry->d_inode);
1895                 spin_lock(&lli->lli_lock);
1896                 if (lli->lli_posix_acl != NULL)
1897                         posix_acl_release(lli->lli_posix_acl);
1898                 lli->lli_posix_acl = acl;
1899                 spin_unlock(&lli->lli_lock);
1900         }
1901         EXIT;
1902 out:
1903         return(rc);
1904 }
1905
1906 int ll_removexattr(struct dentry *dentry, const char *name)
1907 {
1908         return ll_setxattr_internal(dentry->d_inode, name, NULL, 0, 0,
1909                                     ATTR_EA_RM);
1910 }
1911
1912 static
1913 int ll_getxattr_internal(struct inode *inode, const char *name,
1914                          void *value, size_t size, __u64 valid)
1915 {
1916         struct ptlrpc_request *request = NULL;
1917         struct ll_sb_info *sbi = ll_i2sbi(inode);
1918         struct lustre_id id;
1919         struct mds_body *body;
1920         void *ea_data; 
1921         int rc, ea_size;
1922         ENTRY;
1923
1924         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETXATTR);
1925
1926         if (sbi->ll_remote && !strcmp(name, XATTR_NAME_ACL_ACCESS))
1927                 RETURN(-EOPNOTSUPP);
1928
1929         ll_inode2id(&id, inode);
1930         rc = md_getattr(sbi->ll_md_exp, &id, valid, name, NULL, 0,
1931                         size, &request);
1932         if (rc) {
1933                 if (rc != -ENODATA && rc != -EOPNOTSUPP)
1934                         CERROR("rc = %d\n", rc);
1935                 GOTO(out, rc);
1936         }
1937
1938         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
1939         LASSERT(body != NULL);
1940         LASSERT_REPSWABBED(request, 0);
1941
1942         ea_size = body->eadatasize;
1943         if (size == 0) 
1944                 GOTO(out, rc = ea_size);
1945
1946         LASSERT(ea_size <= request->rq_repmsg->buflens[1]);
1947         ea_data = lustre_msg_buf(request->rq_repmsg, 1, ea_size);
1948         LASSERT(ea_data != NULL);
1949         LASSERT_REPSWABBED(request, 1);
1950
1951         if (value)
1952                 memcpy(value, ea_data, ea_size);
1953         rc = ea_size;
1954
1955  out:
1956         ptlrpc_req_finished(request);
1957         RETURN(rc);
1958 }
1959
1960 int ll_getxattr(struct dentry *dentry, const char *name, void *value,
1961                 size_t size)
1962 {
1963         return ll_getxattr_internal(dentry->d_inode, name,
1964                                     value, size, OBD_MD_FLXATTR);
1965 }
1966
1967 int ll_listxattr(struct dentry *dentry, char *list, size_t size)
1968 {
1969         return ll_getxattr_internal(dentry->d_inode, NULL, list, size,
1970                                     OBD_MD_FLXATTRLIST);
1971 }
1972
1973 /*
1974  * XXX We could choose not to check DLM lock. Leave the decision
1975  * to remote acl handling.
1976  */
1977 static int
1978 lustre_check_acl(struct inode *inode, int mask)
1979 {
1980         struct lookup_intent it = { .it_op = IT_GETATTR };
1981         struct dentry de = { .d_inode = inode };
1982         struct ll_sb_info *sbi;
1983         struct lustre_id id;
1984         struct ptlrpc_request *req = NULL;
1985         struct ll_inode_info *lli = ll_i2info(inode);
1986         struct posix_acl *acl;
1987         int rc = 0;
1988         ENTRY;
1989
1990         sbi = ll_i2sbi(inode);
1991         ll_inode2id(&id, inode);
1992
1993         if (ll_intent_alloc(&it))
1994                 return -EACCES;
1995
1996         rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1997                             &it, 0, &req, ll_mdc_blocking_ast);
1998         if (rc < 0) {
1999                 ll_intent_free(&it);
2000                 GOTO(out, rc);
2001         }
2002
2003         rc = revalidate_it_finish(req, 1, &it, &de);
2004         if (rc) {
2005                 ll_intent_release(&it);
2006                 GOTO(out, rc);
2007         }
2008
2009         if (sbi->ll_remote) {
2010                 rc = ll_remote_acl_permission(inode, mask);
2011         } else {
2012                 spin_lock(&lli->lli_lock);
2013                 acl = posix_acl_dup(ll_i2info(inode)->lli_posix_acl);
2014                 spin_unlock(&lli->lli_lock);
2015
2016                 if (!acl)
2017                         rc = -EAGAIN;
2018                 else {
2019                         rc = posix_acl_permission(inode, acl, mask);
2020                         posix_acl_release(acl);
2021                 }
2022         }
2023
2024         ll_lookup_finish_locks(&it, &de);
2025         ll_intent_free(&it);
2026
2027 out:
2028         if (req)
2029                 ptlrpc_req_finished(req);
2030
2031         RETURN(rc);
2032 }
2033
2034 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2035 {
2036         return generic_permission(inode, mask, lustre_check_acl);
2037 }
2038
2039 struct file_operations ll_file_operations = {
2040         .read           = ll_file_read,
2041         .write          = ll_file_write,
2042         .ioctl          = ll_file_ioctl,
2043         .open           = ll_file_open,
2044         .release        = ll_file_release,
2045         .mmap           = ll_file_mmap,
2046         .llseek         = ll_file_seek,
2047 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2048         .sendfile       = generic_file_sendfile,
2049 #endif
2050         .fsync          = ll_fsync,
2051         .lock           = ll_file_flock
2052 };
2053
2054 struct inode_operations ll_file_inode_operations = {
2055         .setattr        = ll_setattr,
2056         .truncate       = ll_truncate,
2057 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2058         .getattr        = ll_getattr,
2059 #else
2060         .revalidate_it  = ll_inode_revalidate_it,
2061 #endif
2062         .setxattr       = ll_setxattr,
2063         .getxattr       = ll_getxattr,
2064         .listxattr      = ll_listxattr,
2065         .removexattr    = ll_removexattr,
2066         .permission     = ll_inode_permission,
2067 };
2068