Whamcloud - gitweb
- landed b_hd_cray_merge3
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <linux/lustre_dlm.h>
27 #include <linux/lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #include <linux/lustre_acl.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35 #include <linux/obd_lov.h>
36
37 int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
38                     struct obd_client_handle *och)
39 {
40         struct ptlrpc_request *req = NULL;
41         struct obdo *obdo = NULL;
42         struct obd_device *obd;
43         int rc;
44         ENTRY;
45
46         obd = class_exp2obd(md_exp);
47         if (obd == NULL) {
48                 CERROR("Invalid MDC connection handle "LPX64"\n",
49                        md_exp->exp_handle.h_cookie);
50                 EXIT;
51                 return 0;
52         }
53
54         /*
55          * here we check if this is forced umount. If so this is called on
56          * canceling "open lock" and we do not call md_close() in this case , as
57          * it will not successful, as import is already deactivated.
58          */
59         if (obd->obd_no_recov)
60                 GOTO(out, rc = 0);
61
62         /* closing opened file */
63         obdo = obdo_alloc();
64         if (obdo == NULL)
65                 RETURN(-ENOMEM);
66
67         obdo->o_id = inode->i_ino;
68         obdo->o_valid = OBD_MD_FLID;
69         obdo_from_inode(obdo, inode, (OBD_MD_FLTYPE | OBD_MD_FLMODE |
70                                       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
71                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
72                                       OBD_MD_FLCTIME));
73         if (0 /* ll_is_inode_dirty(inode) */) {
74                 obdo->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
75                 obdo->o_valid |= OBD_MD_FLFLAGS;
76         }
77         obdo->o_fid = id_fid(&ll_i2info(inode)->lli_id);
78         obdo->o_mds = id_group(&ll_i2info(inode)->lli_id);
79         rc = md_close(md_exp, obdo, och, &req);
80         obdo_free(obdo);
81
82         if (rc == EAGAIN) {
83                 /*
84                  * we are the last writer, so the MDS has instructed us to get
85                  * the file size and any write cookies, then close again.
86                  */
87
88                 //ll_queue_done_writing(inode);
89                 rc = 0;
90         } else if (rc) {
91                 CERROR("inode %lu mdc close failed: rc = %d\n",
92                        (unsigned long)inode->i_ino, rc);
93         }
94
95         /* objects are destroed on OST only if metadata close was
96          * successful.*/
97         if (rc == 0) {
98                 rc = ll_objects_destroy(req, inode, 1);
99                 if (rc)
100                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
101                                inode->i_ino, rc);
102         }
103
104         ptlrpc_req_finished(req);
105         EXIT;
106 out:
107         mdc_clear_open_replay_data(md_exp, och);
108         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
109         OBD_FREE(och, sizeof *och);
110         return rc;
111 }
112
113 int ll_md_real_close(struct obd_export *md_exp,
114                      struct inode *inode, int flags)
115 {
116         struct ll_inode_info *lli = ll_i2info(inode);
117         struct obd_client_handle **och_p;
118         struct obd_client_handle *och;
119         __u64 *och_usecount;
120         int rc = 0;
121         ENTRY;
122
123         if (flags & FMODE_WRITE) {
124                 och_p = &lli->lli_mds_write_och;
125                 och_usecount = &lli->lli_open_fd_write_count;
126         } else if (flags & FMODE_EXEC) {
127                 och_p = &lli->lli_mds_exec_och;
128                 och_usecount = &lli->lli_open_fd_exec_count;
129          } else {
130                 och_p = &lli->lli_mds_read_och;
131                 och_usecount = &lli->lli_open_fd_read_count;
132         }
133
134         down(&lli->lli_och_sem);
135         if (*och_usecount) { /* There are still users of this handle, so
136                                 skip freeing it. */
137                 up(&lli->lli_och_sem);
138                 RETURN(0);
139         }
140         och = *och_p;
141
142         *och_p = NULL;
143         up(&lli->lli_och_sem);
144
145         /*
146          * there might be a race and somebody have freed this och
147          * already. Another way to have this twice called is if file closing
148          * will fail due to netwok problems and on umount lock will be canceled
149          * and this will be called from block_ast callack.
150         */
151         if (och && och->och_fh.cookie != DEAD_HANDLE_MAGIC)
152                 rc = ll_md_och_close(md_exp, inode, och);
153         
154         RETURN(rc);
155 }
156
157 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
158                 struct file *file)
159 {
160         struct ll_file_data *fd = file->private_data;
161         struct ll_inode_info *lli = ll_i2info(inode);
162         int rc = 0;
163         ENTRY;
164
165         /* clear group lock, if present */
166         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
167                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
168                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
169                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
170                                       &fd->fd_cwlockh);
171         }
172
173         /* Let's see if we have good enough OPEN lock on the file and if
174            we can skip talking to MDS */
175         if (file->f_dentry->d_inode) {
176                 int lockmode;
177                 struct obd_device *obddev;
178                 struct lustre_handle lockh;
179                 int flags = LDLM_FL_BLOCK_GRANTED;
180                 struct ldlm_res_id file_res_id = {.name = {id_fid(&lli->lli_id), 
181                                                            id_group(&lli->lli_id)}};
182                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
183
184                 down(&lli->lli_och_sem);
185                 if (fd->fd_omode & FMODE_WRITE) {
186                         lockmode = LCK_CW;
187                         LASSERT(lli->lli_open_fd_write_count);
188                         lli->lli_open_fd_write_count--;
189                 } else if (fd->fd_omode & FMODE_EXEC) {
190                         lockmode = LCK_PR;
191                         LASSERT(lli->lli_open_fd_exec_count);
192                         lli->lli_open_fd_exec_count--;
193                 } else {
194                         lockmode = LCK_CR;
195                         LASSERT(lli->lli_open_fd_read_count);
196                         lli->lli_open_fd_read_count--;
197                 }
198                 up(&lli->lli_och_sem);
199                 
200                 obddev = md_get_real_obd(md_exp, &lli->lli_id);
201                 if (!ldlm_lock_match(obddev->obd_namespace, flags, &file_res_id,
202                                      LDLM_IBITS, &policy, lockmode, &lockh))
203                 {
204                         rc = ll_md_real_close(md_exp, file->f_dentry->d_inode,
205                                               fd->fd_omode);
206                 } else {
207                         ldlm_lock_decref(&lockh, lockmode);
208                 }
209         }
210
211         file->private_data = NULL;
212         OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof(*fd));
213         RETURN(rc);
214 }
215
216 /* While this returns an error code, fput() the caller does not, so we need
217  * to make every effort to clean up all of our state here.  Also, applications
218  * rarely check close errors and even if an error is returned they will not
219  * re-try the close call.
220  */
221 int ll_file_release(struct inode *inode, struct file *file)
222 {
223         struct ll_file_data *fd;
224         struct ll_sb_info *sbi = ll_i2sbi(inode);
225         int rc;
226
227         ENTRY;
228         CDEBUG(D_VFSTRACE, "VFS Op:inode="DLID4"(%p)\n",
229                OLID4(&ll_i2info(inode)->lli_id), inode);
230
231         /* don't do anything for / */
232         if (inode->i_sb->s_root == file->f_dentry)
233                 RETURN(0);
234
235         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
236         fd = (struct ll_file_data *)file->private_data;
237         LASSERT(fd != NULL);
238
239         rc = ll_md_close(sbi->ll_md_exp, inode, file);
240         RETURN(rc);
241 }
242
243 static int ll_intent_file_open(struct file *file, void *lmm,
244                                int lmmsize, struct lookup_intent *itp)
245 {
246         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
247         struct dentry *parent = file->f_dentry->d_parent;
248         const char *name = file->f_dentry->d_name.name;
249         const int len = file->f_dentry->d_name.len;
250         struct lustre_handle lockh;
251         struct mdc_op_data *op_data;
252         int rc;
253
254         if (!parent)
255                 RETURN(-ENOENT);
256
257         OBD_ALLOC(op_data, sizeof(*op_data));
258         if (op_data == NULL)
259                 RETURN(-ENOMEM);
260         
261         ll_prepare_mdc_data(op_data, parent->d_inode, NULL,
262                             name, len, O_RDWR);
263
264         rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_PR, op_data,
265                         &lockh, lmm, lmmsize, ldlm_completion_ast,
266                         ll_mdc_blocking_ast, NULL);
267         OBD_FREE(op_data, sizeof(*op_data));
268         if (rc == 0) {
269                 if (LUSTRE_IT(itp)->it_lock_mode)
270                         memcpy(&LUSTRE_IT(itp)->it_lock_handle,
271                                &lockh, sizeof(lockh));
272
273         } else if (rc < 0) {
274                 CERROR("lock enqueue: err: %d\n", rc);
275         }
276         RETURN(rc);
277 }
278
279 void ll_och_fill(struct inode *inode, struct lookup_intent *it,
280                  struct obd_client_handle *och)
281 {
282         struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
283         struct ll_inode_info *lli = ll_i2info(inode);
284         struct mds_body *body;
285         LASSERT(och);
286
287         body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
288         LASSERT (body != NULL);          /* reply already checked out */
289         LASSERT_REPSWABBED (req, 1);     /* and swabbed down */
290
291         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
292         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
293         lli->lli_io_epoch = body->io_epoch;
294         mdc_set_open_replay_data(ll_i2mdexp(inode), och, 
295                                  LUSTRE_IT(it)->it_data);
296 }
297
298 int ll_local_open(struct file *file, struct lookup_intent *it,
299                   struct obd_client_handle *och)
300 {
301         struct ll_file_data *fd;
302         ENTRY;
303
304         if (och)
305                 ll_och_fill(file->f_dentry->d_inode, it, och);
306
307         LASSERTF(file->private_data == NULL, "file %.*s/%.*s ino %lu/%u (%o)\n",
308                  file->f_dentry->d_name.len, file->f_dentry->d_name.name,
309                  file->f_dentry->d_parent->d_name.len,
310                  file->f_dentry->d_parent->d_name.name,
311                  file->f_dentry->d_inode->i_ino,
312                  file->f_dentry->d_inode->i_generation,
313                  file->f_dentry->d_inode->i_mode);
314
315         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
316         
317         /* We can't handle this well without reorganizing ll_file_open and
318          * ll_md_close(), so don't even try right now. */
319         LASSERT(fd != NULL);
320
321         file->private_data = fd;
322         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
323         fd->fd_omode = it->it_flags;
324         RETURN(0);
325 }
326
327 /* Open a file, and (for the very first open) create objects on the OSTs at
328  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
329  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
330  * lli_open_sem to ensure no other process will create objects, send the
331  * stripe MD to the MDS, or try to destroy the objects if that fails.
332  *
333  * If we already have the stripe MD locally then we don't request it in
334  * mdc_open(), by passing a lmm_size = 0.
335  *
336  * It is up to the application to ensure no other processes open this file
337  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
338  * used.  We might be able to avoid races of that sort by getting lli_open_sem
339  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
340  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
341  */
342 int ll_file_open(struct inode *inode, struct file *file)
343 {
344         struct ll_inode_info *lli = ll_i2info(inode);
345         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
346                                           .it_flags = file->f_flags };
347         struct lov_stripe_md *lsm;
348         struct ptlrpc_request *req;
349         int rc = 0;
350         struct obd_client_handle **och_p;
351         __u64 *och_usecount;
352         ENTRY;
353
354         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n",
355                inode->i_ino, inode->i_generation, inode, file->f_flags);
356
357         /* don't do anything for / */
358         if (inode->i_sb->s_root == file->f_dentry)
359                 RETURN(0);
360
361         if ((file->f_flags+1) & O_ACCMODE)
362                 oit.it_flags++;
363         if (file->f_flags & O_TRUNC)
364                 oit.it_flags |= 2;
365
366         it = file->f_it;
367
368         /*
369          * sometimes LUSTRE_IT(it) may not be allocated like opening file by
370          * dentry_open() from GNS stuff.
371          */
372         if (!it || !LUSTRE_IT(it)) {
373                 it = &oit;
374                 rc = ll_intent_alloc(it);
375                 if (rc)
376                         GOTO(out, rc);
377         }
378
379         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
380         
381         /*
382          * mdc_intent_lock() didn't get a request ref if there was an open
383          * error, so don't do cleanup on the * request here (bug 3430)
384          */
385         if (LUSTRE_IT(it)->it_disposition) {
386                 rc = it_open_error(DISP_OPEN_OPEN, it);
387                 if (rc)
388                         RETURN(rc);
389         }
390
391         /* Let's see if we have file open on MDS already. */
392         if (it->it_flags & FMODE_WRITE) {
393                 och_p = &lli->lli_mds_write_och;
394                 och_usecount = &lli->lli_open_fd_write_count;
395         } else if (it->it_flags & FMODE_EXEC) {
396                 och_p = &lli->lli_mds_exec_och;
397                 och_usecount = &lli->lli_open_fd_exec_count;
398         } else {
399                 och_p = &lli->lli_mds_read_och;
400                 och_usecount = &lli->lli_open_fd_read_count;
401         }
402
403         down(&lli->lli_och_sem);
404         if (*och_p) { /* Open handle is present */
405                 if (LUSTRE_IT(it)->it_disposition) {
406                         struct obd_client_handle *och;
407                         /* Well, there's extra open request that we do not need,
408                            let's close it somehow*/
409                         OBD_ALLOC(och, sizeof (struct obd_client_handle));
410                         if (!och) {
411                                 up(&lli->lli_och_sem);
412                                 RETURN(-ENOMEM);
413                         }
414
415                         ll_och_fill(inode, it, och);
416                         /* ll_md_och_close() will free och */
417                         ll_md_och_close(ll_i2mdexp(inode), inode, och);
418                 }
419                 (*och_usecount)++;
420                         
421                 rc = ll_local_open(file, it, NULL);
422                 if (rc)
423                         LBUG();
424         } else {
425                 LASSERT(*och_usecount == 0);
426                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
427                 if (!*och_p)
428                         GOTO(out, rc = -ENOMEM);
429                 (*och_usecount)++;
430
431                 if (!it || !LUSTRE_IT(it) || !LUSTRE_IT(it)->it_disposition) {
432                         /* We are going to replace intent here, and that may
433                            possibly change access mode (FMODE_EXEC can only be
434                            set in intent), but I hope it never happens (I was
435                            not able to trigger it yet at least) -- green */
436                         /* FIXME: FMODE_EXEC is not covered by O_ACCMODE! */
437                         LASSERT(!(it->it_flags & FMODE_EXEC));
438                         LASSERTF((it->it_flags & O_ACCMODE) ==
439                                  (oit.it_flags & O_ACCMODE), "Changing intent "
440                                  "flags %x to incompatible %x\n",it->it_flags,
441                                  oit.it_flags);
442                         it = &oit;
443                         rc = ll_intent_file_open(file, NULL, 0, it);
444                         if (rc)
445                                 GOTO(out, rc);
446                         rc = it_open_error(DISP_OPEN_OPEN, it);
447                         if (rc)
448                                 GOTO(out_och_free, rc);
449
450                         mdc_set_lock_data(NULL, &LUSTRE_IT(it)->it_lock_handle,
451                                           file->f_dentry->d_inode);
452                 }
453                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
454                 rc = ll_local_open(file, it, *och_p);
455                 LASSERTF(rc == 0, "rc = %d\n", rc);
456         }
457         up(&lli->lli_och_sem);
458         /* Must do this outside lli_och_sem lock to prevent deadlock where
459            different kind of OPEN lock for this same inode gets cancelled
460            by ldlm_cancel_lru */
461
462         if (!S_ISREG(inode->i_mode))
463                 GOTO(out, rc);
464
465         lsm = lli->lli_smd;
466         if (lsm == NULL) {
467                 if (file->f_flags & O_LOV_DELAY_CREATE ||
468                     !(file->f_mode & FMODE_WRITE)) {
469                         CDEBUG(D_INODE, "object creation was delayed\n");
470                         GOTO(out, rc);
471                 }
472         }
473         file->f_flags &= ~O_LOV_DELAY_CREATE;
474         GOTO(out, rc);
475  out:
476         req = LUSTRE_IT(it)->it_data;
477         ll_intent_drop_lock(it);
478         ll_intent_release(it);
479         ptlrpc_req_finished(req);
480         if (rc == 0) {
481                 ll_open_complete(inode);
482         } else {
483 out_och_free:
484                 if (*och_p) {
485                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
486                         *och_p = NULL; /* OBD_FREE writes some magic there */
487                         (*och_usecount)--;
488                 }
489                 up(&lli->lli_och_sem);
490         }
491                 
492         return rc;
493 }
494
495 /* Fills the obdo with the attributes for the inode defined by lsm */
496 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
497                    struct obdo *oa)
498 {
499         struct ptlrpc_request_set *set;
500         int rc;
501         ENTRY;
502
503         LASSERT(lsm != NULL);
504
505         memset(oa, 0, sizeof *oa);
506         oa->o_id = lsm->lsm_object_id;
507         oa->o_gr = lsm->lsm_object_gr;
508         oa->o_mode = S_IFREG;
509         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
510                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
511                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
512
513         set = ptlrpc_prep_set();
514         if (set == NULL) {
515                 rc = -ENOMEM;
516         } else {
517                 rc = obd_getattr_async(exp, oa, lsm, set);
518                 if (rc == 0)
519                         rc = ptlrpc_set_wait(set);
520                 ptlrpc_set_destroy(set);
521         }
522         if (rc)
523                 RETURN(rc);
524
525         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
526                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
527         RETURN(0);
528 }
529
530 static inline void ll_remove_suid(struct inode *inode)
531 {
532         unsigned int mode;
533
534         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
535         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
536
537         /* was any of the uid bits set? */
538         mode &= inode->i_mode;
539         if (mode && !capable(CAP_FSETID)) {
540                 inode->i_mode &= ~mode;
541                 // XXX careful here - we cannot change the size
542         }
543 }
544
545 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
546 {
547         struct ll_inode_info *lli = ll_i2info(inode);
548         struct lov_stripe_md *lsm = lli->lli_smd;
549         struct obd_export *exp = ll_i2dtexp(inode);
550         struct {
551                 char name[16];
552                 struct ldlm_lock *lock;
553                 struct lov_stripe_md *lsm;
554         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
555         __u32 stripe, vallen = sizeof(stripe);
556         int rc;
557         ENTRY;
558
559         if (lsm->lsm_stripe_count == 1)
560                 GOTO(check, stripe = 0);
561
562         /* get our offset in the lov */
563         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
564         if (rc != 0) {
565                 CERROR("obd_get_info: rc = %d\n", rc);
566                 RETURN(rc);
567         }
568         LASSERT(stripe < lsm->lsm_stripe_count);
569         EXIT;
570 check:
571         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
572             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[2]){
573                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64
574                            " inode=%lu/%u (%p)\n",
575                            lsm->lsm_oinfo[stripe].loi_id,
576                            lsm->lsm_oinfo[stripe].loi_gr,
577                            inode->i_ino, inode->i_generation, inode);
578                 return -ELDLM_NO_LOCK_DATA;
579         }
580
581         return stripe;
582 }
583
584 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
585  * we get a lock cancellation for each stripe, so we have to map the obd's
586  * region back onto the stripes in the file that it held.
587  *
588  * No one can dirty the extent until we've finished our work and they can
589  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
590  * but other kernel actors could have pages locked.
591  *
592  * Called with the DLM lock held. */
593 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
594                               struct ldlm_lock *lock, __u32 stripe)
595 {
596         ldlm_policy_data_t tmpex;
597         unsigned long start, end, count, skip, i, j;
598         struct page *page;
599         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
600         struct lustre_handle lockh;
601         ENTRY;
602
603         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
604         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
605                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
606                inode->i_size);
607
608         /* our locks are page granular thanks to osc_enqueue, we invalidate the
609          * whole page. */
610         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
611         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
612
613         count = ~0;
614         skip = 0;
615         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
616         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
617         if (lsm->lsm_stripe_count > 1) {
618                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
619                 skip = (lsm->lsm_stripe_count - 1) * count;
620                 start += start/count * skip + stripe * count;
621                 if (end != ~0)
622                         end += end/count * skip + stripe * count;
623         }
624         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
625                 end = ~0;
626
627         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
628         if (i < end)
629                 end = i;
630
631         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
632                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
633                count, skip, end, discard ? " (DISCARDING)" : "");
634         
635         /* walk through the vmas on the inode and tear down mmaped pages that
636          * intersect with the lock.  this stops immediately if there are no
637          * mmap()ed regions of the file.  This is not efficient at all and
638          * should be short lived. We'll associate mmap()ed pages with the lock
639          * and will be able to find them directly */
640         
641         for (i = start; i <= end; i += (j + skip)) {
642                 j = min(count - (i % count), end - i + 1);
643                 LASSERT(j > 0);
644                 LASSERT(inode->i_mapping);
645                 if (ll_teardown_mmaps(inode->i_mapping, i << PAGE_CACHE_SHIFT,
646                                       ((i+j) << PAGE_CACHE_SHIFT) - 1) )
647                         break;
648         }
649
650         /* this is the simplistic implementation of page eviction at
651          * cancelation.  It is careful to get races with other page
652          * lockers handled correctly.  fixes from bug 20 will make it
653          * more efficient by associating locks with pages and with
654          * batching writeback under the lock explicitly. */
655         for (i = start, j = start % count; i <= end;
656              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
657                 if (j == count) {
658                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
659                         i += skip;
660                         j = 0;
661                         if (i > end)
662                                 break;
663                 }
664                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
665                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
666                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
667                          start, i, end);
668
669                 if (!mapping_has_pages(inode->i_mapping)) {
670                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
671                         break;
672                 }
673
674                 cond_resched();
675
676                 page = find_get_page(inode->i_mapping, i);
677                 if (page == NULL)
678                         continue;
679                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
680                                i, tmpex.l_extent.start);
681                 lock_page(page);
682
683                 /* page->mapping to check with racing against teardown */
684                 if (!discard && clear_page_dirty_for_io(page)) {
685                         rc = ll_call_writepage(inode, page);
686                         if (rc != 0)
687                                 CERROR("writepage of page %p failed: %d\n",
688                                        page, rc);
689                         /* either waiting for io to complete or reacquiring
690                          * the lock that the failed writepage released */
691                         lock_page(page);
692                 }
693
694                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
695                 /* check to see if another DLM lock covers this page */
696                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
697                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
698                                       LDLM_FL_TEST_LOCK,
699                                       &lock->l_resource->lr_name, LDLM_EXTENT,
700                                       &tmpex, LCK_PR | LCK_PW, &lockh);
701                 if (rc2 == 0 && page->mapping != NULL) {
702                         // checking again to account for writeback's lock_page()
703                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
704                         ll_ra_accounting(page, inode->i_mapping);
705                         ll_truncate_complete_page(page);
706                 }
707                 unlock_page(page);
708                 page_cache_release(page);
709         }
710         LASSERTF(tmpex.l_extent.start <=
711                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
712                   lock->l_policy_data.l_extent.end + 1),
713                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
714                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
715                  start, i, end);
716         EXIT;
717 }
718
719 static int ll_extent_lock_callback(struct ldlm_lock *lock,
720                                    struct ldlm_lock_desc *new, void *data,
721                                    int flag)
722 {
723         struct lustre_handle lockh = { 0 };
724         int rc;
725         ENTRY;
726
727         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
728                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
729                 LBUG();
730         }
731
732         switch (flag) {
733         case LDLM_CB_BLOCKING:
734                 ldlm_lock2handle(lock, &lockh);
735                 rc = ldlm_cli_cancel(&lockh);
736                 if (rc != ELDLM_OK)
737                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
738                 break;
739         case LDLM_CB_CANCELING: {
740                 struct inode *inode;
741                 struct ll_inode_info *lli;
742                 struct lov_stripe_md *lsm;
743                 __u32 stripe;
744                 __u64 kms;
745
746                 /* This lock wasn't granted, don't try to evict pages */
747                 if (lock->l_req_mode != lock->l_granted_mode)
748                         RETURN(0);
749
750                 inode = ll_inode_from_lock(lock);
751                 if (inode == NULL)
752                         RETURN(0);
753                 lli = ll_i2info(inode);
754                 if (lli == NULL)
755                         goto iput;
756                 if (lli->lli_smd == NULL)
757                         goto iput;
758                 lsm = lli->lli_smd;
759
760                 stripe = ll_lock_to_stripe_offset(inode, lock);
761                 if (stripe < 0)
762                         goto iput;
763                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
764
765                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
766                 down(&lli->lli_size_sem);
767                 kms = ldlm_extent_shift_kms(lock,
768                                             lsm->lsm_oinfo[stripe].loi_kms);
769                 
770                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
771                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
772                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
773                 lsm->lsm_oinfo[stripe].loi_kms = kms;
774                 up(&lli->lli_size_sem);
775                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
776                 //ll_try_done_writing(inode);
777         iput:
778                 iput(inode);
779                 break;
780         }
781         default:
782                 LBUG();
783         }
784
785         RETURN(0);
786 }
787
788 #if 0
789 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
790 {
791         /* XXX ALLOCATE - 160 bytes */
792         struct inode *inode = ll_inode_from_lock(lock);
793         struct ll_inode_info *lli = ll_i2info(inode);
794         struct lustre_handle lockh = { 0 };
795         struct ost_lvb *lvb;
796         __u32 stripe;
797         ENTRY;
798
799         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
800                      LDLM_FL_BLOCK_CONV)) {
801                 LBUG(); /* not expecting any blocked async locks yet */
802                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
803                            "lock, returning");
804                 ldlm_lock_dump(D_OTHER, lock, 0);
805                 ldlm_reprocess_all(lock->l_resource);
806                 RETURN(0);
807         }
808
809         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
810
811         stripe = ll_lock_to_stripe_offset(inode, lock);
812         if (stripe < 0)
813                 goto iput;
814
815         if (lock->l_lvb_len) {
816                 struct lov_stripe_md *lsm = lli->lli_smd;
817                 __u64 kms;
818                 lvb = lock->l_lvb_data;
819                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
820
821                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
822                 down(&inode->i_sem);
823                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
824                 kms = ldlm_extent_shift_kms(NULL, kms);
825                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
826                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
827                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
828                 lsm->lsm_oinfo[stripe].loi_kms = kms;
829                 up(&inode->i_sem);
830                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
831         }
832
833 iput:
834         iput(inode);
835         wake_up(&lock->l_waitq);
836
837         ldlm_lock2handle(lock, &lockh);
838         ldlm_lock_decref(&lockh, LCK_PR);
839         RETURN(0);
840 }
841 #endif
842
843 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
844 {
845         struct ptlrpc_request *req = reqp;
846         struct inode *inode = ll_inode_from_lock(lock);
847         struct ll_inode_info *lli;
848         struct ost_lvb *lvb;
849         struct lov_stripe_md *lsm;
850         int rc, size = sizeof(*lvb), stripe;
851         ENTRY;
852
853         if (inode == NULL)
854                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
855         lli = ll_i2info(inode);
856         if (lli == NULL)
857                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
858
859         lsm = lli->lli_smd;
860         if (lsm == NULL)
861                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
862
863         /* First, find out which stripe index this lock corresponds to. */
864         stripe = ll_lock_to_stripe_offset(inode, lock);
865         if (stripe < 0)
866                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
867
868         rc = lustre_pack_reply(req, 1, &size, NULL);
869         if (rc) {
870                 CERROR("lustre_pack_reply: %d\n", rc);
871                 GOTO(iput, rc);
872         }
873
874         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
875         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
876         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
877         lvb->lvb_atime = LTIME_S(inode->i_atime);
878         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
879
880         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
881                    inode->i_size, stripe, lvb->lvb_size);
882         GOTO(iput, 0);
883  iput:
884         iput(inode);
885
886  out:
887         /* These errors are normal races, so we don't want to fill the console
888          * with messages by calling ptlrpc_error() */
889         if (rc == -ELDLM_NO_LOCK_DATA)
890                 lustre_pack_reply(req, 0, NULL, NULL);
891
892         req->rq_status = rc;
893         return rc;
894 }
895
896 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
897 __u64 lov_merge_blocks(struct lov_stripe_md *lsm);
898 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
899
900 /* NB: lov_merge_size will prefer locally cached writes if they extend the
901  * file (because it prefers KMS over RSS when larger) */
902 int ll_glimpse_size(struct inode *inode)
903 {
904         struct ll_inode_info *lli = ll_i2info(inode);
905         struct ll_sb_info *sbi = ll_i2sbi(inode);
906         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
907         struct lustre_handle lockh = { 0 };
908         int rc, flags = LDLM_FL_HAS_INTENT;
909         ENTRY;
910
911         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
912
913         rc = obd_enqueue(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, &policy,
914                          LCK_PR, &flags, ll_extent_lock_callback,
915                          ldlm_completion_ast, ll_glimpse_callback, inode,
916                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
917         if (rc == -ENOENT)
918                 RETURN(rc);
919
920         if (rc != 0) {
921                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
922                 RETURN(rc > 0 ? -EIO : rc);
923         }
924
925         down(&lli->lli_size_sem);
926         inode->i_size = lov_merge_size(lli->lli_smd, 0);
927         inode->i_blocks = lov_merge_blocks(lli->lli_smd);
928         up(&lli->lli_size_sem);
929
930         LTIME_S(inode->i_mtime) = lov_merge_mtime(lli->lli_smd,
931                                                   LTIME_S(inode->i_mtime));
932
933         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64", blocks: "LPU64"\n",
934                (__u64)inode->i_size, (__u64)inode->i_blocks);
935         
936         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
937         RETURN(rc);
938 }
939
940 void ll_stime_record(struct ll_sb_info *sbi, struct timeval *start,
941                     struct obd_service_time *stime)
942 {
943         struct timeval stop;
944         do_gettimeofday(&stop);
945                                                                                                                                                                                                      
946         spin_lock(&sbi->ll_lock);
947         lprocfs_stime_record(stime, &stop, start);
948         spin_unlock(&sbi->ll_lock);
949 }
950
951 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
952                    struct lov_stripe_md *lsm, int mode,
953                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
954                    int ast_flags, struct obd_service_time *stime)
955 {
956         struct ll_inode_info *lli = ll_i2info(inode);
957         struct ll_sb_info *sbi = ll_i2sbi(inode);
958         struct timeval start;
959         int rc;
960         ENTRY;
961
962         LASSERT(lockh->cookie == 0);
963
964         /* XXX phil: can we do this?  won't it screw the file size up? */
965         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
966             (sbi->ll_flags & LL_SBI_NOLCK))
967                 RETURN(0);
968
969         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
970                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
971
972         do_gettimeofday(&start);
973         rc = obd_enqueue(sbi->ll_dt_exp, lsm, LDLM_EXTENT, policy, mode,
974                          &ast_flags, ll_extent_lock_callback,
975                          ldlm_completion_ast, ll_glimpse_callback, inode,
976                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
977         if (rc > 0)
978                 rc = -EIO;
979         
980         ll_stime_record(sbi, &start, stime);
981
982         if (policy->l_extent.start == 0 &&
983             policy->l_extent.end == OBD_OBJECT_EOF) {
984                 /* vmtruncate()->ll_truncate() first sets the i_size and then
985                  * the kms under both a DLM lock and the i_sem.  If we don't
986                  * get the i_sem here we can match the DLM lock and reset
987                  * i_size from the kms before the truncating path has updated
988                  * the kms.  generic_file_write can then trust the stale i_size
989                  * when doing appending writes and effectively cancel the
990                  * result of the truncate.  Getting the i_sem after the enqueue
991                  * maintains the DLM -> i_sem acquiry order. */
992                 down(&lli->lli_size_sem);
993                 inode->i_size = lov_merge_size(lsm, 1);
994                 up(&lli->lli_size_sem);
995         }
996         
997         if (rc == 0) {
998                 LTIME_S(inode->i_mtime) =
999                         lov_merge_mtime(lsm, LTIME_S(inode->i_mtime));
1000         }
1001
1002         RETURN(rc);
1003 }
1004
1005 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1006                      struct lov_stripe_md *lsm, int mode,
1007                      struct lustre_handle *lockh)
1008 {
1009         struct ll_sb_info *sbi = ll_i2sbi(inode);
1010         int rc;
1011         ENTRY;
1012
1013         /* XXX phil: can we do this?  won't it screw the file size up? */
1014         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1015             (sbi->ll_flags & LL_SBI_NOLCK))
1016                 RETURN(0);
1017
1018         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1019
1020         RETURN(rc);
1021 }
1022
1023 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1024                             loff_t *ppos)
1025 {
1026         struct inode *inode = file->f_dentry->d_inode;
1027         struct ll_inode_info *lli = ll_i2info(inode);
1028         struct lov_stripe_md *lsm = lli->lli_smd;
1029         struct ll_lock_tree tree;
1030         struct ll_lock_tree_node *node;
1031         int rc;
1032         ssize_t retval;
1033         __u64 kms;
1034         ENTRY;
1035         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1036                inode->i_ino, inode->i_generation, inode, count, *ppos);
1037
1038         /* "If nbyte is 0, read() will return 0 and have no other results."
1039          *                      -- Single Unix Spec */
1040         if (count == 0)
1041                 RETURN(0);
1042
1043         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1044                             count);
1045
1046         if (!lsm)
1047                 RETURN(0);
1048
1049         node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
1050                                   LCK_PR);
1051
1052         tree.lt_fd = file->private_data;
1053
1054         rc = ll_tree_lock(&tree, node, inode, buf, count,
1055                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1056         if (rc != 0)
1057                 RETURN(rc);
1058
1059         down(&lli->lli_size_sem);
1060         kms = lov_merge_size(lsm, 1);
1061         if (*ppos + count - 1 > kms) {
1062                 /* A glimpse is necessary to determine whether we return a short
1063                  * read or some zeroes at the end of the buffer */
1064                 up(&lli->lli_size_sem);
1065                 retval = ll_glimpse_size(inode);
1066                 if (retval)
1067                         goto out;
1068         } else {
1069                 inode->i_size = kms;
1070                 up(&lli->lli_size_sem);
1071         }
1072
1073         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1074                inode->i_ino, count, *ppos, inode->i_size);
1075
1076         /* turn off the kernel's read-ahead */
1077 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1078         file->f_ramax = 0;
1079 #else
1080         file->f_ra.ra_pages = 0;
1081 #endif
1082         retval = generic_file_read(file, buf, count, ppos);
1083
1084  out:
1085         ll_tree_unlock(&tree, inode);
1086         RETURN(retval);
1087 }
1088
1089 /*
1090  * Write to a file (through the page cache).
1091  */
1092 static ssize_t ll_file_write(struct file *file, const char *buf,
1093                              size_t count, loff_t *ppos)
1094 {
1095         struct inode *inode = file->f_dentry->d_inode;
1096         loff_t maxbytes = ll_file_maxbytes(inode);
1097         struct ll_lock_tree tree;
1098         struct ll_lock_tree_node *node;
1099         ssize_t retval;
1100         int rc;
1101
1102         ENTRY;
1103         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1104                inode->i_ino, inode->i_generation, inode, count, *ppos);
1105
1106         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1107
1108         /* POSIX, but surprised the VFS doesn't check this already */
1109         if (count == 0)
1110                 RETURN(0);
1111
1112         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1113          * called on the file, don't fail the below assertion (bug 2388). */
1114         if (file->f_flags & O_LOV_DELAY_CREATE &&
1115             ll_i2info(inode)->lli_smd == NULL)
1116                 RETURN(-EBADF);
1117
1118         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1119         
1120         if (file->f_flags & O_APPEND)
1121                 node = ll_node_from_inode(inode, 0, OBD_OBJECT_EOF, LCK_PW);
1122         else
1123                 node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
1124                                           LCK_PW);
1125
1126         if (IS_ERR(node))
1127                 RETURN(PTR_ERR(node));
1128
1129         tree.lt_fd = file->private_data;
1130
1131         rc = ll_tree_lock(&tree, node, inode, buf, count,
1132                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1133         if (rc != 0)
1134                 RETURN(rc);
1135
1136         /* this is ok, g_f_w will overwrite this under i_sem if it races
1137          * with a local truncate, it just makes our maxbyte checking easier */
1138         if (file->f_flags & O_APPEND)
1139                 *ppos = inode->i_size;
1140
1141         if (*ppos >= maxbytes) {
1142                 if (count || *ppos > maxbytes) {
1143                         send_sig(SIGXFSZ, current, 0);
1144                         GOTO(out, retval = -EFBIG);
1145                 }
1146         }
1147         if (*ppos + count > maxbytes)
1148                 count = maxbytes - *ppos;
1149
1150         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1151                inode->i_ino, count, *ppos);
1152
1153         /* generic_file_write handles O_APPEND after getting i_sem */
1154         retval = generic_file_write(file, buf, count, ppos);
1155         EXIT;
1156 out:
1157         ll_tree_unlock(&tree, inode);
1158         /* serialize with mmap/munmap/mremap */
1159         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1160                             retval > 0 ? retval : 0);
1161         return retval;
1162 }
1163
1164 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1165                                unsigned long arg)
1166 {
1167         struct ll_inode_info *lli = ll_i2info(inode);
1168         struct obd_export *exp = ll_i2dtexp(inode);
1169         struct ll_recreate_obj ucreatp;
1170         struct obd_trans_info oti = { 0 };
1171         struct obdo *oa = NULL;
1172         int lsm_size;
1173         int rc = 0;
1174         struct lov_stripe_md *lsm, *lsm2;
1175         ENTRY;
1176
1177         if (!capable (CAP_SYS_ADMIN))
1178                 RETURN(-EPERM);
1179
1180         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1181                             sizeof(struct ll_recreate_obj));
1182         if (rc) {
1183                 RETURN(-EFAULT);
1184         }
1185         oa = obdo_alloc();
1186         if (oa == NULL) 
1187                 RETURN(-ENOMEM);
1188
1189         down(&lli->lli_open_sem);
1190         lsm = lli->lli_smd;
1191         if (lsm == NULL)
1192                 GOTO(out, rc = -ENOENT);
1193         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1194                    (lsm->lsm_stripe_count));
1195
1196         OBD_ALLOC(lsm2, lsm_size);
1197         if (lsm2 == NULL)
1198                 GOTO(out, rc = -ENOMEM);
1199
1200         oa->o_id = ucreatp.lrc_id;
1201         oa->o_nlink = ucreatp.lrc_ost_idx;
1202         oa->o_gr = ucreatp.lrc_group;
1203         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
1204         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1205         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1206                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1207
1208         oti.oti_objid = NULL;
1209         memcpy(lsm2, lsm, lsm_size);
1210         rc = obd_create(exp, oa, &lsm2, &oti);
1211
1212         OBD_FREE(lsm2, lsm_size);
1213         GOTO(out, rc);
1214 out:
1215         up(&lli->lli_open_sem);
1216         obdo_free(oa);
1217         return rc;
1218 }
1219
1220 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1221                                     int flags, struct lov_user_md *lum,
1222                                     int lum_size)
1223 {
1224         struct ll_inode_info *lli = ll_i2info(inode);
1225         struct file *f;
1226         struct obd_export *exp = ll_i2dtexp(inode);
1227         struct lov_stripe_md *lsm;
1228         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1229         struct ptlrpc_request *req = NULL;
1230         int rc = 0;
1231         struct lustre_md md;
1232         struct obd_client_handle *och;
1233         ENTRY;
1234
1235         
1236         if ((file->f_flags+1) & O_ACCMODE)
1237                 oit.it_flags++;
1238         if (file->f_flags & O_TRUNC)
1239                 oit.it_flags |= 2;
1240
1241         down(&lli->lli_open_sem);
1242         lsm = lli->lli_smd;
1243         if (lsm) {
1244                 up(&lli->lli_open_sem);
1245                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1246                        inode->i_ino);
1247                 RETURN(-EEXIST);
1248         }
1249
1250         f = get_empty_filp();
1251         if (!f)
1252                 GOTO(out, -ENOMEM);
1253
1254         f->f_dentry = file->f_dentry;
1255         f->f_vfsmnt = file->f_vfsmnt;
1256         f->f_flags = flags;
1257
1258         rc = ll_intent_alloc(&oit);
1259         if (rc)
1260                 GOTO(out, rc);
1261
1262         rc = ll_intent_file_open(f, lum, lum_size, &oit);
1263         if (rc)
1264                 GOTO(out, rc);
1265         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1266                 GOTO(out, -ENOENT);
1267         
1268         req = LUSTRE_IT(&oit)->it_data;
1269         rc = LUSTRE_IT(&oit)->it_status;
1270
1271         if (rc < 0)
1272                 GOTO(out, rc);
1273
1274         rc = mdc_req2lustre_md(ll_i2mdexp(inode), req, 1, exp, &md);
1275         if (rc)
1276                 GOTO(out, rc);
1277         ll_update_inode(f->f_dentry->d_inode, &md);
1278
1279         OBD_ALLOC(och, sizeof(struct obd_client_handle));
1280         rc = ll_local_open(f, &oit, och);
1281         if (rc) { /* Actually ll_local_open cannot fail! */
1282                 GOTO(out, rc);
1283         }
1284         if (LUSTRE_IT(&oit)->it_lock_mode) {
1285                 ldlm_lock_decref_and_cancel((struct lustre_handle *)
1286                                             &LUSTRE_IT(&oit)->it_lock_handle,
1287                                             LUSTRE_IT(&oit)->it_lock_mode);
1288                 LUSTRE_IT(&oit)->it_lock_mode = 0;
1289         }
1290
1291         ll_intent_release(&oit);
1292
1293         /* ll_file_release will decrease the count, but won't free anything
1294            because we have at least one more reference coming from actual open
1295          */
1296         down(&lli->lli_och_sem);
1297         lli->lli_open_fd_write_count++;
1298         up(&lli->lli_och_sem);
1299         rc = ll_file_release(f->f_dentry->d_inode, f);
1300         
1301         /* Now also destroy our supplemental och */
1302         ll_md_och_close(ll_i2mdexp(inode), f->f_dentry->d_inode, och);
1303         EXIT;
1304  out:
1305         ll_intent_release(&oit);
1306         if (f)
1307                 put_filp(f);
1308         up(&lli->lli_open_sem);
1309         if (req != NULL)
1310                 ptlrpc_req_finished(req);
1311         return rc;
1312 }
1313
1314 static int ll_lov_setea(struct inode *inode, struct file *file,
1315                             unsigned long arg)
1316 {
1317         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1318         struct lov_user_md  *lump;
1319         int lum_size = sizeof(struct lov_user_md) +
1320                        sizeof(struct lov_user_ost_data);
1321         int rc;
1322         ENTRY;
1323
1324         if (!capable (CAP_SYS_ADMIN))
1325                 RETURN(-EPERM);
1326
1327         OBD_ALLOC(lump, lum_size);
1328         if (lump == NULL) {
1329                 RETURN(-ENOMEM);
1330         }
1331         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1332         if (rc) {
1333                 OBD_FREE(lump, lum_size);
1334                 RETURN(-EFAULT);
1335         }
1336
1337         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1338
1339         OBD_FREE(lump, lum_size);
1340         RETURN(rc);
1341 }
1342
1343 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1344                             unsigned long arg)
1345 {
1346         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1347         int rc;
1348         int flags = FMODE_WRITE;
1349         ENTRY;
1350
1351         /* Bug 1152: copy properly when this is no longer true */
1352         LASSERT(sizeof(lum) == sizeof(*lump));
1353         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1354         rc = copy_from_user(&lum, lump, sizeof(lum));
1355         if (rc)
1356                 RETURN(-EFAULT);
1357
1358         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1359         if (rc == 0) {
1360                  put_user(0, &lump->lmm_stripe_count);
1361                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1362                                     0, ll_i2info(inode)->lli_smd, lump);
1363         }
1364         RETURN(rc);
1365 }
1366
1367 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1368 {
1369         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1370
1371         if (!lsm)
1372                 RETURN(-ENODATA);
1373
1374         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1375                              (void *)arg);
1376 }
1377
1378 static int ll_get_grouplock(struct inode *inode, struct file *file,
1379                          unsigned long arg)
1380 {
1381         struct ll_file_data *fd = file->private_data;
1382         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1383                                                     .end = OBD_OBJECT_EOF}};
1384         struct lustre_handle lockh = { 0 };
1385         struct ll_inode_info *lli = ll_i2info(inode);
1386         struct lov_stripe_md *lsm = lli->lli_smd;
1387         int flags = 0, rc;
1388         ENTRY;
1389
1390         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1391                 RETURN(-EINVAL);
1392         }
1393
1394         policy.l_extent.gid = arg;
1395         if (file->f_flags & O_NONBLOCK)
1396                 flags = LDLM_FL_BLOCK_NOWAIT;
1397
1398         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags,
1399                             &ll_i2sbi(inode)->ll_grouplock_stime);
1400         if (rc != 0)
1401                 RETURN(rc);
1402
1403         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1404         fd->fd_gid = arg;
1405         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1406
1407         RETURN(0);
1408 }
1409
1410 static int ll_put_grouplock(struct inode *inode, struct file *file,
1411                          unsigned long arg)
1412 {
1413         struct ll_file_data *fd = file->private_data;
1414         struct ll_inode_info *lli = ll_i2info(inode);
1415         struct lov_stripe_md *lsm = lli->lli_smd;
1416         int rc;
1417         ENTRY;
1418
1419         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1420                 /* Ugh, it's already unlocked. */
1421                 RETURN(-EINVAL);
1422         }
1423
1424         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1425                 RETURN(-EINVAL);
1426
1427         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1428
1429         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1430         if (rc)
1431                 RETURN(rc);
1432
1433         fd->fd_gid = 0;
1434         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1435
1436         RETURN(0);
1437 }
1438
1439 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1440                   unsigned long arg)
1441 {
1442         struct ll_file_data *fd = file->private_data;
1443         struct ll_sb_info *sbi = ll_i2sbi(inode);
1444         int flags;
1445         ENTRY;
1446
1447         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1448                inode->i_generation, inode, cmd);
1449
1450         if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
1451                 RETURN(-ENOTTY);
1452
1453         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1454         switch(cmd) {
1455         case LL_IOC_GETFLAGS:
1456                 /* Get the current value of the file flags */
1457                 return put_user(fd->fd_flags, (int *)arg);
1458         case LL_IOC_SETFLAGS:
1459         case LL_IOC_CLRFLAGS:
1460                 /* Set or clear specific file flags */
1461                 /* XXX This probably needs checks to ensure the flags are
1462                  *     not abused, and to handle any flag side effects.
1463                  */
1464                 if (get_user(flags, (int *) arg))
1465                         RETURN(-EFAULT);
1466
1467                 if (cmd == LL_IOC_SETFLAGS)
1468                         fd->fd_flags |= flags;
1469                 else
1470                         fd->fd_flags &= ~flags;
1471                 RETURN(0);
1472         case LL_IOC_LOV_SETSTRIPE:
1473                 RETURN(ll_lov_setstripe(inode, file, arg));
1474         case LL_IOC_LOV_SETEA:
1475                 RETURN(ll_lov_setea(inode, file, arg));
1476         case IOC_MDC_SHOWFID: {
1477                 struct lustre_id *idp = (struct lustre_id *)arg;
1478                 struct lustre_id id;
1479                 char *filename;
1480                 int rc;
1481
1482                 filename = getname((const char *)arg);
1483                 if (IS_ERR(filename))
1484                         RETURN(PTR_ERR(filename));
1485
1486                 ll_inode2id(&id, inode);
1487
1488                 rc = ll_get_fid(sbi->ll_md_exp, &id, filename, &id);
1489                 if (rc < 0)
1490                         GOTO(out_filename, rc);
1491
1492                 rc = copy_to_user(idp, &id, sizeof(*idp));
1493                 if (rc)
1494                         GOTO(out_filename, rc = -EFAULT);
1495
1496                 EXIT;
1497         out_filename:
1498                 putname(filename);
1499                 return rc;
1500         }
1501         case LL_IOC_LOV_GETSTRIPE:
1502                 RETURN(ll_lov_getstripe(inode, arg));
1503         case LL_IOC_RECREATE_OBJ:
1504                 RETURN(ll_lov_recreate_obj(inode, file, arg));
1505         case EXT3_IOC_GETFLAGS:
1506         case EXT3_IOC_SETFLAGS:
1507                 RETURN( ll_iocontrol(inode, file, cmd, arg) );
1508         case LL_IOC_GROUP_LOCK:
1509                 RETURN(ll_get_grouplock(inode, file, arg));
1510         case LL_IOC_GROUP_UNLOCK:
1511                 RETURN(ll_put_grouplock(inode, file, arg));
1512         case EXT3_IOC_GETVERSION_OLD:
1513         case EXT3_IOC_GETVERSION:
1514                 return put_user(inode->i_generation, (int *) arg);
1515         /* We need to special case any other ioctls we want to handle,
1516          * to send them to the MDS/OST as appropriate and to properly
1517          * network encode the arg field.
1518         case EXT2_IOC_GETVERSION_OLD:
1519         case EXT2_IOC_GETVERSION_NEW:
1520         case EXT2_IOC_SETVERSION_OLD:
1521         case EXT2_IOC_SETVERSION_NEW:
1522         case EXT3_IOC_SETVERSION_OLD:
1523         case EXT3_IOC_SETVERSION:
1524         */
1525         default:
1526                 RETURN( obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1527                                       (void *)arg) );
1528         }
1529 }
1530
1531 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1532 {
1533         struct inode *inode = file->f_dentry->d_inode;
1534         struct ll_file_data *fd = file->private_data;
1535         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1536         struct lustre_handle lockh = {0};
1537         loff_t retval;
1538         ENTRY;
1539         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),to=%llu\n", inode->i_ino,
1540                inode->i_generation, inode,
1541                offset + ((origin==2) ? inode->i_size : file->f_pos));
1542
1543         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1544         if (origin == 2) { /* SEEK_END */
1545                 ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
1546                 struct ll_inode_info *lli = ll_i2info(inode);
1547                 int nonblock = 0, rc;
1548
1549                 if (file->f_flags & O_NONBLOCK)
1550                         nonblock = LDLM_FL_BLOCK_NOWAIT;
1551
1552                 rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,
1553                                     nonblock, &ll_i2sbi(inode)->ll_seek_stime);
1554                 if (rc != 0)
1555                         RETURN(rc);
1556
1557                 down(&lli->lli_size_sem);
1558                 offset += inode->i_size;
1559                 up(&lli->lli_size_sem);
1560         } else if (origin == 1) { /* SEEK_CUR */
1561                 offset += file->f_pos;
1562         }
1563
1564         retval = -EINVAL;
1565         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1566                 if (offset != file->f_pos) {
1567                         file->f_pos = offset;
1568 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1569                         file->f_reada = 0;
1570                         file->f_version = ++event;
1571 #endif
1572                 }
1573                 retval = offset;
1574         }
1575
1576         if (origin == 2)
1577                 ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
1578         RETURN(retval);
1579 }
1580
1581 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1582 {
1583         struct inode *inode = dentry->d_inode;
1584         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1585         struct lustre_id id;
1586         struct ptlrpc_request *req;
1587         int rc, err;
1588         ENTRY;
1589         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1590                inode->i_generation, inode);
1591
1592         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1593
1594         /* fsync's caller has already called _fdata{sync,write}, we want
1595          * that IO to finish before calling the osc and mdc sync methods */
1596         rc = filemap_fdatawait(inode->i_mapping);
1597
1598         ll_inode2id(&id, inode);
1599         err = md_sync(ll_i2sbi(inode)->ll_md_exp, &id, &req);
1600         if (!rc)
1601                 rc = err;
1602         if (!err)
1603                 ptlrpc_req_finished(req);
1604
1605         if (data && lsm) {
1606                 struct obdo *oa = obdo_alloc();
1607
1608                 if (!oa)
1609                         RETURN(rc ? rc : -ENOMEM);
1610
1611                 oa->o_id = lsm->lsm_object_id;
1612                 oa->o_gr = lsm->lsm_object_gr;
1613                 oa->o_valid = OBD_MD_FLID;
1614                 obdo_from_inode(oa, inode, (OBD_MD_FLTYPE | OBD_MD_FLATIME |
1615                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1616                                             OBD_MD_FLGROUP));
1617
1618                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
1619                                0, OBD_OBJECT_EOF);
1620                 if (!rc)
1621                         rc = err;
1622                 obdo_free(oa);
1623         }
1624
1625         RETURN(rc);
1626 }
1627
1628 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1629 {
1630         struct inode *inode = file->f_dentry->d_inode;
1631         struct ll_inode_info *li = ll_i2info(inode);
1632         struct ll_sb_info *sbi = ll_i2sbi(inode);
1633         struct obd_device *obddev;
1634         struct ldlm_res_id res_id =
1635                 { .name = {id_fid(&li->lli_id), id_group(&li->lli_id), LDLM_FLOCK} };
1636         struct lustre_handle lockh = {0};
1637         ldlm_policy_data_t flock;
1638         ldlm_mode_t mode = 0;
1639         int flags = 0;
1640         int rc;
1641         ENTRY;
1642
1643         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1644                inode->i_ino, file_lock);
1645
1646         flock.l_flock.pid = file_lock->fl_pid;
1647         flock.l_flock.start = file_lock->fl_start;
1648         flock.l_flock.end = file_lock->fl_end;
1649
1650         switch (file_lock->fl_type) {
1651         case F_RDLCK:
1652                 mode = LCK_PR;
1653                 break;
1654         case F_UNLCK:
1655                 /* An unlock request may or may not have any relation to
1656                  * existing locks so we may not be able to pass a lock handle
1657                  * via a normal ldlm_lock_cancel() request. The request may even
1658                  * unlock a byte range in the middle of an existing lock. In
1659                  * order to process an unlock request we need all of the same
1660                  * information that is given with a normal read or write record
1661                  * lock request. To avoid creating another ldlm unlock (cancel)
1662                  * message we'll treat a LCK_NL flock request as an unlock. */
1663                 mode = LCK_NL;
1664                 break;
1665         case F_WRLCK:
1666                 mode = LCK_PW;
1667                 break;
1668         default:
1669                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1670                 LBUG();
1671         }
1672
1673         switch (cmd) {
1674         case F_SETLKW:
1675 #ifdef F_SETLKW64
1676         case F_SETLKW64:
1677 #endif
1678                 flags = 0;
1679                 break;
1680         case F_SETLK:
1681 #ifdef F_SETLK64
1682         case F_SETLK64:
1683 #endif
1684                 flags = LDLM_FL_BLOCK_NOWAIT;
1685                 break;
1686         case F_GETLK:
1687 #ifdef F_GETLK64
1688         case F_GETLK64:
1689 #endif
1690                 flags = LDLM_FL_TEST_LOCK;
1691                 /* Save the old mode so that if the mode in the lock changes we
1692                  * can decrement the appropriate reader or writer refcount. */
1693                 file_lock->fl_type = mode;
1694                 break;
1695         default:
1696                 CERROR("unknown fcntl lock command: %d\n", cmd);
1697                 LBUG();
1698         }
1699
1700         CDEBUG(D_DLMTRACE, "inode=%lu, pid="LPU64", flags=%#x, mode=%u, "
1701                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1702                flags, mode, flock.l_flock.start, flock.l_flock.end);
1703
1704         obddev = md_get_real_obd(sbi->ll_md_exp, &li->lli_id);
1705         rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
1706                               obddev->obd_namespace,
1707                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1708                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1709                               NULL, 0, NULL, &lockh);
1710         RETURN(rc);
1711 }
1712
1713 int ll_inode_revalidate_it(struct dentry *dentry)
1714 {
1715         struct lookup_intent oit = { .it_op = IT_GETATTR };
1716         struct inode *inode = dentry->d_inode;
1717         struct ptlrpc_request *req = NULL;
1718         struct ll_inode_info *lli;
1719         struct lov_stripe_md *lsm;
1720         struct ll_sb_info *sbi;
1721         struct lustre_id id;
1722         int rc;
1723         ENTRY;
1724
1725         if (!inode) {
1726                 CERROR("REPORT THIS LINE TO PETER\n");
1727                 RETURN(0);
1728         }
1729         
1730         sbi = ll_i2sbi(inode);
1731         
1732         ll_inode2id(&id, inode);
1733         lli = ll_i2info(inode);
1734         LASSERT(id_fid(&id) != 0);
1735
1736         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), name=%s(%p)\n",
1737                inode->i_ino, inode->i_generation, inode, dentry->d_name.name,
1738                dentry);
1739
1740 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1741         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1742 #endif
1743
1744         rc = ll_intent_alloc(&oit);
1745         if (rc)
1746                 RETURN(-ENOMEM);
1747
1748         rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1749                             &oit, 0, &req, ll_mdc_blocking_ast);
1750         if (rc < 0)
1751                 GOTO(out, rc);
1752
1753         rc = revalidate_it_finish(req, 1, &oit, dentry);
1754         if (rc) {
1755                 GOTO(out, rc);
1756         }
1757
1758         ll_lookup_finish_locks(&oit, dentry);
1759
1760         lsm = lli->lli_smd;
1761         if (lsm == NULL) /* object not yet allocated, don't validate size */
1762                 GOTO(out, rc = 0);
1763
1764         /*
1765          * ll_glimpse_size() will prefer locally cached writes if they extend
1766          * the file.
1767          */
1768         rc = ll_glimpse_size(inode);
1769         EXIT;
1770 out:
1771         ll_intent_release(&oit);
1772         if (req)
1773                 ptlrpc_req_finished(req);
1774         return rc;
1775 }
1776
1777 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1778 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
1779 {
1780         int res = 0;
1781         struct inode *inode = de->d_inode;
1782         struct ll_inode_info *lli = ll_i2info(inode);
1783
1784         res = ll_inode_revalidate_it(de);
1785         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
1786
1787         if (res)
1788                 return res;
1789
1790         stat->ino = inode->i_ino;
1791         stat->mode = inode->i_mode;
1792         stat->nlink = inode->i_nlink;
1793         stat->uid = inode->i_uid;
1794         stat->gid = inode->i_gid;
1795         stat->atime = inode->i_atime;
1796         stat->mtime = inode->i_mtime;
1797         stat->ctime = inode->i_ctime;
1798         stat->blksize = inode->i_blksize;
1799
1800         down(&lli->lli_size_sem);
1801         stat->size = inode->i_size;
1802         stat->blocks = inode->i_blocks;
1803         up(&lli->lli_size_sem);
1804         
1805         stat->rdev = kdev_t_to_nr(inode->i_rdev);
1806         stat->dev = id_group(&ll_i2info(inode)->lli_id);
1807         return 0;
1808 }
1809 #endif
1810
1811 static
1812 int ll_setxattr_internal(struct inode *inode, const char *name,
1813                          const void *value, size_t size, int flags, 
1814                          __u64 valid)
1815 {
1816         struct ll_sb_info *sbi = ll_i2sbi(inode);
1817         struct ptlrpc_request *request = NULL;
1818         struct mdc_op_data op_data;
1819         struct iattr attr;
1820         int rc = 0;
1821         ENTRY;
1822
1823         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", inode->i_ino);
1824         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_SETXATTR);
1825
1826         memset(&attr, 0x0, sizeof(attr));
1827         attr.ia_valid |= valid;
1828         attr.ia_attr_flags = flags;
1829
1830         ll_prepare_mdc_data(&op_data, inode, NULL, NULL, 0, 0);
1831
1832         rc = md_setattr(sbi->ll_md_exp, &op_data, &attr,
1833                         (void*) name, strnlen(name, XATTR_NAME_MAX)+1, 
1834                         (void*) value, size, &request);
1835         if (rc) {
1836                 CERROR("md_setattr fails: rc = %d\n", rc);
1837                 GOTO(out, rc);
1838         }
1839
1840  out:
1841         ptlrpc_req_finished(request);
1842         RETURN(rc);
1843 }
1844
1845 int ll_setxattr(struct dentry *dentry, const char *name, const void *value,
1846                 size_t size, int flags)
1847 {
1848         int rc, error;
1849         struct posix_acl *acl;
1850         struct ll_inode_info *lli;
1851         ENTRY;
1852
1853         rc = ll_setxattr_internal(dentry->d_inode, name, value, size, 
1854                                   flags, ATTR_EA);
1855         
1856         /* update inode's acl info */
1857         if (rc == 0 && strcmp(name, XATTR_NAME_ACL_ACCESS) == 0) {
1858                 if (value) {
1859                         acl = posix_acl_from_xattr(value, size);
1860                         if (IS_ERR(acl)) {
1861                                 CERROR("convert from xattr to acl error: %ld",
1862                                         PTR_ERR(acl));
1863                                 GOTO(out, rc);
1864                         } else if (acl) {
1865                                 error = posix_acl_valid(acl);
1866                                 if (error) {
1867                                         CERROR("acl valid error: %d", error);
1868                                         posix_acl_release(acl);
1869                                         GOTO(out, rc);
1870                                 }
1871                         }
1872                 } else {
1873                         acl = NULL;
1874                 }
1875                                         
1876                 lli = ll_i2info(dentry->d_inode);
1877                 spin_lock(&lli->lli_lock);
1878                 if (lli->lli_acl_access != NULL)
1879                         posix_acl_release(lli->lli_acl_access);
1880                 lli->lli_acl_access = acl;
1881                 spin_unlock(&lli->lli_lock);
1882         }
1883         EXIT;
1884 out:
1885         return(rc);
1886 }
1887
1888 int ll_removexattr(struct dentry *dentry, const char *name)
1889 {
1890         return ll_setxattr_internal(dentry->d_inode, name, NULL, 0, 0,
1891                                     ATTR_EA_RM);
1892 }
1893
1894 static
1895 int ll_getxattr_internal(struct inode *inode, const char *name, int namelen,
1896                          void *value, size_t size, __u64 valid)
1897 {
1898         struct ptlrpc_request *request = NULL;
1899         struct ll_sb_info *sbi = ll_i2sbi(inode);
1900         struct lustre_id id;
1901         struct mds_body *body;
1902         void *ea_data; 
1903         int rc, ea_size;
1904         ENTRY;
1905
1906         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETXATTR);
1907
1908         ll_inode2id(&id, inode);
1909         rc = md_getattr(sbi->ll_md_exp, &id, valid, name, namelen,
1910                          size, &request);
1911         if (rc) {
1912                 if (rc != -ENODATA && rc != -EOPNOTSUPP)
1913                         CERROR("md_getattr fails: rc = %d\n", rc);
1914                 GOTO(out, rc);
1915         }
1916
1917         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
1918         LASSERT(body != NULL);
1919         LASSERT_REPSWABBED(request, 0);
1920
1921         ea_size = body->eadatasize;
1922         LASSERT(ea_size <= request->rq_repmsg->buflens[0]);
1923
1924         if (size == 0) 
1925                 GOTO(out, rc = ea_size);
1926
1927         ea_data = lustre_msg_buf(request->rq_repmsg, 1, ea_size);
1928         LASSERT(ea_data != NULL);
1929         LASSERT_REPSWABBED(request, 1);
1930
1931         if (value)
1932                 memcpy(value, ea_data, ea_size);
1933         rc = ea_size;
1934  out:
1935         ptlrpc_req_finished(request);
1936         RETURN(rc);
1937 }
1938
1939 int ll_getxattr(struct dentry *dentry, const char *name, void *value,
1940                 size_t size)
1941 {
1942         return ll_getxattr_internal(dentry->d_inode, name, strlen(name) + 1, 
1943                                     value, size, OBD_MD_FLEA);
1944 }
1945
1946 int ll_listxattr(struct dentry *dentry, char *list, size_t size)
1947 {
1948         return ll_getxattr_internal(dentry->d_inode, NULL, 0, list, size,
1949                                     OBD_MD_FLEALIST);
1950 }
1951
1952 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
1953 {
1954         struct lookup_intent it = { .it_op = IT_GETATTR };
1955         int mode = inode->i_mode;
1956         struct dentry de;
1957         struct ll_sb_info *sbi;
1958         struct lustre_id id;
1959         struct ptlrpc_request *req = NULL;
1960         int rc;
1961         ENTRY;
1962
1963         sbi = ll_i2sbi(inode);
1964         ll_inode2id(&id, inode);
1965
1966         /* Nobody gets write access to a read-only fs */
1967         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
1968             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
1969                 return -EROFS;
1970         /* Nobody gets write access to an immutable file */
1971         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
1972                 return -EACCES;
1973         if (current->fsuid == inode->i_uid) {
1974                 mode >>= 6;
1975         } else if (1) {
1976                 struct ll_inode_info *lli = ll_i2info(inode);
1977                 struct posix_acl *acl;
1978
1979                 /* The access ACL cannot grant access if the group class
1980                    permission bits don't contain all requested permissions. */
1981                 if (((mode >> 3) & mask & S_IRWXO) != mask)
1982                         goto check_groups;
1983
1984                 if (ll_intent_alloc(&it))
1985                         return -EACCES;
1986
1987                 de.d_inode = inode;
1988                 rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1989                                     &it, 0, &req, ll_mdc_blocking_ast);
1990                 if (rc < 0) {
1991                         ll_intent_free(&it);
1992                         GOTO(out, rc);
1993                 }
1994
1995                 rc = revalidate_it_finish(req, 1, &it, &de);
1996                 if (rc) {
1997                         ll_intent_release(&it);
1998                         GOTO(out, rc);
1999                 }
2000
2001                 ll_lookup_finish_locks(&it, &de);
2002                 ll_intent_free(&it);
2003
2004                 spin_lock(&lli->lli_lock);
2005                 acl = posix_acl_dup(ll_i2info(inode)->lli_acl_access);
2006                 spin_unlock(&lli->lli_lock);
2007
2008                 if (!acl)
2009                         goto check_groups;
2010
2011                 rc = posix_acl_permission(inode, acl, mask);
2012                 posix_acl_release(acl);
2013                 if (rc == -EACCES)
2014                         goto check_capabilities;
2015                 GOTO(out, rc);
2016         } else {
2017 check_groups:
2018                 if (in_group_p(inode->i_gid))
2019                         mode >>= 3;
2020         }
2021         if ((mode & mask & S_IRWXO) == mask)
2022                 GOTO(out, rc = 0);
2023
2024 check_capabilities:
2025         rc = -EACCES; 
2026         /* Allowed to override Discretionary Access Control? */
2027         if (!(mask & MAY_EXEC) ||
2028             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2029                 if (capable(CAP_DAC_OVERRIDE))
2030                         GOTO(out, rc = 0);
2031        /* Read and search granted if capable(CAP_DAC_READ_SEARCH) */
2032         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2033             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2034                 GOTO(out, rc = 0);
2035 out:
2036         if (req)
2037                 ptlrpc_req_finished(req);
2038
2039         return rc;
2040 }
2041
2042 struct file_operations ll_file_operations = {
2043         .read           = ll_file_read,
2044         .write          = ll_file_write,
2045         .ioctl          = ll_file_ioctl,
2046         .open           = ll_file_open,
2047         .release        = ll_file_release,
2048         .mmap           = ll_file_mmap,
2049         .llseek         = ll_file_seek,
2050 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2051         .sendfile       = generic_file_sendfile,
2052 #endif
2053         .fsync          = ll_fsync,
2054         .lock           = ll_file_flock
2055 };
2056
2057 struct inode_operations ll_file_inode_operations = {
2058         .setattr        = ll_setattr,
2059         .truncate       = ll_truncate,
2060 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2061         .getattr        = ll_getattr,
2062 #else
2063         .revalidate_it  = ll_inode_revalidate_it,
2064 #endif
2065         .setxattr       = ll_setxattr,
2066         .getxattr       = ll_getxattr,
2067         .listxattr      = ll_listxattr,
2068         .removexattr    = ll_removexattr,
2069         .permission     = ll_inode_permission,
2070 };
2071