Whamcloud - gitweb
land b_hd_sec onto HEAD:
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <linux/lustre_dlm.h>
27 #include <linux/lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #include <linux/lustre_acl.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35 #include <linux/obd_lov.h>
36
37 int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
38                     struct obd_client_handle *och)
39 {
40         struct ptlrpc_request *req = NULL;
41         struct obdo *obdo = NULL;
42         struct obd_device *obd;
43         int rc;
44         ENTRY;
45
46         obd = class_exp2obd(md_exp);
47         if (obd == NULL) {
48                 CERROR("Invalid MDC connection handle "LPX64"\n",
49                        md_exp->exp_handle.h_cookie);
50                 EXIT;
51                 return 0;
52         }
53
54         /*
55          * here we check if this is forced umount. If so this is called on
56          * canceling "open lock" and we do not call md_close() in this case , as
57          * it will not successful, as import is already deactivated.
58          */
59         if (obd->obd_no_recov)
60                 GOTO(out, rc = 0);
61
62         /* closing opened file */
63         obdo = obdo_alloc();
64         if (obdo == NULL)
65                 RETURN(-ENOMEM);
66
67         obdo->o_id = inode->i_ino;
68         obdo->o_valid = OBD_MD_FLID;
69         obdo_from_inode(obdo, inode, (OBD_MD_FLTYPE | OBD_MD_FLMODE |
70                                       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
71                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
72                                       OBD_MD_FLCTIME));
73         if (0 /* ll_is_inode_dirty(inode) */) {
74                 obdo->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
75                 obdo->o_valid |= OBD_MD_FLFLAGS;
76         }
77         obdo->o_fid = id_fid(&ll_i2info(inode)->lli_id);
78         obdo->o_mds = id_group(&ll_i2info(inode)->lli_id);
79         rc = md_close(md_exp, obdo, och, &req);
80         obdo_free(obdo);
81
82         if (rc == EAGAIN) {
83                 /*
84                  * we are the last writer, so the MDS has instructed us to get
85                  * the file size and any write cookies, then close again.
86                  */
87
88                 //ll_queue_done_writing(inode);
89                 rc = 0;
90         } else if (rc) {
91                 CERROR("inode %lu mdc close failed: rc = %d\n",
92                        (unsigned long)inode->i_ino, rc);
93         }
94
95         /* objects are destroed on OST only if metadata close was
96          * successful.*/
97         if (rc == 0) {
98                 rc = ll_objects_destroy(req, inode, 1);
99                 if (rc)
100                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
101                                inode->i_ino, rc);
102         }
103
104         ptlrpc_req_finished(req);
105         EXIT;
106 out:
107         mdc_clear_open_replay_data(md_exp, och);
108         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
109         OBD_FREE(och, sizeof *och);
110         return rc;
111 }
112
113 int ll_md_real_close(struct obd_export *md_exp,
114                      struct inode *inode, int flags)
115 {
116         struct ll_inode_info *lli = ll_i2info(inode);
117         struct obd_client_handle **och_p;
118         struct obd_client_handle *och;
119         __u64 *och_usecount;
120         int rc = 0;
121         ENTRY;
122
123         if (flags & FMODE_WRITE) {
124                 och_p = &lli->lli_mds_write_och;
125                 och_usecount = &lli->lli_open_fd_write_count;
126         } else if (flags & FMODE_EXEC) {
127                 och_p = &lli->lli_mds_exec_och;
128                 och_usecount = &lli->lli_open_fd_exec_count;
129          } else {
130                 och_p = &lli->lli_mds_read_och;
131                 och_usecount = &lli->lli_open_fd_read_count;
132         }
133
134         down(&lli->lli_och_sem);
135         if (*och_usecount) { /* There are still users of this handle, so
136                                 skip freeing it. */
137                 up(&lli->lli_och_sem);
138                 RETURN(0);
139         }
140         och = *och_p;
141
142         *och_p = NULL;
143         up(&lli->lli_och_sem);
144
145         /*
146          * there might be a race and somebody have freed this och
147          * already. Another way to have this twice called is if file closing
148          * will fail due to netwok problems and on umount lock will be canceled
149          * and this will be called from block_ast callack.
150         */
151         if (och && och->och_fh.cookie != DEAD_HANDLE_MAGIC)
152                 rc = ll_md_och_close(md_exp, inode, och);
153         
154         RETURN(rc);
155 }
156
157 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
158                 struct file *file)
159 {
160         struct ll_file_data *fd = file->private_data;
161         struct ll_inode_info *lli = ll_i2info(inode);
162         int rc = 0;
163         ENTRY;
164
165         /* clear group lock, if present */
166         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
167                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
168                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
169                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
170                                       &fd->fd_cwlockh);
171         }
172
173         /* Let's see if we have good enough OPEN lock on the file and if
174            we can skip talking to MDS */
175         if (file->f_dentry->d_inode) {
176                 int lockmode;
177                 struct obd_device *obddev;
178                 struct lustre_handle lockh;
179                 int flags = LDLM_FL_BLOCK_GRANTED;
180                 struct ldlm_res_id file_res_id = {.name = {id_fid(&lli->lli_id), 
181                                                            id_group(&lli->lli_id)}};
182                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
183
184                 down(&lli->lli_och_sem);
185                 if (fd->fd_omode & FMODE_WRITE) {
186                         lockmode = LCK_CW;
187                         LASSERT(lli->lli_open_fd_write_count);
188                         lli->lli_open_fd_write_count--;
189                 } else if (fd->fd_omode & FMODE_EXEC) {
190                         lockmode = LCK_PR;
191                         LASSERT(lli->lli_open_fd_exec_count);
192                         lli->lli_open_fd_exec_count--;
193                 } else {
194                         lockmode = LCK_CR;
195                         LASSERT(lli->lli_open_fd_read_count);
196                         lli->lli_open_fd_read_count--;
197                 }
198                 up(&lli->lli_och_sem);
199                 
200                 obddev = md_get_real_obd(md_exp, &lli->lli_id);
201                 if (!ldlm_lock_match(obddev->obd_namespace, flags, &file_res_id,
202                                      LDLM_IBITS, &policy, lockmode, &lockh))
203                 {
204                         rc = ll_md_real_close(md_exp, file->f_dentry->d_inode,
205                                               fd->fd_omode);
206                 } else {
207                         ldlm_lock_decref(&lockh, lockmode);
208                 }
209         }
210
211         file->private_data = NULL;
212         OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof(*fd));
213         RETURN(rc);
214 }
215
216 /* While this returns an error code, fput() the caller does not, so we need
217  * to make every effort to clean up all of our state here.  Also, applications
218  * rarely check close errors and even if an error is returned they will not
219  * re-try the close call.
220  */
221 int ll_file_release(struct inode *inode, struct file *file)
222 {
223         struct ll_file_data *fd;
224         struct ll_sb_info *sbi = ll_i2sbi(inode);
225         int rc;
226
227         ENTRY;
228         CDEBUG(D_VFSTRACE, "VFS Op:inode="DLID4"(%p)\n",
229                OLID4(&ll_i2info(inode)->lli_id), inode);
230
231         /* don't do anything for / */
232         if (inode->i_sb->s_root == file->f_dentry)
233                 RETURN(0);
234
235         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
236         fd = (struct ll_file_data *)file->private_data;
237         LASSERT(fd != NULL);
238
239         rc = ll_md_close(sbi->ll_md_exp, inode, file);
240         RETURN(rc);
241 }
242
243 static int ll_intent_file_open(struct file *file, void *lmm,
244                                int lmmsize, struct lookup_intent *itp)
245 {
246         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
247         struct dentry *parent = file->f_dentry->d_parent;
248         const char *name = file->f_dentry->d_name.name;
249         const int len = file->f_dentry->d_name.len;
250         struct lustre_handle lockh;
251         struct mdc_op_data *op_data;
252         int rc;
253
254         if (!parent)
255                 RETURN(-ENOENT);
256
257         OBD_ALLOC(op_data, sizeof(*op_data));
258         if (op_data == NULL)
259                 RETURN(-ENOMEM);
260         
261         ll_prepare_mdc_data(op_data, parent->d_inode, NULL,
262                             name, len, O_RDWR);
263
264         rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_PR, op_data,
265                         &lockh, lmm, lmmsize, ldlm_completion_ast,
266                         ll_mdc_blocking_ast, NULL);
267         OBD_FREE(op_data, sizeof(*op_data));
268         if (rc == 0) {
269                 if (LUSTRE_IT(itp)->it_lock_mode)
270                         memcpy(&LUSTRE_IT(itp)->it_lock_handle,
271                                &lockh, sizeof(lockh));
272
273         } else if (rc < 0) {
274                 CERROR("lock enqueue: err: %d\n", rc);
275         }
276         RETURN(rc);
277 }
278
279 void ll_och_fill(struct inode *inode, struct lookup_intent *it,
280                  struct obd_client_handle *och)
281 {
282         struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
283         struct ll_inode_info *lli = ll_i2info(inode);
284         struct mds_body *body;
285         LASSERT(och);
286
287         body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
288         LASSERT (body != NULL);          /* reply already checked out */
289         LASSERT_REPSWABBED (req, 1);     /* and swabbed down */
290
291         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
292         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
293         lli->lli_io_epoch = body->io_epoch;
294         mdc_set_open_replay_data(ll_i2mdexp(inode), och, 
295                                  LUSTRE_IT(it)->it_data);
296 }
297
298 int ll_local_open(struct file *file, struct lookup_intent *it,
299                   struct obd_client_handle *och)
300 {
301         struct ll_file_data *fd;
302         ENTRY;
303
304         if (och)
305                 ll_och_fill(file->f_dentry->d_inode, it, och);
306
307         LASSERTF(file->private_data == NULL, "file %.*s/%.*s ino %lu/%u (%o)\n",
308                  file->f_dentry->d_name.len, file->f_dentry->d_name.name,
309                  file->f_dentry->d_parent->d_name.len,
310                  file->f_dentry->d_parent->d_name.name,
311                  file->f_dentry->d_inode->i_ino,
312                  file->f_dentry->d_inode->i_generation,
313                  file->f_dentry->d_inode->i_mode);
314
315         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
316         
317         /* We can't handle this well without reorganizing ll_file_open and
318          * ll_md_close(), so don't even try right now. */
319         LASSERT(fd != NULL);
320
321         file->private_data = fd;
322         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
323         fd->fd_omode = it->it_flags;
324         RETURN(0);
325 }
326
327 /* Open a file, and (for the very first open) create objects on the OSTs at
328  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
329  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
330  * lli_open_sem to ensure no other process will create objects, send the
331  * stripe MD to the MDS, or try to destroy the objects if that fails.
332  *
333  * If we already have the stripe MD locally then we don't request it in
334  * mdc_open(), by passing a lmm_size = 0.
335  *
336  * It is up to the application to ensure no other processes open this file
337  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
338  * used.  We might be able to avoid races of that sort by getting lli_open_sem
339  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
340  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
341  */
342 int ll_file_open(struct inode *inode, struct file *file)
343 {
344         struct ll_inode_info *lli = ll_i2info(inode);
345         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
346                                           .it_flags = file->f_flags };
347         struct lov_stripe_md *lsm;
348         struct ptlrpc_request *req;
349         int rc = 0;
350         struct obd_client_handle **och_p;
351         __u64 *och_usecount;
352         ENTRY;
353
354         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n",
355                inode->i_ino, inode->i_generation, inode, file->f_flags);
356
357         /* don't do anything for / */
358         if (inode->i_sb->s_root == file->f_dentry)
359                 RETURN(0);
360
361         if ((file->f_flags+1) & O_ACCMODE)
362                 oit.it_flags++;
363         if (file->f_flags & O_TRUNC)
364                 oit.it_flags |= 2;
365
366         it = file->f_it;
367
368         /*
369          * sometimes LUSTRE_IT(it) may not be allocated like opening file by
370          * dentry_open() from GNS stuff.
371          */
372         if (!it || !LUSTRE_IT(it)) {
373                 it = &oit;
374                 rc = ll_intent_alloc(it);
375                 if (rc)
376                         GOTO(out, rc);
377         }
378
379         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
380         
381         /*
382          * mdc_intent_lock() didn't get a request ref if there was an open
383          * error, so don't do cleanup on the * request here (bug 3430)
384          */
385         if (LUSTRE_IT(it)->it_disposition) {
386                 rc = it_open_error(DISP_OPEN_OPEN, it);
387                 if (rc)
388                         RETURN(rc);
389         }
390
391         /* Let's see if we have file open on MDS already. */
392         if (it->it_flags & FMODE_WRITE) {
393                 och_p = &lli->lli_mds_write_och;
394                 och_usecount = &lli->lli_open_fd_write_count;
395         } else if (it->it_flags & FMODE_EXEC) {
396                 och_p = &lli->lli_mds_exec_och;
397                 och_usecount = &lli->lli_open_fd_exec_count;
398         } else {
399                 och_p = &lli->lli_mds_read_och;
400                 och_usecount = &lli->lli_open_fd_read_count;
401         }
402
403         down(&lli->lli_och_sem);
404         if (*och_p) { /* Open handle is present */
405                 if (LUSTRE_IT(it)->it_disposition) {
406                         struct obd_client_handle *och;
407                         /* Well, there's extra open request that we do not need,
408                            let's close it somehow*/
409                         OBD_ALLOC(och, sizeof (struct obd_client_handle));
410                         if (!och) {
411                                 up(&lli->lli_och_sem);
412                                 RETURN(-ENOMEM);
413                         }
414
415                         ll_och_fill(inode, it, och);
416                         /* ll_md_och_close() will free och */
417                         ll_md_och_close(ll_i2mdexp(inode), inode, och);
418                 }
419                 (*och_usecount)++;
420                         
421                 rc = ll_local_open(file, it, NULL);
422                 if (rc)
423                         LBUG();
424         } else {
425                 LASSERT(*och_usecount == 0);
426                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
427                 if (!*och_p)
428                         GOTO(out, rc = -ENOMEM);
429                 (*och_usecount)++;
430
431                 if (!it || !LUSTRE_IT(it) || !LUSTRE_IT(it)->it_disposition) {
432                         /*
433                          * we are going to replace intent here, and that may
434                          * possibly change access mode (FMODE_EXEC can only be
435                          * set in intent), but I hope it never happens (I was
436                          * not able to trigger it yet at least) -- green
437                          */
438                         
439                         /* FIXME: FMODE_EXEC is not covered by O_ACCMODE! */
440                         LASSERT(!(it->it_flags & FMODE_EXEC));
441                         LASSERTF((it->it_flags & O_ACCMODE) ==
442                                  (oit.it_flags & O_ACCMODE), "Changing intent "
443                                  "flags %x to incompatible %x\n", it->it_flags,
444                                  oit.it_flags);
445                         it = &oit;
446                         rc = ll_intent_file_open(file, NULL, 0, it);
447                         if (rc)
448                                 GOTO(out, rc);
449                         rc = it_open_error(DISP_OPEN_OPEN, it);
450                         if (rc)
451                                 GOTO(out_och_free, rc);
452
453                         mdc_set_lock_data(NULL, &LUSTRE_IT(it)->it_lock_handle,
454                                           file->f_dentry->d_inode);
455                 }
456                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
457                 rc = ll_local_open(file, it, *och_p);
458                 LASSERTF(rc == 0, "rc = %d\n", rc);
459         }
460         up(&lli->lli_och_sem);
461         
462         /*
463          * must do this outside lli_och_sem lock to prevent deadlock where
464          * different kind of OPEN lock for this same inode gets cancelled by
465          * ldlm_cancel_lru
466          */
467
468         if (!S_ISREG(inode->i_mode))
469                 GOTO(out, rc);
470
471         lsm = lli->lli_smd;
472         if (lsm == NULL) {
473                 if (file->f_flags & O_LOV_DELAY_CREATE ||
474                     !(file->f_mode & FMODE_WRITE)) {
475                         CDEBUG(D_INODE, "object creation was delayed\n");
476                         GOTO(out, rc);
477                 }
478         }
479         file->f_flags &= ~O_LOV_DELAY_CREATE;
480         GOTO(out, rc);
481  out:
482         req = LUSTRE_IT(it)->it_data;
483         ll_intent_drop_lock(it);
484         ll_intent_release(it);
485         ptlrpc_req_finished(req);
486         if (rc == 0) {
487                 ll_open_complete(inode);
488         } else {
489 out_och_free:
490                 if (*och_p) {
491                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
492                         *och_p = NULL; /* OBD_FREE writes some magic there */
493                         (*och_usecount)--;
494                 }
495                 up(&lli->lli_och_sem);
496         }
497                 
498         return rc;
499 }
500
501 /* Fills the obdo with the attributes for the inode defined by lsm */
502 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
503                    struct obdo *oa)
504 {
505         struct ptlrpc_request_set *set;
506         int rc;
507         ENTRY;
508
509         LASSERT(lsm != NULL);
510
511         memset(oa, 0, sizeof *oa);
512         oa->o_id = lsm->lsm_object_id;
513         oa->o_gr = lsm->lsm_object_gr;
514         oa->o_mode = S_IFREG;
515         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
516                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
517                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
518
519         set = ptlrpc_prep_set();
520         if (set == NULL) {
521                 rc = -ENOMEM;
522         } else {
523                 rc = obd_getattr_async(exp, oa, lsm, set);
524                 if (rc == 0)
525                         rc = ptlrpc_set_wait(set);
526                 ptlrpc_set_destroy(set);
527         }
528         if (rc)
529                 RETURN(rc);
530
531         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
532                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
533         RETURN(0);
534 }
535
536 static inline void ll_remove_suid(struct inode *inode)
537 {
538         unsigned int mode;
539
540         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
541         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
542
543         /* was any of the uid bits set? */
544         mode &= inode->i_mode;
545         if (mode && !capable(CAP_FSETID)) {
546                 inode->i_mode &= ~mode;
547                 // XXX careful here - we cannot change the size
548         }
549 }
550
551 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
552 {
553         struct ll_inode_info *lli = ll_i2info(inode);
554         struct lov_stripe_md *lsm = lli->lli_smd;
555         struct obd_export *exp = ll_i2dtexp(inode);
556         struct {
557                 char name[16];
558                 struct ldlm_lock *lock;
559                 struct lov_stripe_md *lsm;
560         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
561         __u32 stripe, vallen = sizeof(stripe);
562         int rc;
563         ENTRY;
564
565         if (lsm->lsm_stripe_count == 1)
566                 GOTO(check, stripe = 0);
567
568         /* get our offset in the lov */
569         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
570         if (rc != 0) {
571                 CERROR("obd_get_info: rc = %d\n", rc);
572                 RETURN(rc);
573         }
574         LASSERT(stripe < lsm->lsm_stripe_count);
575         EXIT;
576 check:
577         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
578             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[2]){
579                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64
580                            " inode=%lu/%u (%p)\n",
581                            lsm->lsm_oinfo[stripe].loi_id,
582                            lsm->lsm_oinfo[stripe].loi_gr,
583                            inode->i_ino, inode->i_generation, inode);
584                 return -ELDLM_NO_LOCK_DATA;
585         }
586
587         return stripe;
588 }
589
590 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
591  * we get a lock cancellation for each stripe, so we have to map the obd's
592  * region back onto the stripes in the file that it held.
593  *
594  * No one can dirty the extent until we've finished our work and they can
595  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
596  * but other kernel actors could have pages locked.
597  *
598  * Called with the DLM lock held. */
599 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
600                               struct ldlm_lock *lock, __u32 stripe)
601 {
602         ldlm_policy_data_t tmpex;
603         unsigned long start, end, count, skip, i, j;
604         struct page *page;
605         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
606         struct lustre_handle lockh;
607         ENTRY;
608
609         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
610         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
611                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
612                inode->i_size);
613
614         /* our locks are page granular thanks to osc_enqueue, we invalidate the
615          * whole page. */
616         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
617         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
618
619         count = ~0;
620         skip = 0;
621         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
622         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
623         if (lsm->lsm_stripe_count > 1) {
624                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
625                 skip = (lsm->lsm_stripe_count - 1) * count;
626                 start += start/count * skip + stripe * count;
627                 if (end != ~0)
628                         end += end/count * skip + stripe * count;
629         }
630         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
631                 end = ~0;
632
633         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
634         if (i < end)
635                 end = i;
636
637         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
638                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
639                count, skip, end, discard ? " (DISCARDING)" : "");
640         
641         /* walk through the vmas on the inode and tear down mmaped pages that
642          * intersect with the lock.  this stops immediately if there are no
643          * mmap()ed regions of the file.  This is not efficient at all and
644          * should be short lived. We'll associate mmap()ed pages with the lock
645          * and will be able to find them directly */
646         
647         for (i = start; i <= end; i += (j + skip)) {
648                 j = min(count - (i % count), end - i + 1);
649                 LASSERT(j > 0);
650                 LASSERT(inode->i_mapping);
651                 if (ll_teardown_mmaps(inode->i_mapping, i << PAGE_CACHE_SHIFT,
652                                       ((i+j) << PAGE_CACHE_SHIFT) - 1) )
653                         break;
654         }
655
656         /* this is the simplistic implementation of page eviction at
657          * cancelation.  It is careful to get races with other page
658          * lockers handled correctly.  fixes from bug 20 will make it
659          * more efficient by associating locks with pages and with
660          * batching writeback under the lock explicitly. */
661         for (i = start, j = start % count; i <= end;
662              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
663                 if (j == count) {
664                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
665                         i += skip;
666                         j = 0;
667                         if (i > end)
668                                 break;
669                 }
670                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
671                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
672                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
673                          start, i, end);
674
675                 if (!mapping_has_pages(inode->i_mapping)) {
676                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
677                         break;
678                 }
679
680                 cond_resched();
681
682                 page = find_get_page(inode->i_mapping, i);
683                 if (page == NULL)
684                         continue;
685                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
686                                i, tmpex.l_extent.start);
687                 lock_page(page);
688
689                 /* page->mapping to check with racing against teardown */
690                 if (!discard && clear_page_dirty_for_io(page)) {
691                         rc = ll_call_writepage(inode, page);
692                         if (rc != 0)
693                                 CERROR("writepage of page %p failed: %d\n",
694                                        page, rc);
695                         /* either waiting for io to complete or reacquiring
696                          * the lock that the failed writepage released */
697                         lock_page(page);
698                 }
699
700                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
701                 /* check to see if another DLM lock covers this page */
702                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
703                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
704                                       LDLM_FL_TEST_LOCK,
705                                       &lock->l_resource->lr_name, LDLM_EXTENT,
706                                       &tmpex, LCK_PR | LCK_PW, &lockh);
707                 if (rc2 == 0 && page->mapping != NULL) {
708                         // checking again to account for writeback's lock_page()
709                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
710                         ll_ra_accounting(page, inode->i_mapping);
711                         ll_truncate_complete_page(page);
712                 }
713                 unlock_page(page);
714                 page_cache_release(page);
715         }
716         LASSERTF(tmpex.l_extent.start <=
717                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
718                   lock->l_policy_data.l_extent.end + 1),
719                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
720                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
721                  start, i, end);
722         EXIT;
723 }
724
725 static int ll_extent_lock_callback(struct ldlm_lock *lock,
726                                    struct ldlm_lock_desc *new, void *data,
727                                    int flag)
728 {
729         struct lustre_handle lockh = { 0 };
730         int rc;
731         ENTRY;
732
733         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
734                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
735                 LBUG();
736         }
737
738         switch (flag) {
739         case LDLM_CB_BLOCKING:
740                 ldlm_lock2handle(lock, &lockh);
741                 rc = ldlm_cli_cancel(&lockh);
742                 if (rc != ELDLM_OK)
743                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
744                 break;
745         case LDLM_CB_CANCELING: {
746                 struct inode *inode;
747                 struct ll_inode_info *lli;
748                 struct lov_stripe_md *lsm;
749                 __u32 stripe;
750                 __u64 kms;
751
752                 /* This lock wasn't granted, don't try to evict pages */
753                 if (lock->l_req_mode != lock->l_granted_mode)
754                         RETURN(0);
755
756                 inode = ll_inode_from_lock(lock);
757                 if (inode == NULL)
758                         RETURN(0);
759                 lli = ll_i2info(inode);
760                 if (lli == NULL)
761                         goto iput;
762                 if (lli->lli_smd == NULL)
763                         goto iput;
764                 lsm = lli->lli_smd;
765
766                 stripe = ll_lock_to_stripe_offset(inode, lock);
767                 if (stripe < 0)
768                         goto iput;
769                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
770
771                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
772                 down(&lli->lli_size_sem);
773                 kms = ldlm_extent_shift_kms(lock,
774                                             lsm->lsm_oinfo[stripe].loi_kms);
775                 
776                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
777                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
778                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
779                 lsm->lsm_oinfo[stripe].loi_kms = kms;
780                 up(&lli->lli_size_sem);
781                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
782                 //ll_try_done_writing(inode);
783         iput:
784                 iput(inode);
785                 break;
786         }
787         default:
788                 LBUG();
789         }
790
791         RETURN(0);
792 }
793
794 #if 0
795 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
796 {
797         /* XXX ALLOCATE - 160 bytes */
798         struct inode *inode = ll_inode_from_lock(lock);
799         struct ll_inode_info *lli = ll_i2info(inode);
800         struct lustre_handle lockh = { 0 };
801         struct ost_lvb *lvb;
802         __u32 stripe;
803         ENTRY;
804
805         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
806                      LDLM_FL_BLOCK_CONV)) {
807                 LBUG(); /* not expecting any blocked async locks yet */
808                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
809                            "lock, returning");
810                 ldlm_lock_dump(D_OTHER, lock, 0);
811                 ldlm_reprocess_all(lock->l_resource);
812                 RETURN(0);
813         }
814
815         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
816
817         stripe = ll_lock_to_stripe_offset(inode, lock);
818         if (stripe < 0)
819                 goto iput;
820
821         if (lock->l_lvb_len) {
822                 struct lov_stripe_md *lsm = lli->lli_smd;
823                 __u64 kms;
824                 lvb = lock->l_lvb_data;
825                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
826
827                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
828                 down(&inode->i_sem);
829                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
830                 kms = ldlm_extent_shift_kms(NULL, kms);
831                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
832                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
833                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
834                 lsm->lsm_oinfo[stripe].loi_kms = kms;
835                 up(&inode->i_sem);
836                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
837         }
838
839 iput:
840         iput(inode);
841         wake_up(&lock->l_waitq);
842
843         ldlm_lock2handle(lock, &lockh);
844         ldlm_lock_decref(&lockh, LCK_PR);
845         RETURN(0);
846 }
847 #endif
848
849 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
850 {
851         struct ptlrpc_request *req = reqp;
852         struct inode *inode = ll_inode_from_lock(lock);
853         struct ll_inode_info *lli;
854         struct ost_lvb *lvb;
855         struct lov_stripe_md *lsm;
856         int rc, size = sizeof(*lvb), stripe;
857         ENTRY;
858
859         if (inode == NULL)
860                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
861         lli = ll_i2info(inode);
862         if (lli == NULL)
863                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
864
865         lsm = lli->lli_smd;
866         if (lsm == NULL)
867                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
868
869         /* First, find out which stripe index this lock corresponds to. */
870         stripe = ll_lock_to_stripe_offset(inode, lock);
871         if (stripe < 0)
872                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
873
874         rc = lustre_pack_reply(req, 1, &size, NULL);
875         if (rc) {
876                 CERROR("lustre_pack_reply: %d\n", rc);
877                 GOTO(iput, rc);
878         }
879
880         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
881         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
882         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
883         lvb->lvb_atime = LTIME_S(inode->i_atime);
884         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
885
886         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
887                    inode->i_size, stripe, lvb->lvb_size);
888         GOTO(iput, 0);
889  iput:
890         iput(inode);
891
892  out:
893         /* These errors are normal races, so we don't want to fill the console
894          * with messages by calling ptlrpc_error() */
895         if (rc == -ELDLM_NO_LOCK_DATA)
896                 lustre_pack_reply(req, 0, NULL, NULL);
897
898         req->rq_status = rc;
899         return rc;
900 }
901
902 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
903 __u64 lov_merge_blocks(struct lov_stripe_md *lsm);
904 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
905
906 /* NB: lov_merge_size will prefer locally cached writes if they extend the
907  * file (because it prefers KMS over RSS when larger) */
908 int ll_glimpse_size(struct inode *inode)
909 {
910         struct ll_inode_info *lli = ll_i2info(inode);
911         struct ll_sb_info *sbi = ll_i2sbi(inode);
912         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
913         struct lustre_handle lockh = { 0 };
914         int rc, flags = LDLM_FL_HAS_INTENT;
915         ENTRY;
916
917         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
918
919         rc = obd_enqueue(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, &policy,
920                          LCK_PR, &flags, ll_extent_lock_callback,
921                          ldlm_completion_ast, ll_glimpse_callback, inode,
922                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
923         if (rc == -ENOENT)
924                 RETURN(rc);
925
926         if (rc != 0) {
927                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
928                 RETURN(rc > 0 ? -EIO : rc);
929         }
930
931         down(&lli->lli_size_sem);
932         inode->i_size = lov_merge_size(lli->lli_smd, 0);
933         inode->i_blocks = lov_merge_blocks(lli->lli_smd);
934         up(&lli->lli_size_sem);
935
936         LTIME_S(inode->i_mtime) = lov_merge_mtime(lli->lli_smd,
937                                                   LTIME_S(inode->i_mtime));
938
939         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64", blocks: "LPU64"\n",
940                (__u64)inode->i_size, (__u64)inode->i_blocks);
941         
942         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
943         RETURN(rc);
944 }
945
946 void ll_stime_record(struct ll_sb_info *sbi, struct timeval *start,
947                     struct obd_service_time *stime)
948 {
949         struct timeval stop;
950         do_gettimeofday(&stop);
951                                                                                                                                                                                                      
952         spin_lock(&sbi->ll_lock);
953         lprocfs_stime_record(stime, &stop, start);
954         spin_unlock(&sbi->ll_lock);
955 }
956
957 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
958                    struct lov_stripe_md *lsm, int mode,
959                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
960                    int ast_flags, struct obd_service_time *stime)
961 {
962         struct ll_inode_info *lli = ll_i2info(inode);
963         struct ll_sb_info *sbi = ll_i2sbi(inode);
964         struct timeval start;
965         int rc;
966         ENTRY;
967
968         LASSERT(lockh->cookie == 0);
969
970         /* XXX phil: can we do this?  won't it screw the file size up? */
971         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
972             (sbi->ll_flags & LL_SBI_NOLCK))
973                 RETURN(0);
974
975         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
976                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
977
978         do_gettimeofday(&start);
979         rc = obd_enqueue(sbi->ll_dt_exp, lsm, LDLM_EXTENT, policy, mode,
980                          &ast_flags, ll_extent_lock_callback,
981                          ldlm_completion_ast, ll_glimpse_callback, inode,
982                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
983         if (rc > 0)
984                 rc = -EIO;
985         
986         ll_stime_record(sbi, &start, stime);
987
988         if (policy->l_extent.start == 0 &&
989             policy->l_extent.end == OBD_OBJECT_EOF) {
990                 /* vmtruncate()->ll_truncate() first sets the i_size and then
991                  * the kms under both a DLM lock and the i_sem.  If we don't
992                  * get the i_sem here we can match the DLM lock and reset
993                  * i_size from the kms before the truncating path has updated
994                  * the kms.  generic_file_write can then trust the stale i_size
995                  * when doing appending writes and effectively cancel the
996                  * result of the truncate.  Getting the i_sem after the enqueue
997                  * maintains the DLM -> i_sem acquiry order. */
998                 down(&lli->lli_size_sem);
999                 inode->i_size = lov_merge_size(lsm, 1);
1000                 up(&lli->lli_size_sem);
1001         }
1002         
1003         if (rc == 0) {
1004                 LTIME_S(inode->i_mtime) =
1005                         lov_merge_mtime(lsm, LTIME_S(inode->i_mtime));
1006         }
1007
1008         RETURN(rc);
1009 }
1010
1011 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1012                      struct lov_stripe_md *lsm, int mode,
1013                      struct lustre_handle *lockh)
1014 {
1015         struct ll_sb_info *sbi = ll_i2sbi(inode);
1016         int rc;
1017         ENTRY;
1018
1019         /* XXX phil: can we do this?  won't it screw the file size up? */
1020         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1021             (sbi->ll_flags & LL_SBI_NOLCK))
1022                 RETURN(0);
1023
1024         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1025
1026         RETURN(rc);
1027 }
1028
1029 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1030                             loff_t *ppos)
1031 {
1032         struct inode *inode = file->f_dentry->d_inode;
1033         struct ll_inode_info *lli = ll_i2info(inode);
1034         struct lov_stripe_md *lsm = lli->lli_smd;
1035         struct ll_lock_tree tree;
1036         struct ll_lock_tree_node *node;
1037         int rc;
1038         ssize_t retval;
1039         __u64 kms;
1040         ENTRY;
1041         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1042                inode->i_ino, inode->i_generation, inode, count, *ppos);
1043
1044         /* "If nbyte is 0, read() will return 0 and have no other results."
1045          *                      -- Single Unix Spec */
1046         if (count == 0)
1047                 RETURN(0);
1048
1049         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1050                             count);
1051
1052         if (!lsm)
1053                 RETURN(0);
1054
1055         node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
1056                                   LCK_PR);
1057
1058         tree.lt_fd = file->private_data;
1059
1060         rc = ll_tree_lock(&tree, node, inode, buf, count,
1061                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1062         if (rc != 0)
1063                 RETURN(rc);
1064
1065         down(&lli->lli_size_sem);
1066         kms = lov_merge_size(lsm, 1);
1067         if (*ppos + count - 1 > kms) {
1068                 /* A glimpse is necessary to determine whether we return a short
1069                  * read or some zeroes at the end of the buffer */
1070                 up(&lli->lli_size_sem);
1071                 retval = ll_glimpse_size(inode);
1072                 if (retval)
1073                         goto out;
1074         } else {
1075                 inode->i_size = kms;
1076                 up(&lli->lli_size_sem);
1077         }
1078
1079         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1080                inode->i_ino, count, *ppos, inode->i_size);
1081
1082         /* turn off the kernel's read-ahead */
1083 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1084         file->f_ramax = 0;
1085 #else
1086         file->f_ra.ra_pages = 0;
1087 #endif
1088         retval = generic_file_read(file, buf, count, ppos);
1089
1090  out:
1091         ll_tree_unlock(&tree, inode);
1092         RETURN(retval);
1093 }
1094
1095 /*
1096  * Write to a file (through the page cache).
1097  */
1098 static ssize_t ll_file_write(struct file *file, const char *buf,
1099                              size_t count, loff_t *ppos)
1100 {
1101         struct inode *inode = file->f_dentry->d_inode;
1102         loff_t maxbytes = ll_file_maxbytes(inode);
1103         struct ll_lock_tree tree;
1104         struct ll_lock_tree_node *node;
1105         ssize_t retval;
1106         int rc;
1107
1108         ENTRY;
1109         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1110                inode->i_ino, inode->i_generation, inode, count, *ppos);
1111
1112         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1113
1114         /* POSIX, but surprised the VFS doesn't check this already */
1115         if (count == 0)
1116                 RETURN(0);
1117
1118         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1119          * called on the file, don't fail the below assertion (bug 2388). */
1120         if (file->f_flags & O_LOV_DELAY_CREATE &&
1121             ll_i2info(inode)->lli_smd == NULL)
1122                 RETURN(-EBADF);
1123
1124         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1125         
1126         if (file->f_flags & O_APPEND)
1127                 node = ll_node_from_inode(inode, 0, OBD_OBJECT_EOF, LCK_PW);
1128         else
1129                 node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
1130                                           LCK_PW);
1131
1132         if (IS_ERR(node))
1133                 RETURN(PTR_ERR(node));
1134
1135         tree.lt_fd = file->private_data;
1136
1137         rc = ll_tree_lock(&tree, node, inode, buf, count,
1138                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1139         if (rc != 0)
1140                 RETURN(rc);
1141
1142         /* this is ok, g_f_w will overwrite this under i_sem if it races
1143          * with a local truncate, it just makes our maxbyte checking easier */
1144         if (file->f_flags & O_APPEND)
1145                 *ppos = inode->i_size;
1146
1147         if (*ppos >= maxbytes) {
1148                 if (count || *ppos > maxbytes) {
1149                         send_sig(SIGXFSZ, current, 0);
1150                         GOTO(out, retval = -EFBIG);
1151                 }
1152         }
1153         if (*ppos + count > maxbytes)
1154                 count = maxbytes - *ppos;
1155
1156         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1157                inode->i_ino, count, *ppos);
1158
1159         /* generic_file_write handles O_APPEND after getting i_sem */
1160         retval = generic_file_write(file, buf, count, ppos);
1161         EXIT;
1162 out:
1163         ll_tree_unlock(&tree, inode);
1164         /* serialize with mmap/munmap/mremap */
1165         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1166                             retval > 0 ? retval : 0);
1167         return retval;
1168 }
1169
1170 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1171                                unsigned long arg)
1172 {
1173         struct ll_inode_info *lli = ll_i2info(inode);
1174         struct obd_export *exp = ll_i2dtexp(inode);
1175         struct ll_recreate_obj ucreatp;
1176         struct obd_trans_info oti = { 0 };
1177         struct obdo *oa = NULL;
1178         int lsm_size;
1179         int rc = 0;
1180         struct lov_stripe_md *lsm, *lsm2;
1181         ENTRY;
1182
1183         if (!capable (CAP_SYS_ADMIN))
1184                 RETURN(-EPERM);
1185
1186         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1187                             sizeof(struct ll_recreate_obj));
1188         if (rc) {
1189                 RETURN(-EFAULT);
1190         }
1191         oa = obdo_alloc();
1192         if (oa == NULL) 
1193                 RETURN(-ENOMEM);
1194
1195         down(&lli->lli_open_sem);
1196         lsm = lli->lli_smd;
1197         if (lsm == NULL)
1198                 GOTO(out, rc = -ENOENT);
1199         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1200                    (lsm->lsm_stripe_count));
1201
1202         OBD_ALLOC(lsm2, lsm_size);
1203         if (lsm2 == NULL)
1204                 GOTO(out, rc = -ENOMEM);
1205
1206         oa->o_id = ucreatp.lrc_id;
1207         oa->o_nlink = ucreatp.lrc_ost_idx;
1208         oa->o_gr = ucreatp.lrc_group;
1209         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
1210         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1211         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1212                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1213
1214         oti.oti_objid = NULL;
1215         memcpy(lsm2, lsm, lsm_size);
1216         rc = obd_create(exp, oa, &lsm2, &oti);
1217
1218         OBD_FREE(lsm2, lsm_size);
1219         GOTO(out, rc);
1220 out:
1221         up(&lli->lli_open_sem);
1222         obdo_free(oa);
1223         return rc;
1224 }
1225
1226 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1227                                     int flags, struct lov_user_md *lum,
1228                                     int lum_size)
1229 {
1230         struct ll_inode_info *lli = ll_i2info(inode);
1231         struct file *f;
1232         struct obd_export *exp = ll_i2dtexp(inode);
1233         struct lov_stripe_md *lsm;
1234         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1235         struct ptlrpc_request *req = NULL;
1236         int rc = 0;
1237         struct lustre_md md;
1238         struct obd_client_handle *och;
1239         ENTRY;
1240
1241         
1242         if ((file->f_flags+1) & O_ACCMODE)
1243                 oit.it_flags++;
1244         if (file->f_flags & O_TRUNC)
1245                 oit.it_flags |= 2;
1246
1247         down(&lli->lli_open_sem);
1248         lsm = lli->lli_smd;
1249         if (lsm) {
1250                 up(&lli->lli_open_sem);
1251                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1252                        inode->i_ino);
1253                 RETURN(-EEXIST);
1254         }
1255
1256         f = get_empty_filp();
1257         if (!f)
1258                 GOTO(out, -ENOMEM);
1259
1260         f->f_dentry = file->f_dentry;
1261         f->f_vfsmnt = file->f_vfsmnt;
1262         f->f_flags = flags;
1263
1264         rc = ll_intent_alloc(&oit);
1265         if (rc)
1266                 GOTO(out, rc);
1267
1268         rc = ll_intent_file_open(f, lum, lum_size, &oit);
1269         if (rc)
1270                 GOTO(out, rc);
1271         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1272                 GOTO(out, -ENOENT);
1273         
1274         req = LUSTRE_IT(&oit)->it_data;
1275         rc = LUSTRE_IT(&oit)->it_status;
1276
1277         if (rc < 0)
1278                 GOTO(out, rc);
1279
1280         rc = mdc_req2lustre_md(ll_i2mdexp(inode), req, 1, exp, &md);
1281         if (rc)
1282                 GOTO(out, rc);
1283         ll_update_inode(f->f_dentry->d_inode, &md);
1284
1285         OBD_ALLOC(och, sizeof(struct obd_client_handle));
1286         rc = ll_local_open(f, &oit, och);
1287         if (rc) { /* Actually ll_local_open cannot fail! */
1288                 GOTO(out, rc);
1289         }
1290         if (LUSTRE_IT(&oit)->it_lock_mode) {
1291                 ldlm_lock_decref_and_cancel((struct lustre_handle *)
1292                                             &LUSTRE_IT(&oit)->it_lock_handle,
1293                                             LUSTRE_IT(&oit)->it_lock_mode);
1294                 LUSTRE_IT(&oit)->it_lock_mode = 0;
1295         }
1296
1297         ll_intent_release(&oit);
1298
1299         /* ll_file_release will decrease the count, but won't free anything
1300            because we have at least one more reference coming from actual open
1301          */
1302         down(&lli->lli_och_sem);
1303         lli->lli_open_fd_write_count++;
1304         up(&lli->lli_och_sem);
1305         rc = ll_file_release(f->f_dentry->d_inode, f);
1306         
1307         /* Now also destroy our supplemental och */
1308         ll_md_och_close(ll_i2mdexp(inode), f->f_dentry->d_inode, och);
1309         EXIT;
1310  out:
1311         ll_intent_release(&oit);
1312         if (f)
1313                 put_filp(f);
1314         up(&lli->lli_open_sem);
1315         if (req != NULL)
1316                 ptlrpc_req_finished(req);
1317         return rc;
1318 }
1319
1320 static int ll_lov_setea(struct inode *inode, struct file *file,
1321                             unsigned long arg)
1322 {
1323         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1324         struct lov_user_md  *lump;
1325         int lum_size = sizeof(struct lov_user_md) +
1326                        sizeof(struct lov_user_ost_data);
1327         int rc;
1328         ENTRY;
1329
1330         if (!capable (CAP_SYS_ADMIN))
1331                 RETURN(-EPERM);
1332
1333         OBD_ALLOC(lump, lum_size);
1334         if (lump == NULL) {
1335                 RETURN(-ENOMEM);
1336         }
1337         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1338         if (rc) {
1339                 OBD_FREE(lump, lum_size);
1340                 RETURN(-EFAULT);
1341         }
1342
1343         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1344
1345         OBD_FREE(lump, lum_size);
1346         RETURN(rc);
1347 }
1348
1349 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1350                             unsigned long arg)
1351 {
1352         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1353         int rc;
1354         int flags = FMODE_WRITE;
1355         ENTRY;
1356
1357         /* Bug 1152: copy properly when this is no longer true */
1358         LASSERT(sizeof(lum) == sizeof(*lump));
1359         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1360         rc = copy_from_user(&lum, lump, sizeof(lum));
1361         if (rc)
1362                 RETURN(-EFAULT);
1363
1364         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1365         if (rc == 0) {
1366                  put_user(0, &lump->lmm_stripe_count);
1367                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1368                                     0, ll_i2info(inode)->lli_smd, lump);
1369         }
1370         RETURN(rc);
1371 }
1372
1373 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1374 {
1375         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1376
1377         if (!lsm)
1378                 RETURN(-ENODATA);
1379
1380         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1381                              (void *)arg);
1382 }
1383
1384 static int ll_get_grouplock(struct inode *inode, struct file *file,
1385                          unsigned long arg)
1386 {
1387         struct ll_file_data *fd = file->private_data;
1388         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1389                                                     .end = OBD_OBJECT_EOF}};
1390         struct lustre_handle lockh = { 0 };
1391         struct ll_inode_info *lli = ll_i2info(inode);
1392         struct lov_stripe_md *lsm = lli->lli_smd;
1393         int flags = 0, rc;
1394         ENTRY;
1395
1396         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1397                 RETURN(-EINVAL);
1398         }
1399
1400         policy.l_extent.gid = arg;
1401         if (file->f_flags & O_NONBLOCK)
1402                 flags = LDLM_FL_BLOCK_NOWAIT;
1403
1404         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags,
1405                             &ll_i2sbi(inode)->ll_grouplock_stime);
1406         if (rc != 0)
1407                 RETURN(rc);
1408
1409         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1410         fd->fd_gid = arg;
1411         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1412
1413         RETURN(0);
1414 }
1415
1416 static int ll_put_grouplock(struct inode *inode, struct file *file,
1417                          unsigned long arg)
1418 {
1419         struct ll_file_data *fd = file->private_data;
1420         struct ll_inode_info *lli = ll_i2info(inode);
1421         struct lov_stripe_md *lsm = lli->lli_smd;
1422         int rc;
1423         ENTRY;
1424
1425         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1426                 /* Ugh, it's already unlocked. */
1427                 RETURN(-EINVAL);
1428         }
1429
1430         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1431                 RETURN(-EINVAL);
1432
1433         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1434
1435         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1436         if (rc)
1437                 RETURN(rc);
1438
1439         fd->fd_gid = 0;
1440         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1441
1442         RETURN(0);
1443 }
1444
1445 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1446                   unsigned long arg)
1447 {
1448         struct ll_file_data *fd = file->private_data;
1449         struct ll_sb_info *sbi = ll_i2sbi(inode);
1450         int flags;
1451         ENTRY;
1452
1453         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1454                inode->i_generation, inode, cmd);
1455
1456         if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
1457                 RETURN(-ENOTTY);
1458
1459         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1460         switch(cmd) {
1461         case LL_IOC_GETFLAGS:
1462                 /* Get the current value of the file flags */
1463                 return put_user(fd->fd_flags, (int *)arg);
1464         case LL_IOC_SETFLAGS:
1465         case LL_IOC_CLRFLAGS:
1466                 /* Set or clear specific file flags */
1467                 /* XXX This probably needs checks to ensure the flags are
1468                  *     not abused, and to handle any flag side effects.
1469                  */
1470                 if (get_user(flags, (int *) arg))
1471                         RETURN(-EFAULT);
1472
1473                 if (cmd == LL_IOC_SETFLAGS)
1474                         fd->fd_flags |= flags;
1475                 else
1476                         fd->fd_flags &= ~flags;
1477                 RETURN(0);
1478         case LL_IOC_LOV_SETSTRIPE:
1479                 RETURN(ll_lov_setstripe(inode, file, arg));
1480         case LL_IOC_LOV_SETEA:
1481                 RETURN(ll_lov_setea(inode, file, arg));
1482         case IOC_MDC_SHOWFID: {
1483                 struct lustre_id *idp = (struct lustre_id *)arg;
1484                 struct lustre_id id;
1485                 char *filename;
1486                 int rc;
1487
1488                 filename = getname((const char *)arg);
1489                 if (IS_ERR(filename))
1490                         RETURN(PTR_ERR(filename));
1491
1492                 ll_inode2id(&id, inode);
1493
1494                 rc = ll_get_fid(sbi->ll_md_exp, &id, filename, &id);
1495                 if (rc < 0)
1496                         GOTO(out_filename, rc);
1497
1498                 rc = copy_to_user(idp, &id, sizeof(*idp));
1499                 if (rc)
1500                         GOTO(out_filename, rc = -EFAULT);
1501
1502                 EXIT;
1503         out_filename:
1504                 putname(filename);
1505                 return rc;
1506         }
1507         case LL_IOC_LOV_GETSTRIPE:
1508                 RETURN(ll_lov_getstripe(inode, arg));
1509         case LL_IOC_RECREATE_OBJ:
1510                 RETURN(ll_lov_recreate_obj(inode, file, arg));
1511         case EXT3_IOC_GETFLAGS:
1512         case EXT3_IOC_SETFLAGS:
1513                 RETURN( ll_iocontrol(inode, file, cmd, arg) );
1514         case LL_IOC_GROUP_LOCK:
1515                 RETURN(ll_get_grouplock(inode, file, arg));
1516         case LL_IOC_GROUP_UNLOCK:
1517                 RETURN(ll_put_grouplock(inode, file, arg));
1518         case EXT3_IOC_GETVERSION_OLD:
1519         case EXT3_IOC_GETVERSION:
1520                 return put_user(inode->i_generation, (int *) arg);
1521         /* We need to special case any other ioctls we want to handle,
1522          * to send them to the MDS/OST as appropriate and to properly
1523          * network encode the arg field.
1524         case EXT2_IOC_GETVERSION_OLD:
1525         case EXT2_IOC_GETVERSION_NEW:
1526         case EXT2_IOC_SETVERSION_OLD:
1527         case EXT2_IOC_SETVERSION_NEW:
1528         case EXT3_IOC_SETVERSION_OLD:
1529         case EXT3_IOC_SETVERSION:
1530         */
1531         default:
1532                 RETURN( obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1533                                       (void *)arg) );
1534         }
1535 }
1536
1537 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1538 {
1539         struct inode *inode = file->f_dentry->d_inode;
1540         struct ll_file_data *fd = file->private_data;
1541         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1542         struct lustre_handle lockh = {0};
1543         loff_t retval;
1544         ENTRY;
1545         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),to=%llu\n", inode->i_ino,
1546                inode->i_generation, inode,
1547                offset + ((origin==2) ? inode->i_size : file->f_pos));
1548
1549         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1550         if (origin == 2) { /* SEEK_END */
1551                 ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
1552                 struct ll_inode_info *lli = ll_i2info(inode);
1553                 int nonblock = 0, rc;
1554
1555                 if (file->f_flags & O_NONBLOCK)
1556                         nonblock = LDLM_FL_BLOCK_NOWAIT;
1557
1558                 rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,
1559                                     nonblock, &ll_i2sbi(inode)->ll_seek_stime);
1560                 if (rc != 0)
1561                         RETURN(rc);
1562
1563                 down(&lli->lli_size_sem);
1564                 offset += inode->i_size;
1565                 up(&lli->lli_size_sem);
1566         } else if (origin == 1) { /* SEEK_CUR */
1567                 offset += file->f_pos;
1568         }
1569
1570         retval = -EINVAL;
1571         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1572                 if (offset != file->f_pos) {
1573                         file->f_pos = offset;
1574 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1575                         file->f_reada = 0;
1576                         file->f_version = ++event;
1577 #endif
1578                 }
1579                 retval = offset;
1580         }
1581
1582         if (origin == 2)
1583                 ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
1584         RETURN(retval);
1585 }
1586
1587 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1588 {
1589         struct inode *inode = dentry->d_inode;
1590         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1591         struct lustre_id id;
1592         struct ptlrpc_request *req;
1593         int rc, err;
1594         ENTRY;
1595         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1596                inode->i_generation, inode);
1597
1598         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1599
1600         /* fsync's caller has already called _fdata{sync,write}, we want
1601          * that IO to finish before calling the osc and mdc sync methods */
1602         rc = filemap_fdatawait(inode->i_mapping);
1603
1604         ll_inode2id(&id, inode);
1605         err = md_sync(ll_i2sbi(inode)->ll_md_exp, &id, &req);
1606         if (!rc)
1607                 rc = err;
1608         if (!err)
1609                 ptlrpc_req_finished(req);
1610
1611         if (data && lsm) {
1612                 struct obdo *oa = obdo_alloc();
1613
1614                 if (!oa)
1615                         RETURN(rc ? rc : -ENOMEM);
1616
1617                 oa->o_id = lsm->lsm_object_id;
1618                 oa->o_gr = lsm->lsm_object_gr;
1619                 oa->o_valid = OBD_MD_FLID;
1620                 obdo_from_inode(oa, inode, (OBD_MD_FLTYPE | OBD_MD_FLATIME |
1621                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1622                                             OBD_MD_FLGROUP));
1623
1624                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
1625                                0, OBD_OBJECT_EOF);
1626                 if (!rc)
1627                         rc = err;
1628                 obdo_free(oa);
1629         }
1630
1631         RETURN(rc);
1632 }
1633
1634 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1635 {
1636         struct inode *inode = file->f_dentry->d_inode;
1637         struct ll_inode_info *li = ll_i2info(inode);
1638         struct ll_sb_info *sbi = ll_i2sbi(inode);
1639         struct obd_device *obddev;
1640         struct ldlm_res_id res_id =
1641                 { .name = {id_fid(&li->lli_id), id_group(&li->lli_id), LDLM_FLOCK} };
1642         struct lustre_handle lockh = {0};
1643         ldlm_policy_data_t flock;
1644         ldlm_mode_t mode = 0;
1645         int flags = 0;
1646         int rc;
1647         ENTRY;
1648
1649         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1650                inode->i_ino, file_lock);
1651
1652         flock.l_flock.pid = file_lock->fl_pid;
1653         flock.l_flock.start = file_lock->fl_start;
1654         flock.l_flock.end = file_lock->fl_end;
1655
1656         switch (file_lock->fl_type) {
1657         case F_RDLCK:
1658                 mode = LCK_PR;
1659                 break;
1660         case F_UNLCK:
1661                 /* An unlock request may or may not have any relation to
1662                  * existing locks so we may not be able to pass a lock handle
1663                  * via a normal ldlm_lock_cancel() request. The request may even
1664                  * unlock a byte range in the middle of an existing lock. In
1665                  * order to process an unlock request we need all of the same
1666                  * information that is given with a normal read or write record
1667                  * lock request. To avoid creating another ldlm unlock (cancel)
1668                  * message we'll treat a LCK_NL flock request as an unlock. */
1669                 mode = LCK_NL;
1670                 break;
1671         case F_WRLCK:
1672                 mode = LCK_PW;
1673                 break;
1674         default:
1675                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1676                 LBUG();
1677         }
1678
1679         switch (cmd) {
1680         case F_SETLKW:
1681 #ifdef F_SETLKW64
1682         case F_SETLKW64:
1683 #endif
1684                 flags = 0;
1685                 break;
1686         case F_SETLK:
1687 #ifdef F_SETLK64
1688         case F_SETLK64:
1689 #endif
1690                 flags = LDLM_FL_BLOCK_NOWAIT;
1691                 break;
1692         case F_GETLK:
1693 #ifdef F_GETLK64
1694         case F_GETLK64:
1695 #endif
1696                 flags = LDLM_FL_TEST_LOCK;
1697                 /* Save the old mode so that if the mode in the lock changes we
1698                  * can decrement the appropriate reader or writer refcount. */
1699                 file_lock->fl_type = mode;
1700                 break;
1701         default:
1702                 CERROR("unknown fcntl lock command: %d\n", cmd);
1703                 LBUG();
1704         }
1705
1706         CDEBUG(D_DLMTRACE, "inode=%lu, pid="LPU64", flags=%#x, mode=%u, "
1707                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1708                flags, mode, flock.l_flock.start, flock.l_flock.end);
1709
1710         obddev = md_get_real_obd(sbi->ll_md_exp, &li->lli_id);
1711         rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
1712                               obddev->obd_namespace,
1713                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1714                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1715                               NULL, 0, NULL, &lockh);
1716         RETURN(rc);
1717 }
1718
1719 int ll_inode_revalidate_it(struct dentry *dentry)
1720 {
1721         struct lookup_intent oit = { .it_op = IT_GETATTR };
1722         struct inode *inode = dentry->d_inode;
1723         struct ptlrpc_request *req = NULL;
1724         struct ll_inode_info *lli;
1725         struct lov_stripe_md *lsm;
1726         struct ll_sb_info *sbi;
1727         struct lustre_id id;
1728         int rc;
1729         ENTRY;
1730
1731         if (!inode) {
1732                 CERROR("REPORT THIS LINE TO PETER\n");
1733                 RETURN(0);
1734         }
1735         
1736         sbi = ll_i2sbi(inode);
1737         
1738         ll_inode2id(&id, inode);
1739         lli = ll_i2info(inode);
1740         LASSERT(id_fid(&id) != 0);
1741
1742         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), name=%s(%p)\n",
1743                inode->i_ino, inode->i_generation, inode, dentry->d_name.name,
1744                dentry);
1745
1746 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1747         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1748 #endif
1749
1750         rc = ll_intent_alloc(&oit);
1751         if (rc)
1752                 RETURN(-ENOMEM);
1753
1754         rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1755                             &oit, 0, &req, ll_mdc_blocking_ast);
1756         if (rc < 0)
1757                 GOTO(out, rc);
1758
1759         rc = revalidate_it_finish(req, 1, &oit, dentry);
1760         if (rc) {
1761                 GOTO(out, rc);
1762         }
1763
1764         ll_lookup_finish_locks(&oit, dentry);
1765
1766         lsm = lli->lli_smd;
1767         if (lsm == NULL) /* object not yet allocated, don't validate size */
1768                 GOTO(out, rc = 0);
1769
1770         /*
1771          * ll_glimpse_size() will prefer locally cached writes if they extend
1772          * the file.
1773          */
1774         rc = ll_glimpse_size(inode);
1775         EXIT;
1776 out:
1777         ll_intent_release(&oit);
1778         if (req)
1779                 ptlrpc_req_finished(req);
1780         return rc;
1781 }
1782
1783 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1784 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
1785 {
1786         int res = 0;
1787         struct inode *inode = de->d_inode;
1788         struct ll_inode_info *lli = ll_i2info(inode);
1789
1790         res = ll_inode_revalidate_it(de);
1791         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
1792
1793         if (res)
1794                 return res;
1795
1796         stat->ino = inode->i_ino;
1797         stat->mode = inode->i_mode;
1798         stat->nlink = inode->i_nlink;
1799         stat->uid = inode->i_uid;
1800         stat->gid = inode->i_gid;
1801         stat->atime = inode->i_atime;
1802         stat->mtime = inode->i_mtime;
1803         stat->ctime = inode->i_ctime;
1804         stat->blksize = inode->i_blksize;
1805
1806         down(&lli->lli_size_sem);
1807         stat->size = inode->i_size;
1808         stat->blocks = inode->i_blocks;
1809         up(&lli->lli_size_sem);
1810         
1811         stat->rdev = kdev_t_to_nr(inode->i_rdev);
1812         stat->dev = id_group(&ll_i2info(inode)->lli_id);
1813         return 0;
1814 }
1815 #endif
1816
1817 static
1818 int ll_setxattr_internal(struct inode *inode, const char *name,
1819                          const void *value, size_t size, int flags, 
1820                          __u64 valid)
1821 {
1822         struct ll_sb_info *sbi = ll_i2sbi(inode);
1823         struct ptlrpc_request *request = NULL;
1824         struct mdc_op_data op_data;
1825         struct iattr attr;
1826         int rc = 0;
1827         ENTRY;
1828
1829         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", inode->i_ino);
1830         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_SETXATTR);
1831
1832         memset(&attr, 0x0, sizeof(attr));
1833         attr.ia_valid |= valid;
1834         attr.ia_attr_flags = flags;
1835
1836         ll_prepare_mdc_data(&op_data, inode, NULL, NULL, 0, 0);
1837
1838         rc = md_setattr(sbi->ll_md_exp, &op_data, &attr,
1839                         (void*) name, strnlen(name, XATTR_NAME_MAX)+1, 
1840                         (void*) value, size, &request);
1841         if (rc) {
1842                 CERROR("md_setattr fails: rc = %d\n", rc);
1843                 GOTO(out, rc);
1844         }
1845
1846  out:
1847         ptlrpc_req_finished(request);
1848         RETURN(rc);
1849 }
1850
1851 int ll_setxattr(struct dentry *dentry, const char *name, const void *value,
1852                 size_t size, int flags)
1853 {
1854         int rc, error;
1855         struct posix_acl *acl;
1856         struct ll_inode_info *lli;
1857         ENTRY;
1858
1859         rc = ll_setxattr_internal(dentry->d_inode, name, value, size, 
1860                                   flags, ATTR_EA);
1861         
1862         /* update inode's acl info */
1863         if (rc == 0 && strcmp(name, XATTR_NAME_ACL_ACCESS) == 0) {
1864                 if (value) {
1865                         acl = posix_acl_from_xattr(value, size);
1866                         if (IS_ERR(acl)) {
1867                                 CERROR("convert from xattr to acl error: %ld",
1868                                         PTR_ERR(acl));
1869                                 GOTO(out, rc);
1870                         } else if (acl) {
1871                                 error = posix_acl_valid(acl);
1872                                 if (error) {
1873                                         CERROR("acl valid error: %d", error);
1874                                         posix_acl_release(acl);
1875                                         GOTO(out, rc);
1876                                 }
1877                         }
1878                 } else {
1879                         acl = NULL;
1880                 }
1881                                         
1882                 lli = ll_i2info(dentry->d_inode);
1883                 spin_lock(&lli->lli_lock);
1884                 if (lli->lli_acl_access != NULL)
1885                         posix_acl_release(lli->lli_acl_access);
1886                 lli->lli_acl_access = acl;
1887                 spin_unlock(&lli->lli_lock);
1888         }
1889         EXIT;
1890 out:
1891         return(rc);
1892 }
1893
1894 int ll_removexattr(struct dentry *dentry, const char *name)
1895 {
1896         return ll_setxattr_internal(dentry->d_inode, name, NULL, 0, 0,
1897                                     ATTR_EA_RM);
1898 }
1899
1900 static
1901 int ll_getxattr_internal(struct inode *inode, const char *name, int namelen,
1902                          void *value, size_t size, __u64 valid)
1903 {
1904         struct ptlrpc_request *request = NULL;
1905         struct ll_sb_info *sbi = ll_i2sbi(inode);
1906         struct lustre_id id;
1907         struct mds_body *body;
1908         void *ea_data; 
1909         int rc, ea_size;
1910         ENTRY;
1911
1912         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETXATTR);
1913
1914         ll_inode2id(&id, inode);
1915         rc = md_getattr(sbi->ll_md_exp, &id, valid, name, namelen,
1916                          size, &request);
1917         if (rc) {
1918                 if (rc != -ENODATA && rc != -EOPNOTSUPP)
1919                         CERROR("md_getattr fails: rc = %d\n", rc);
1920                 GOTO(out, rc);
1921         }
1922
1923         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
1924         LASSERT(body != NULL);
1925         LASSERT_REPSWABBED(request, 0);
1926
1927         ea_size = body->eadatasize;
1928         LASSERT(ea_size <= request->rq_repmsg->buflens[0]);
1929
1930         if (size == 0) 
1931                 GOTO(out, rc = ea_size);
1932
1933         ea_data = lustre_msg_buf(request->rq_repmsg, 1, ea_size);
1934         LASSERT(ea_data != NULL);
1935         LASSERT_REPSWABBED(request, 1);
1936
1937         if (value)
1938                 memcpy(value, ea_data, ea_size);
1939         rc = ea_size;
1940  out:
1941         ptlrpc_req_finished(request);
1942         RETURN(rc);
1943 }
1944
1945 int ll_getxattr(struct dentry *dentry, const char *name, void *value,
1946                 size_t size)
1947 {
1948         return ll_getxattr_internal(dentry->d_inode, name, strlen(name) + 1, 
1949                                     value, size, OBD_MD_FLEA);
1950 }
1951
1952 int ll_listxattr(struct dentry *dentry, char *list, size_t size)
1953 {
1954         return ll_getxattr_internal(dentry->d_inode, NULL, 0, list, size,
1955                                     OBD_MD_FLEALIST);
1956 }
1957
1958 /*
1959  * XXX We could choose not to check DLM lock. Leave the decision
1960  * to remote acl handling.
1961  */
1962 static int
1963 lustre_check_acl(struct inode *inode, int mask)
1964 {
1965         struct lookup_intent it = { .it_op = IT_GETATTR };
1966         struct dentry de = { .d_inode = inode };
1967         struct ll_sb_info *sbi;
1968         struct lustre_id id;
1969         struct ptlrpc_request *req = NULL;
1970         struct ll_inode_info *lli = ll_i2info(inode);
1971         struct posix_acl *acl;
1972         int rc;
1973         ENTRY;
1974
1975         sbi = ll_i2sbi(inode);
1976         ll_inode2id(&id, inode);
1977
1978         if (ll_intent_alloc(&it))
1979                 return -EACCES;
1980
1981         rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1982                             &it, 0, &req, ll_mdc_blocking_ast);
1983         if (rc < 0) {
1984                 ll_intent_free(&it);
1985                 GOTO(out, rc);
1986         }
1987
1988         rc = revalidate_it_finish(req, 1, &it, &de);
1989         if (rc) {
1990                 ll_intent_release(&it);
1991                 GOTO(out, rc);
1992         }
1993
1994         ll_lookup_finish_locks(&it, &de);
1995         ll_intent_free(&it);
1996
1997         spin_lock(&lli->lli_lock);
1998         acl = posix_acl_dup(ll_i2info(inode)->lli_acl_access);
1999         spin_unlock(&lli->lli_lock);
2000
2001         if (!acl)
2002                 GOTO(out, rc = -EAGAIN);
2003
2004         rc = posix_acl_permission(inode, acl, mask);
2005         posix_acl_release(acl);
2006
2007 out:
2008         if (req)
2009                 ptlrpc_req_finished(req);
2010
2011         RETURN(rc);
2012 }
2013
2014 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2015 {
2016         return generic_permission(inode, mask, lustre_check_acl);
2017 }
2018
2019 struct file_operations ll_file_operations = {
2020         .read           = ll_file_read,
2021         .write          = ll_file_write,
2022         .ioctl          = ll_file_ioctl,
2023         .open           = ll_file_open,
2024         .release        = ll_file_release,
2025         .mmap           = ll_file_mmap,
2026         .llseek         = ll_file_seek,
2027 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2028         .sendfile       = generic_file_sendfile,
2029 #endif
2030         .fsync          = ll_fsync,
2031         .lock           = ll_file_flock
2032 };
2033
2034 struct inode_operations ll_file_inode_operations = {
2035         .setattr        = ll_setattr,
2036         .truncate       = ll_truncate,
2037 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2038         .getattr        = ll_getattr,
2039 #else
2040         .revalidate_it  = ll_inode_revalidate_it,
2041 #endif
2042         .setxattr       = ll_setxattr,
2043         .getxattr       = ll_getxattr,
2044         .listxattr      = ll_listxattr,
2045         .removexattr    = ll_removexattr,
2046         .permission     = ll_inode_permission,
2047 };
2048