Whamcloud - gitweb
Branch Head
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35
36 /* also used by llite/special.c:ll_special_open() */
37 struct ll_file_data *ll_file_data_get(void)
38 {
39         struct ll_file_data *fd;
40
41         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
42         return fd;
43 }
44
45 static void ll_file_data_put(struct ll_file_data *fd)
46 {
47         if (fd != NULL)
48                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
49 }
50
51 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
52                           struct lustre_handle *fh)
53 {
54         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
55         op_data->op_attr.ia_mode = inode->i_mode;
56         op_data->op_attr.ia_atime = inode->i_atime;
57         op_data->op_attr.ia_mtime = inode->i_mtime;
58         op_data->op_attr.ia_ctime = inode->i_ctime;
59         op_data->op_attr.ia_size = inode->i_size;
60         op_data->op_attr_blocks = inode->i_blocks;
61         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
62         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
63         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
64         op_data->op_capa1 = ll_mdscapa_get(inode);
65 }
66
67 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
68                              struct obd_client_handle *och)
69 {
70         ENTRY;
71
72         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
73                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
74
75         if (!(och->och_flags & FMODE_WRITE))
76                 goto out;
77
78         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
79             !S_ISREG(inode->i_mode))
80                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
81         else
82                 ll_epoch_close(inode, op_data, &och, 0);
83
84 out:
85         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
86         EXIT;
87 }
88
89 static int ll_close_inode_openhandle(struct obd_export *md_exp,
90                                      struct inode *inode,
91                                      struct obd_client_handle *och)
92 {
93         struct obd_export *exp = ll_i2mdexp(inode);
94         struct md_op_data *op_data;
95         struct ptlrpc_request *req = NULL;
96         struct obd_device *obd = class_exp2obd(exp);
97         int epoch_close = 1;
98         int rc;
99         ENTRY;
100
101         if (obd == NULL) {
102                 /*
103                  * XXX: in case of LMV, is this correct to access
104                  * ->exp_handle?
105                  */
106                 CERROR("Invalid MDC connection handle "LPX64"\n",
107                        ll_i2mdexp(inode)->exp_handle.h_cookie);
108                 GOTO(out, rc = 0);
109         }
110
111         /*
112          * here we check if this is forced umount. If so this is called on
113          * canceling "open lock" and we do not call md_close() in this case, as
114          * it will not be successful, as import is already deactivated.
115          */
116         if (obd->obd_force)
117                 GOTO(out, rc = 0);
118
119         OBD_ALLOC_PTR(op_data);
120         if (op_data == NULL)
121                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
122
123         ll_prepare_close(inode, op_data, och);
124         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
125         rc = md_close(md_exp, op_data, och, &req);
126
127         if (rc == -EAGAIN) {
128                 /* This close must have the epoch closed. */
129                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
130                 LASSERT(epoch_close);
131                 /* MDS has instructed us to obtain Size-on-MDS attribute from
132                  * OSTs and send setattr to back to MDS. */
133                 rc = ll_sizeonmds_update(inode, &och->och_fh,
134                                          op_data->op_ioepoch);
135                 if (rc) {
136                         CERROR("inode %lu mdc Size-on-MDS update failed: "
137                                "rc = %d\n", inode->i_ino, rc);
138                         rc = 0;
139                 }
140         } else if (rc) {
141                 CERROR("inode %lu mdc close failed: rc = %d\n",
142                        inode->i_ino, rc);
143         }
144         ll_finish_md_op_data(op_data);
145
146         if (rc == 0) {
147                 rc = ll_objects_destroy(req, inode);
148                 if (rc)
149                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
150                                inode->i_ino, rc);
151         }
152
153         ptlrpc_req_finished(req); /* This is close request */
154         EXIT;
155 out:
156       
157         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
158             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
159                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
160         } else {
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         
167         return rc;
168 }
169
170 int ll_md_real_close(struct inode *inode, int flags)
171 {
172         struct ll_inode_info *lli = ll_i2info(inode);
173         struct obd_client_handle **och_p;
174         struct obd_client_handle *och;
175         __u64 *och_usecount;
176         int rc = 0;
177         ENTRY;
178
179         if (flags & FMODE_WRITE) {
180                 och_p = &lli->lli_mds_write_och;
181                 och_usecount = &lli->lli_open_fd_write_count;
182         } else if (flags & FMODE_EXEC) {
183                 och_p = &lli->lli_mds_exec_och;
184                 och_usecount = &lli->lli_open_fd_exec_count;
185         } else {
186                 LASSERT(flags & FMODE_READ);
187                 och_p = &lli->lli_mds_read_och;
188                 och_usecount = &lli->lli_open_fd_read_count;
189         }
190
191         down(&lli->lli_och_sem);
192         if (*och_usecount) { /* There are still users of this handle, so
193                                 skip freeing it. */
194                 up(&lli->lli_och_sem);
195                 RETURN(0);
196         }
197         och=*och_p;
198         *och_p = NULL;
199         up(&lli->lli_och_sem);
200
201         if (och) { /* There might be a race and somebody have freed this och
202                       already */
203                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
204                                                inode, och);
205         }
206
207         RETURN(rc);
208 }
209
210 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
211                 struct file *file)
212 {
213         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
214         struct ll_inode_info *lli = ll_i2info(inode);
215         int rc = 0;
216         ENTRY;
217
218         /* clear group lock, if present */
219         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
220                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
221                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
222                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
223                                       &fd->fd_cwlockh);
224         }
225
226         /* Let's see if we have good enough OPEN lock on the file and if
227            we can skip talking to MDS */
228         if (file->f_dentry->d_inode) { /* Can this ever be false? */
229                 int lockmode;
230                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
231                 struct lustre_handle lockh;
232                 struct inode *inode = file->f_dentry->d_inode;
233                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
234
235                 down(&lli->lli_och_sem);
236                 if (fd->fd_omode & FMODE_WRITE) {
237                         lockmode = LCK_CW;
238                         LASSERT(lli->lli_open_fd_write_count);
239                         lli->lli_open_fd_write_count--;
240                 } else if (fd->fd_omode & FMODE_EXEC) {
241                         lockmode = LCK_PR;
242                         LASSERT(lli->lli_open_fd_exec_count);
243                         lli->lli_open_fd_exec_count--;
244                 } else {
245                         lockmode = LCK_CR;
246                         LASSERT(lli->lli_open_fd_read_count);
247                         lli->lli_open_fd_read_count--;
248                 }
249                 up(&lli->lli_och_sem);
250
251                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
252                                    LDLM_IBITS, &policy, lockmode,
253                                    &lockh)) {
254                         rc = ll_md_real_close(file->f_dentry->d_inode,
255                                               fd->fd_omode);
256                 }
257         } else {
258                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
259                        file, file->f_dentry, file->f_dentry->d_name.name);
260         }
261
262         LUSTRE_FPRIVATE(file) = NULL;
263         ll_file_data_put(fd);
264         ll_capa_close(inode);
265
266         RETURN(rc);
267 }
268
269 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
270
271 /* While this returns an error code, fput() the caller does not, so we need
272  * to make every effort to clean up all of our state here.  Also, applications
273  * rarely check close errors and even if an error is returned they will not
274  * re-try the close call.
275  */
276 int ll_file_release(struct inode *inode, struct file *file)
277 {
278         struct ll_file_data *fd;
279         struct ll_sb_info *sbi = ll_i2sbi(inode);
280         struct ll_inode_info *lli = ll_i2info(inode);
281         struct lov_stripe_md *lsm = lli->lli_smd;
282         int rc;
283
284         ENTRY;
285         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
286                inode->i_generation, inode);
287
288         /* don't do anything for / */
289         if (inode->i_sb->s_root == file->f_dentry)
290                 RETURN(0);
291
292         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
293         fd = LUSTRE_FPRIVATE(file);
294         LASSERT(fd != NULL);
295
296         /* don't do anything for / */
297         if (inode->i_sb->s_root == file->f_dentry) {
298                 LUSTRE_FPRIVATE(file) = NULL;
299                 ll_file_data_put(fd);
300                 RETURN(0);
301         }
302         
303         if (lsm)
304                 lov_test_and_clear_async_rc(lsm);
305         lli->lli_async_rc = 0;
306
307         rc = ll_md_close(sbi->ll_md_exp, inode, file);
308         RETURN(rc);
309 }
310
311 static int ll_intent_file_open(struct file *file, void *lmm,
312                                int lmmsize, struct lookup_intent *itp)
313 {
314         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
315         struct dentry *parent = file->f_dentry->d_parent;
316         const char *name = file->f_dentry->d_name.name;
317         const int len = file->f_dentry->d_name.len;
318         struct md_op_data *op_data;
319         struct ptlrpc_request *req;
320         int rc;
321
322         if (!parent)
323                 RETURN(-ENOENT);
324
325         /* Usually we come here only for NFSD, and we want open lock.
326            But we can also get here with pre 2.6.15 patchless kernels, and in
327            that case that lock is also ok */
328         /* We can also get here if there was cached open handle in revalidate_it
329          * but it disappeared while we were getting from there to ll_file_open.
330          * But this means this file was closed and immediatelly opened which
331          * makes a good candidate for using OPEN lock */
332         /* If lmmsize & lmm are not 0, we are just setting stripe info
333          * parameters. No need for the open lock */
334         if (!lmm && !lmmsize)
335                 itp->it_flags |= MDS_OPEN_LOCK;
336
337         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
338                                       file->f_dentry->d_inode, name, len,
339                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
340         if (IS_ERR(op_data))
341                 RETURN(PTR_ERR(op_data));
342
343         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
344                             0 /*unused */, &req, ll_md_blocking_ast, 0);
345         ll_finish_md_op_data(op_data);
346         if (rc == -ESTALE) {
347                 /* reason for keep own exit path - don`t flood log
348                 * with messages with -ESTALE errors.
349                 */
350                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
351                      it_open_error(DISP_OPEN_OPEN, itp))
352                         GOTO(out, rc);
353                 ll_release_openhandle(file->f_dentry, itp);
354                 GOTO(out_stale, rc);
355         }
356
357         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
358                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
359                 CERROR("lock enqueue: err: %d\n", rc);
360                 GOTO(out, rc);
361         }
362
363         if (itp->d.lustre.it_lock_mode)
364                 md_set_lock_data(sbi->ll_md_exp,
365                                  &itp->d.lustre.it_lock_handle, 
366                                  file->f_dentry->d_inode);
367
368         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
369                            NULL);
370 out:
371         ptlrpc_req_finished(itp->d.lustre.it_data);
372
373 out_stale:
374         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
375         ll_intent_drop_lock(itp);
376
377         RETURN(rc);
378 }
379
380 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
381                        struct lookup_intent *it, struct obd_client_handle *och)
382 {
383         struct ptlrpc_request *req = it->d.lustre.it_data;
384         struct mdt_body *body;
385
386         LASSERT(och);
387
388         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
389         LASSERT(body != NULL);                      /* reply already checked out */
390         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
391
392         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
393         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
394         och->och_fid = lli->lli_fid;
395         och->och_flags = it->it_flags;
396         lli->lli_ioepoch = body->ioepoch;
397
398         return md_set_open_replay_data(md_exp, och, req);
399 }
400
401 int ll_local_open(struct file *file, struct lookup_intent *it,
402                   struct ll_file_data *fd, struct obd_client_handle *och)
403 {
404         struct inode *inode = file->f_dentry->d_inode;
405         struct ll_inode_info *lli = ll_i2info(inode);
406         ENTRY;
407
408         LASSERT(!LUSTRE_FPRIVATE(file));
409
410         LASSERT(fd != NULL);
411
412         if (och) {
413                 struct ptlrpc_request *req = it->d.lustre.it_data;
414                 struct mdt_body *body;
415                 int rc;
416
417                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
418                 if (rc)
419                         RETURN(rc);
420
421                 body = lustre_msg_buf(req->rq_repmsg,
422                                       DLM_REPLY_REC_OFF, sizeof(*body));
423
424                 if ((it->it_flags & FMODE_WRITE) &&
425                     (body->valid & OBD_MD_FLSIZE))
426                 {
427                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
428                                lli->lli_ioepoch, PFID(&lli->lli_fid));
429                 }
430         }
431
432         LUSTRE_FPRIVATE(file) = fd;
433         ll_readahead_init(inode, &fd->fd_ras);
434         fd->fd_omode = it->it_flags;
435         RETURN(0);
436 }
437
438 /* Open a file, and (for the very first open) create objects on the OSTs at
439  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
440  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
441  * lli_open_sem to ensure no other process will create objects, send the
442  * stripe MD to the MDS, or try to destroy the objects if that fails.
443  *
444  * If we already have the stripe MD locally then we don't request it in
445  * md_open(), by passing a lmm_size = 0.
446  *
447  * It is up to the application to ensure no other processes open this file
448  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
449  * used.  We might be able to avoid races of that sort by getting lli_open_sem
450  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
451  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
452  */
453 int ll_file_open(struct inode *inode, struct file *file)
454 {
455         struct ll_inode_info *lli = ll_i2info(inode);
456         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
457                                           .it_flags = file->f_flags };
458         struct lov_stripe_md *lsm;
459         struct ptlrpc_request *req = NULL;
460         struct obd_client_handle **och_p;
461         __u64 *och_usecount;
462         struct ll_file_data *fd;
463         int rc = 0;
464         ENTRY;
465
466         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
467                inode->i_generation, inode, file->f_flags);
468
469         /* don't do anything for / */
470         if (inode->i_sb->s_root == file->f_dentry)
471                 RETURN(0);
472
473 #ifdef LUSTRE_KERNEL_VERSION
474         it = file->f_it;
475 #else
476         it = file->private_data; /* XXX: compat macro */
477         file->private_data = NULL; /* prevent ll_local_open assertion */
478 #endif
479
480         fd = ll_file_data_get();
481         if (fd == NULL)
482                 RETURN(-ENOMEM);
483
484         /* don't do anything for / */
485         if (inode->i_sb->s_root == file->f_dentry) {
486                 LUSTRE_FPRIVATE(file) = fd;
487                 RETURN(0);
488         }
489
490         if (!it || !it->d.lustre.it_disposition) {
491                 /* Convert f_flags into access mode. We cannot use file->f_mode,
492                  * because everything but O_ACCMODE mask was stripped from
493                  * there */
494                 if ((oit.it_flags + 1) & O_ACCMODE)
495                         oit.it_flags++;
496                 if (file->f_flags & O_TRUNC)
497                         oit.it_flags |= FMODE_WRITE;
498
499                 /* kernel only call f_op->open in dentry_open.  filp_open calls
500                  * dentry_open after call to open_namei that checks permissions.
501                  * Only nfsd_open call dentry_open directly without checking
502                  * permissions and because of that this code below is safe. */
503                 if (oit.it_flags & FMODE_WRITE)
504                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
505
506                 /* We do not want O_EXCL here, presumably we opened the file
507                  * already? XXX - NFS implications? */
508                 oit.it_flags &= ~O_EXCL;
509
510                 it = &oit;
511         }
512
513         /* Let's see if we have file open on MDS already. */
514         if (it->it_flags & FMODE_WRITE) {
515                 och_p = &lli->lli_mds_write_och;
516                 och_usecount = &lli->lli_open_fd_write_count;
517         } else if (it->it_flags & FMODE_EXEC) {
518                 och_p = &lli->lli_mds_exec_och;
519                 och_usecount = &lli->lli_open_fd_exec_count;
520          } else {
521                 och_p = &lli->lli_mds_read_och;
522                 och_usecount = &lli->lli_open_fd_read_count;
523         }
524         
525         down(&lli->lli_och_sem);
526         if (*och_p) { /* Open handle is present */
527                 if (it_disposition(it, DISP_OPEN_OPEN)) {
528                         /* Well, there's extra open request that we do not need,
529                            let's close it somehow. This will decref request. */
530                         rc = it_open_error(DISP_OPEN_OPEN, it);
531                         if (rc) {
532                                 ll_file_data_put(fd);
533                                 GOTO(out_och_free, rc);
534                         }       
535                         ll_release_openhandle(file->f_dentry, it);
536                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
537                                              LPROC_LL_OPEN);
538                 }
539                 (*och_usecount)++;
540
541                 rc = ll_local_open(file, it, fd, NULL);
542                 if (rc) {
543                         up(&lli->lli_och_sem);
544                         ll_file_data_put(fd);
545                         RETURN(rc);
546                 }
547         } else {
548                 LASSERT(*och_usecount == 0);
549                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
550                 if (!*och_p) {
551                         ll_file_data_put(fd);
552                         GOTO(out_och_free, rc = -ENOMEM);
553                 }
554                 (*och_usecount)++;
555                 if (!it->d.lustre.it_disposition) {
556                         it->it_flags |= O_CHECK_STALE;
557                         rc = ll_intent_file_open(file, NULL, 0, it);
558                         it->it_flags &= ~O_CHECK_STALE;
559                         if (rc) {
560                                 ll_file_data_put(fd);
561                                 GOTO(out_och_free, rc);
562                         }
563
564                         /* Got some error? Release the request */
565                         if (it->d.lustre.it_status < 0) {
566                                 req = it->d.lustre.it_data;
567                                 ptlrpc_req_finished(req);
568                         }
569                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
570                                          &it->d.lustre.it_lock_handle,
571                                          file->f_dentry->d_inode);
572                 }
573                 req = it->d.lustre.it_data;
574
575                 /* md_intent_lock() didn't get a request ref if there was an
576                  * open error, so don't do cleanup on the request here
577                  * (bug 3430) */
578                 /* XXX (green): Should not we bail out on any error here, not
579                  * just open error? */
580                 rc = it_open_error(DISP_OPEN_OPEN, it);
581                 if (rc) {
582                         ll_file_data_put(fd);
583                         GOTO(out_och_free, rc);
584                 }
585
586                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
587                 rc = ll_local_open(file, it, fd, *och_p);
588                 if (rc) {
589                         up(&lli->lli_och_sem);
590                         ll_file_data_put(fd);
591                         GOTO(out_och_free, rc);
592                 }
593         }
594         up(&lli->lli_och_sem);
595
596         /* Must do this outside lli_och_sem lock to prevent deadlock where
597            different kind of OPEN lock for this same inode gets cancelled
598            by ldlm_cancel_lru */
599         if (!S_ISREG(inode->i_mode))
600                 GOTO(out, rc);
601
602         ll_capa_open(inode);
603
604         lsm = lli->lli_smd;
605         if (lsm == NULL) {
606                 if (file->f_flags & O_LOV_DELAY_CREATE ||
607                     !(file->f_mode & FMODE_WRITE)) {
608                         CDEBUG(D_INODE, "object creation was delayed\n");
609                         GOTO(out, rc);
610                 }
611         }
612         file->f_flags &= ~O_LOV_DELAY_CREATE;
613         GOTO(out, rc);
614 out:
615         ptlrpc_req_finished(req);
616         if (req)
617                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
618 out_och_free:
619         if (rc) {
620                 if (*och_p) {
621                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
622                         *och_p = NULL; /* OBD_FREE writes some magic there */
623                         (*och_usecount)--;
624                 }
625                 up(&lli->lli_och_sem);
626         }
627
628         return rc;
629 }
630
631 /* Fills the obdo with the attributes for the inode defined by lsm */
632 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
633 {
634         struct ptlrpc_request_set *set;
635         struct ll_inode_info *lli = ll_i2info(inode);
636         struct lov_stripe_md *lsm = lli->lli_smd;
637
638         struct obd_info oinfo = { { { 0 } } };
639         int rc;
640         ENTRY;
641
642         LASSERT(lsm != NULL);
643
644         oinfo.oi_md = lsm;
645         oinfo.oi_oa = obdo;
646         oinfo.oi_oa->o_id = lsm->lsm_object_id;
647         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
648         oinfo.oi_oa->o_mode = S_IFREG;
649         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
650                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
651                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
652                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
653                                OBD_MD_FLGROUP;
654         oinfo.oi_capa = ll_mdscapa_get(inode);
655
656         set = ptlrpc_prep_set();
657         if (set == NULL) {
658                 CERROR("can't allocate ptlrpc set\n");
659                 rc = -ENOMEM;
660         } else {
661                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
662                 if (rc == 0)
663                         rc = ptlrpc_set_wait(set);
664                 ptlrpc_set_destroy(set);
665         }
666         capa_put(oinfo.oi_capa);
667         if (rc)
668                 RETURN(rc);
669
670         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
671                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
672                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
673
674         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
675         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
676                lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks,
677                inode->i_blksize);
678         RETURN(0);
679 }
680
681 static inline void ll_remove_suid(struct inode *inode)
682 {
683         unsigned int mode;
684
685         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
686         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
687
688         /* was any of the uid bits set? */
689         mode &= inode->i_mode;
690         if (mode && !capable(CAP_FSETID)) {
691                 inode->i_mode &= ~mode;
692                 // XXX careful here - we cannot change the size
693         }
694 }
695
696 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
697 {
698         struct ll_inode_info *lli = ll_i2info(inode);
699         struct lov_stripe_md *lsm = lli->lli_smd;
700         struct obd_export *exp = ll_i2dtexp(inode);
701         struct {
702                 char name[16];
703                 struct ldlm_lock *lock;
704                 struct lov_stripe_md *lsm;
705         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
706         __u32 stripe, vallen = sizeof(stripe);
707         int rc;
708         ENTRY;
709
710         if (lsm->lsm_stripe_count == 1)
711                 GOTO(check, stripe = 0);
712
713         /* get our offset in the lov */
714         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
715         if (rc != 0) {
716                 CERROR("obd_get_info: rc = %d\n", rc);
717                 RETURN(rc);
718         }
719         LASSERT(stripe < lsm->lsm_stripe_count);
720
721 check:
722         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
723             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
724                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
725                            lsm->lsm_oinfo[stripe]->loi_id,
726                            lsm->lsm_oinfo[stripe]->loi_gr);
727                 RETURN(-ELDLM_NO_LOCK_DATA);
728         }
729
730         RETURN(stripe);
731 }
732
733 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
734  * we get a lock cancellation for each stripe, so we have to map the obd's
735  * region back onto the stripes in the file that it held.
736  *
737  * No one can dirty the extent until we've finished our work and they can
738  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
739  * but other kernel actors could have pages locked.
740  *
741  * Called with the DLM lock held. */
742 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
743                               struct ldlm_lock *lock, __u32 stripe)
744 {
745         ldlm_policy_data_t tmpex;
746         unsigned long start, end, count, skip, i, j;
747         struct page *page;
748         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
749         struct lustre_handle lockh;
750         ENTRY;
751
752         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
753         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
754                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
755                inode->i_size);
756
757         /* our locks are page granular thanks to osc_enqueue, we invalidate the
758          * whole page. */
759         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
760             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
761                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
762                            CFS_PAGE_SIZE);
763         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
764         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
765
766         count = ~0;
767         skip = 0;
768         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
769         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
770         if (lsm->lsm_stripe_count > 1) {
771                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
772                 skip = (lsm->lsm_stripe_count - 1) * count;
773                 start += start/count * skip + stripe * count;
774                 if (end != ~0)
775                         end += end/count * skip + stripe * count;
776         }
777         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
778                 end = ~0;
779
780         i = inode->i_size ? (__u64)(inode->i_size - 1) >> CFS_PAGE_SHIFT : 0;
781         if (i < end)
782                 end = i;
783
784         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
785                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
786                count, skip, end, discard ? " (DISCARDING)" : "");
787
788         /* walk through the vmas on the inode and tear down mmaped pages that
789          * intersect with the lock.  this stops immediately if there are no
790          * mmap()ed regions of the file.  This is not efficient at all and
791          * should be short lived. We'll associate mmap()ed pages with the lock
792          * and will be able to find them directly */
793         for (i = start; i <= end; i += (j + skip)) {
794                 j = min(count - (i % count), end - i + 1);
795                 LASSERT(j > 0);
796                 LASSERT(inode->i_mapping);
797                 if (ll_teardown_mmaps(inode->i_mapping,
798                                       (__u64)i << CFS_PAGE_SHIFT,
799                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
800                         break;
801         }
802
803         /* this is the simplistic implementation of page eviction at
804          * cancelation.  It is careful to get races with other page
805          * lockers handled correctly.  fixes from bug 20 will make it
806          * more efficient by associating locks with pages and with
807          * batching writeback under the lock explicitly. */
808         for (i = start, j = start % count; i <= end;
809              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
810                 if (j == count) {
811                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
812                         i += skip;
813                         j = 0;
814                         if (i > end)
815                                 break;
816                 }
817                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
818                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
819                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
820                          start, i, end);
821
822                 if (!mapping_has_pages(inode->i_mapping)) {
823                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
824                         break;
825                 }
826
827                 cond_resched();
828
829                 page = find_get_page(inode->i_mapping, i);
830                 if (page == NULL)
831                         continue;
832                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
833                                i, tmpex.l_extent.start);
834                 lock_page(page);
835
836                 /* page->mapping to check with racing against teardown */
837                 if (!discard && clear_page_dirty_for_io(page)) {
838                         rc = ll_call_writepage(inode, page);
839                         if (rc != 0)
840                                 CERROR("writepage of page %p failed: %d\n",
841                                        page, rc);
842                         /* either waiting for io to complete or reacquiring
843                          * the lock that the failed writepage released */
844                         lock_page(page);
845                 }
846
847                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
848                 /* check to see if another DLM lock covers this page b=2765 */
849                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
850                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
851                                       LDLM_FL_TEST_LOCK,
852                                       &lock->l_resource->lr_name, LDLM_EXTENT,
853                                       &tmpex, LCK_PR | LCK_PW, &lockh);
854
855                 if (rc2 <= 0 && page->mapping != NULL) {
856                         struct ll_async_page *llap = llap_cast_private(page);
857                         /* checking again to account for writeback's
858                          * lock_page() */
859                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
860                         if (llap)
861                                 ll_ra_accounting(llap, inode->i_mapping);
862                         ll_truncate_complete_page(page);
863                 }
864                 unlock_page(page);
865                 page_cache_release(page);
866         }
867         LASSERTF(tmpex.l_extent.start <=
868                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
869                   lock->l_policy_data.l_extent.end + 1),
870                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
871                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
872                  start, i, end);
873         EXIT;
874 }
875
876 static int ll_extent_lock_callback(struct ldlm_lock *lock,
877                                    struct ldlm_lock_desc *new, void *data,
878                                    int flag)
879 {
880         struct lustre_handle lockh = { 0 };
881         int rc;
882         ENTRY;
883
884         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
885                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
886                 LBUG();
887         }
888
889         switch (flag) {
890         case LDLM_CB_BLOCKING:
891                 ldlm_lock2handle(lock, &lockh);
892                 rc = ldlm_cli_cancel(&lockh);
893                 if (rc != ELDLM_OK)
894                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
895                 break;
896         case LDLM_CB_CANCELING: {
897                 struct inode *inode;
898                 struct ll_inode_info *lli;
899                 struct lov_stripe_md *lsm;
900                 int stripe;
901                 __u64 kms;
902
903                 /* This lock wasn't granted, don't try to evict pages */
904                 if (lock->l_req_mode != lock->l_granted_mode)
905                         RETURN(0);
906
907                 inode = ll_inode_from_lock(lock);
908                 if (inode == NULL)
909                         RETURN(0);
910                 lli = ll_i2info(inode);
911                 if (lli == NULL)
912                         goto iput;
913                 if (lli->lli_smd == NULL)
914                         goto iput;
915                 lsm = lli->lli_smd;
916
917                 stripe = ll_lock_to_stripe_offset(inode, lock);
918                 if (stripe < 0)
919                         goto iput;
920
921                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
922
923                 lov_stripe_lock(lsm);
924                 lock_res_and_lock(lock);
925                 kms = ldlm_extent_shift_kms(lock,
926                                             lsm->lsm_oinfo[stripe]->loi_kms);
927
928                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
929                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
930                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
931                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
932                 unlock_res_and_lock(lock);
933                 lov_stripe_unlock(lsm);
934         iput:
935                 iput(inode);
936                 break;
937         }
938         default:
939                 LBUG();
940         }
941
942         RETURN(0);
943 }
944
945 #if 0
946 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
947 {
948         /* XXX ALLOCATE - 160 bytes */
949         struct inode *inode = ll_inode_from_lock(lock);
950         struct ll_inode_info *lli = ll_i2info(inode);
951         struct lustre_handle lockh = { 0 };
952         struct ost_lvb *lvb;
953         int stripe;
954         ENTRY;
955
956         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
957                      LDLM_FL_BLOCK_CONV)) {
958                 LBUG(); /* not expecting any blocked async locks yet */
959                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
960                            "lock, returning");
961                 ldlm_lock_dump(D_OTHER, lock, 0);
962                 ldlm_reprocess_all(lock->l_resource);
963                 RETURN(0);
964         }
965
966         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
967
968         stripe = ll_lock_to_stripe_offset(inode, lock);
969         if (stripe < 0)
970                 goto iput;
971
972         if (lock->l_lvb_len) {
973                 struct lov_stripe_md *lsm = lli->lli_smd;
974                 __u64 kms;
975                 lvb = lock->l_lvb_data;
976                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
977
978                 lock_res_and_lock(lock);
979                 ll_inode_size_lock(inode, 1);
980                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
981                 kms = ldlm_extent_shift_kms(NULL, kms);
982                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
983                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
984                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
985                 lsm->lsm_oinfo[stripe].loi_kms = kms;
986                 ll_inode_size_unlock(inode, 1);
987                 unlock_res_and_lock(lock);
988         }
989
990 iput:
991         iput(inode);
992         wake_up(&lock->l_waitq);
993
994         ldlm_lock2handle(lock, &lockh);
995         ldlm_lock_decref(&lockh, LCK_PR);
996         RETURN(0);
997 }
998 #endif
999
1000 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1001 {
1002         struct ptlrpc_request *req = reqp;
1003         struct inode *inode = ll_inode_from_lock(lock);
1004         struct ll_inode_info *lli;
1005         struct lov_stripe_md *lsm;
1006         struct ost_lvb *lvb;
1007         int rc, stripe;
1008         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1009         ENTRY;
1010
1011         if (inode == NULL)
1012                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1013         lli = ll_i2info(inode);
1014         if (lli == NULL)
1015                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1016         lsm = lli->lli_smd;
1017         if (lsm == NULL)
1018                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1019
1020         /* First, find out which stripe index this lock corresponds to. */
1021         stripe = ll_lock_to_stripe_offset(inode, lock);
1022         if (stripe < 0)
1023                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1024
1025         rc = lustre_pack_reply(req, 2, size, NULL);
1026         if (rc) {
1027                 CERROR("lustre_pack_reply: %d\n", rc);
1028                 GOTO(iput, rc);
1029         }
1030
1031         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1032         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1033         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1034         lvb->lvb_atime = LTIME_S(inode->i_atime);
1035         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1036
1037         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1038                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1039                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
1040                    lvb->lvb_atime, lvb->lvb_ctime);
1041  iput:
1042         iput(inode);
1043
1044  out:
1045         /* These errors are normal races, so we don't want to fill the console
1046          * with messages by calling ptlrpc_error() */
1047         if (rc == -ELDLM_NO_LOCK_DATA)
1048                 lustre_pack_reply(req, 1, NULL, NULL);
1049
1050         req->rq_status = rc;
1051         return rc;
1052 }
1053
1054 static void ll_merge_lvb(struct inode *inode)
1055 {
1056         struct ll_inode_info *lli = ll_i2info(inode);
1057         struct ll_sb_info *sbi = ll_i2sbi(inode);
1058         struct ost_lvb lvb;
1059         ENTRY;
1060
1061         ll_inode_size_lock(inode, 1);
1062         inode_init_lvb(inode, &lvb);
1063         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1064         inode->i_size = lvb.lvb_size;
1065         inode->i_blocks = lvb.lvb_blocks;
1066         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1067         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1068         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1069         ll_inode_size_unlock(inode, 1);
1070         EXIT;
1071 }
1072
1073 int ll_local_size(struct inode *inode)
1074 {
1075         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1076         struct ll_inode_info *lli = ll_i2info(inode);
1077         struct ll_sb_info *sbi = ll_i2sbi(inode);
1078         struct lustre_handle lockh = { 0 };
1079         int flags = 0;
1080         int rc;
1081         ENTRY;
1082
1083         if (lli->lli_smd->lsm_stripe_count == 0)
1084                 RETURN(0);
1085
1086         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1087                        &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
1088         if (rc < 0)
1089                 RETURN(rc);
1090         else if (rc == 0)
1091                 RETURN(-ENODATA);
1092
1093         ll_merge_lvb(inode);
1094         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
1095         RETURN(0);
1096 }
1097
1098 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1099                      lstat_t *st)
1100 {
1101         struct lustre_handle lockh = { 0 };
1102         struct obd_enqueue_info einfo = { 0 };
1103         struct obd_info oinfo = { { { 0 } } };
1104         struct ost_lvb lvb;
1105         int rc;
1106
1107         ENTRY;
1108
1109         einfo.ei_type = LDLM_EXTENT;
1110         einfo.ei_mode = LCK_PR;
1111         einfo.ei_flags = LDLM_FL_HAS_INTENT;
1112         einfo.ei_cb_bl = ll_extent_lock_callback;
1113         einfo.ei_cb_cp = ldlm_completion_ast;
1114         einfo.ei_cb_gl = ll_glimpse_callback;
1115         einfo.ei_cbdata = NULL;
1116
1117         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1118         oinfo.oi_lockh = &lockh;
1119         oinfo.oi_md = lsm;
1120
1121         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1122         if (rc == -ENOENT)
1123                 RETURN(rc);
1124         if (rc != 0) {
1125                 CERROR("obd_enqueue returned rc %d, "
1126                        "returning -EIO\n", rc);
1127                 RETURN(rc > 0 ? -EIO : rc);
1128         }
1129
1130         lov_stripe_lock(lsm);
1131         memset(&lvb, 0, sizeof(lvb));
1132         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1133         st->st_size = lvb.lvb_size;
1134         st->st_blocks = lvb.lvb_blocks;
1135         st->st_mtime = lvb.lvb_mtime;
1136         st->st_atime = lvb.lvb_atime;
1137         st->st_ctime = lvb.lvb_ctime;
1138         lov_stripe_unlock(lsm);
1139
1140         RETURN(rc);
1141 }
1142
1143 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1144  * file (because it prefers KMS over RSS when larger) */
1145 int ll_glimpse_size(struct inode *inode, int ast_flags)
1146 {
1147         struct ll_inode_info *lli = ll_i2info(inode);
1148         struct ll_sb_info *sbi = ll_i2sbi(inode);
1149         struct lustre_handle lockh = { 0 };
1150         struct obd_enqueue_info einfo = { 0 };
1151         struct obd_info oinfo = { { { 0 } } };
1152         int rc;
1153         ENTRY;
1154
1155         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1156                 RETURN(0);
1157
1158         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1159
1160         if (!lli->lli_smd) {
1161                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1162                 RETURN(0);
1163         }
1164
1165         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1166          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1167          *       won't revoke any conflicting DLM locks held. Instead,
1168          *       ll_glimpse_callback() will be called on each client
1169          *       holding a DLM lock against this file, and resulting size
1170          *       will be returned for each stripe. DLM lock on [0, EOF] is
1171          *       acquired only if there were no conflicting locks. */
1172         einfo.ei_type = LDLM_EXTENT;
1173         einfo.ei_mode = LCK_PR;
1174         einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
1175         einfo.ei_cb_bl = ll_extent_lock_callback;
1176         einfo.ei_cb_cp = ldlm_completion_ast;
1177         einfo.ei_cb_gl = ll_glimpse_callback;
1178         einfo.ei_cbdata = inode;
1179
1180         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1181         oinfo.oi_lockh = &lockh;
1182         oinfo.oi_md = lli->lli_smd;
1183
1184         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1185         if (rc == -ENOENT)
1186                 RETURN(rc);
1187         if (rc != 0) {
1188                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1189                 RETURN(rc > 0 ? -EIO : rc);
1190         }
1191
1192         ll_merge_lvb(inode);
1193
1194         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1195                inode->i_size, inode->i_blocks);
1196
1197         RETURN(rc);
1198 }
1199
1200 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1201                    struct lov_stripe_md *lsm, int mode,
1202                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1203                    int ast_flags)
1204 {
1205         struct ll_sb_info *sbi = ll_i2sbi(inode);
1206         struct ost_lvb lvb;
1207         struct obd_enqueue_info einfo = { 0 };
1208         struct obd_info oinfo = { { { 0 } } };
1209         int rc;
1210         ENTRY;
1211
1212         LASSERT(!lustre_handle_is_used(lockh));
1213         LASSERT(lsm != NULL);
1214
1215         /* don't drop the mmapped file to LRU */
1216         if (mapping_mapped(inode->i_mapping))
1217                 ast_flags |= LDLM_FL_NO_LRU;
1218
1219         /* XXX phil: can we do this?  won't it screw the file size up? */
1220         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1221             (sbi->ll_flags & LL_SBI_NOLCK))
1222                 RETURN(0);
1223
1224         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1225                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1226
1227         einfo.ei_type = LDLM_EXTENT;
1228         einfo.ei_mode = mode;
1229         einfo.ei_flags = ast_flags;
1230         einfo.ei_cb_bl = ll_extent_lock_callback;
1231         einfo.ei_cb_cp = ldlm_completion_ast;
1232         einfo.ei_cb_gl = ll_glimpse_callback;
1233         einfo.ei_cbdata = inode;
1234
1235         oinfo.oi_policy = *policy;
1236         oinfo.oi_lockh = lockh;
1237         oinfo.oi_md = lsm;
1238
1239         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
1240         *policy = oinfo.oi_policy;
1241         if (rc > 0)
1242                 rc = -EIO;
1243
1244         ll_inode_size_lock(inode, 1);
1245         inode_init_lvb(inode, &lvb);
1246         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1247
1248         if (policy->l_extent.start == 0 &&
1249             policy->l_extent.end == OBD_OBJECT_EOF) {
1250                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1251                  * the kms under both a DLM lock and the
1252                  * ll_inode_size_lock().  If we don't get the
1253                  * ll_inode_size_lock() here we can match the DLM lock and
1254                  * reset i_size from the kms before the truncating path has
1255                  * updated the kms.  generic_file_write can then trust the
1256                  * stale i_size when doing appending writes and effectively
1257                  * cancel the result of the truncate.  Getting the
1258                  * ll_inode_size_lock() after the enqueue maintains the DLM
1259                  * -> ll_inode_size_lock() acquiring order. */
1260                 inode->i_size = lvb.lvb_size;
1261                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n", 
1262                        inode->i_ino, inode->i_size);
1263         }
1264
1265         if (rc == 0) {
1266                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1267                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1268                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1269         }
1270         ll_inode_size_unlock(inode, 1);
1271
1272         RETURN(rc);
1273 }
1274
1275 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1276                      struct lov_stripe_md *lsm, int mode,
1277                      struct lustre_handle *lockh)
1278 {
1279         struct ll_sb_info *sbi = ll_i2sbi(inode);
1280         int rc;
1281         ENTRY;
1282
1283         /* XXX phil: can we do this?  won't it screw the file size up? */
1284         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1285             (sbi->ll_flags & LL_SBI_NOLCK))
1286                 RETURN(0);
1287
1288         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1289
1290         RETURN(rc);
1291 }
1292
1293 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1294                             loff_t *ppos)
1295 {
1296         struct inode *inode = file->f_dentry->d_inode;
1297         struct ll_inode_info *lli = ll_i2info(inode);
1298         struct lov_stripe_md *lsm = lli->lli_smd;
1299         struct ll_sb_info *sbi = ll_i2sbi(inode);
1300         struct ll_lock_tree tree;
1301         struct ll_lock_tree_node *node;
1302         struct ost_lvb lvb;
1303         struct ll_ra_read bead;
1304         int rc, ra = 0;
1305         loff_t end;
1306         ssize_t retval, chunk, sum = 0;
1307
1308         __u64 kms;
1309         ENTRY;
1310         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1311                inode->i_ino, inode->i_generation, inode, count, *ppos);
1312         /* "If nbyte is 0, read() will return 0 and have no other results."
1313          *                      -- Single Unix Spec */
1314         if (count == 0)
1315                 RETURN(0);
1316
1317         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1318
1319         if (!lsm) {
1320                 /* Read on file with no objects should return zero-filled
1321                  * buffers up to file size (we can get non-zero sizes with
1322                  * mknod + truncate, then opening file for read. This is a
1323                  * common pattern in NFS case, it seems). Bug 6243 */
1324                 int notzeroed;
1325                 /* Since there are no objects on OSTs, we have nothing to get
1326                  * lock on and so we are forced to access inode->i_size
1327                  * unguarded */
1328
1329                 /* Read beyond end of file */
1330                 if (*ppos >= inode->i_size)
1331                         RETURN(0);
1332
1333                 if (count > inode->i_size - *ppos)
1334                         count = inode->i_size - *ppos;
1335                 /* Make sure to correctly adjust the file pos pointer for
1336                  * EFAULT case */
1337                 notzeroed = clear_user(buf, count);
1338                 count -= notzeroed;
1339                 *ppos += count;
1340                 if (!count)
1341                         RETURN(-EFAULT);
1342                 RETURN(count);
1343         }
1344
1345 repeat:
1346         if (sbi->ll_max_rw_chunk != 0) {
1347                 /* first, let's know the end of the current stripe */
1348                 end = *ppos;
1349                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1350                                 (obd_off *)&end);
1351
1352                 /* correct, the end is beyond the request */
1353                 if (end > *ppos + count - 1)
1354                         end = *ppos + count - 1;
1355
1356                 /* and chunk shouldn't be too large even if striping is wide */
1357                 if (end - *ppos > sbi->ll_max_rw_chunk)
1358                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1359         } else {
1360                 end = *ppos + count - 1;
1361         }
1362
1363         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1364         tree.lt_fd = LUSTRE_FPRIVATE(file);
1365         rc = ll_tree_lock(&tree, node, buf, count,
1366                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1367         if (rc != 0)
1368                 GOTO(out, retval = rc);
1369
1370         ll_inode_size_lock(inode, 1);
1371         /*
1372          * Consistency guarantees: following possibilities exist for the
1373          * relation between region being read and real file size at this
1374          * moment:
1375          *
1376          *  (A): the region is completely inside of the file;
1377          *
1378          *  (B-x): x bytes of region are inside of the file, the rest is
1379          *  outside;
1380          *
1381          *  (C): the region is completely outside of the file.
1382          *
1383          * This classification is stable under DLM lock acquired by
1384          * ll_tree_lock() above, because to change class, other client has to
1385          * take DLM lock conflicting with our lock. Also, any updates to
1386          * ->i_size by other threads on this client are serialized by
1387          * ll_inode_size_lock(). This guarantees that short reads are handled
1388          * correctly in the face of concurrent writes and truncates.
1389          */
1390         inode_init_lvb(inode, &lvb);
1391         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1392         kms = lvb.lvb_size;
1393         if (*ppos + count - 1 > kms) {
1394                 /* A glimpse is necessary to determine whether we return a
1395                  * short read (B) or some zeroes at the end of the buffer (C) */
1396                 ll_inode_size_unlock(inode, 1);
1397                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1398                 if (retval) {
1399                         ll_tree_unlock(&tree);
1400                         goto out;
1401                 }
1402         } else {
1403                 /* region is within kms and, hence, within real file size (A).
1404                  * We need to increase i_size to cover the read region so that
1405                  * generic_file_read() will do its job, but that doesn't mean
1406                  * the kms size is _correct_, it is only the _minimum_ size.
1407                  * If someone does a stat they will get the correct size which
1408                  * will always be >= the kms value here.  b=11081 */
1409                 if (inode->i_size < kms)
1410                         inode->i_size = kms;
1411                 ll_inode_size_unlock(inode, 1);
1412         }
1413
1414         chunk = end - *ppos + 1;
1415         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1416                         inode->i_ino, chunk, *ppos, inode->i_size);
1417
1418         /* turn off the kernel's read-ahead */
1419 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1420         file->f_ramax = 0;
1421 #else
1422         file->f_ra.ra_pages = 0;
1423 #endif
1424         /* initialize read-ahead window once per syscall */
1425         if (ra == 0) {
1426                 ra = 1;
1427                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1428                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1429                 ll_ra_read_in(file, &bead);
1430         }
1431
1432         /* BUG: 5972 */
1433         file_accessed(file);
1434         retval = generic_file_read(file, buf, chunk, ppos);
1435         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1436
1437         ll_tree_unlock(&tree);
1438
1439         if (retval > 0) {
1440                 buf += retval;
1441                 count -= retval;
1442                 sum += retval;
1443                 if (retval == chunk && count > 0)
1444                         goto repeat;
1445         }
1446
1447  out:
1448         if (ra != 0)
1449                 ll_ra_read_ex(file, &bead);
1450         retval = (sum > 0) ? sum : retval;
1451         RETURN(retval);
1452 }
1453
1454 /*
1455  * Write to a file (through the page cache).
1456  */
1457 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1458                              loff_t *ppos)
1459 {
1460         struct inode *inode = file->f_dentry->d_inode;
1461         struct ll_sb_info *sbi = ll_i2sbi(inode);
1462         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1463         struct ll_lock_tree tree;
1464         struct ll_lock_tree_node *node;
1465         loff_t maxbytes = ll_file_maxbytes(inode);
1466         loff_t lock_start, lock_end, end;
1467         ssize_t retval, chunk, sum = 0;
1468         int rc;
1469         ENTRY;
1470
1471         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1472                inode->i_ino, inode->i_generation, inode, count, *ppos);
1473
1474         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1475
1476         /* POSIX, but surprised the VFS doesn't check this already */
1477         if (count == 0)
1478                 RETURN(0);
1479
1480         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1481          * called on the file, don't fail the below assertion (bug 2388). */
1482         if (file->f_flags & O_LOV_DELAY_CREATE &&
1483             ll_i2info(inode)->lli_smd == NULL)
1484                 RETURN(-EBADF);
1485
1486         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1487
1488         down(&ll_i2info(inode)->lli_write_sem);
1489
1490 repeat:
1491         chunk = 0; /* just to fix gcc's warning */
1492         end = *ppos + count - 1;
1493
1494         if (file->f_flags & O_APPEND) {
1495                 lock_start = 0;
1496                 lock_end = OBD_OBJECT_EOF;
1497         } else if (sbi->ll_max_rw_chunk != 0) {
1498                 /* first, let's know the end of the current stripe */
1499                 end = *ppos;
1500                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1501                                 (obd_off *)&end);
1502
1503                 /* correct, the end is beyond the request */
1504                 if (end > *ppos + count - 1)
1505                         end = *ppos + count - 1;
1506
1507                 /* and chunk shouldn't be too large even if striping is wide */
1508                 if (end - *ppos > sbi->ll_max_rw_chunk)
1509                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1510                 lock_start = *ppos;
1511                 lock_end = end;
1512         } else {
1513                 lock_start = *ppos;
1514                 lock_end = *ppos + count - 1;
1515         }
1516         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1517
1518         if (IS_ERR(node))
1519                 GOTO(out, retval = PTR_ERR(node));
1520
1521         tree.lt_fd = LUSTRE_FPRIVATE(file);
1522         rc = ll_tree_lock(&tree, node, buf, count,
1523                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1524         if (rc != 0)
1525                 GOTO(out, retval = rc);
1526
1527         /* This is ok, g_f_w will overwrite this under i_sem if it races
1528          * with a local truncate, it just makes our maxbyte checking easier.
1529          * The i_size value gets updated in ll_extent_lock() as a consequence
1530          * of the [0,EOF] extent lock we requested above. */
1531         if (file->f_flags & O_APPEND) {
1532                 *ppos = inode->i_size;
1533                 end = *ppos + count - 1;
1534         }
1535
1536         if (*ppos >= maxbytes) {
1537                 send_sig(SIGXFSZ, current, 0);
1538                 GOTO(out, retval = -EFBIG);
1539         }
1540         if (*ppos + count > maxbytes)
1541                 count = maxbytes - *ppos;
1542
1543         /* generic_file_write handles O_APPEND after getting i_mutex */
1544         chunk = end - *ppos + 1;
1545         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1546                inode->i_ino, chunk, *ppos);
1547         retval = generic_file_write(file, buf, chunk, ppos);
1548         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1549
1550 out:
1551         ll_tree_unlock(&tree);
1552
1553         if (retval > 0) {
1554                 buf += retval;
1555                 count -= retval;
1556                 sum += retval;
1557                 if (retval == chunk && count > 0)
1558                         goto repeat;
1559         }
1560
1561         up(&ll_i2info(inode)->lli_write_sem);
1562
1563         retval = (sum > 0) ? sum : retval;
1564         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1565                            retval > 0 ? retval : 0);
1566         RETURN(retval);
1567 }
1568
1569 /*
1570  * Send file content (through pagecache) somewhere with helper
1571  */
1572 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1573 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1574                                 read_actor_t actor, void *target)
1575 {
1576         struct inode *inode = in_file->f_dentry->d_inode;
1577         struct ll_inode_info *lli = ll_i2info(inode);
1578         struct lov_stripe_md *lsm = lli->lli_smd;
1579         struct ll_lock_tree tree;
1580         struct ll_lock_tree_node *node;
1581         struct ost_lvb lvb;
1582         struct ll_ra_read bead;
1583         int rc;
1584         ssize_t retval;
1585         __u64 kms;
1586         ENTRY;
1587         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1588                inode->i_ino, inode->i_generation, inode, count, *ppos);
1589
1590         /* "If nbyte is 0, read() will return 0 and have no other results."
1591          *                      -- Single Unix Spec */
1592         if (count == 0)
1593                 RETURN(0);
1594
1595         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1596         /* turn off the kernel's read-ahead */
1597         in_file->f_ra.ra_pages = 0;
1598
1599         /* File with no objects, nothing to lock */
1600         if (!lsm)
1601                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1602
1603         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1604         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1605         rc = ll_tree_lock(&tree, node, NULL, count,
1606                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1607         if (rc != 0)
1608                 RETURN(rc);
1609
1610         ll_inode_size_lock(inode, 1);
1611         /*
1612          * Consistency guarantees: following possibilities exist for the
1613          * relation between region being read and real file size at this
1614          * moment:
1615          *
1616          *  (A): the region is completely inside of the file;
1617          *
1618          *  (B-x): x bytes of region are inside of the file, the rest is
1619          *  outside;
1620          *
1621          *  (C): the region is completely outside of the file.
1622          *
1623          * This classification is stable under DLM lock acquired by
1624          * ll_tree_lock() above, because to change class, other client has to
1625          * take DLM lock conflicting with our lock. Also, any updates to
1626          * ->i_size by other threads on this client are serialized by
1627          * ll_inode_size_lock(). This guarantees that short reads are handled
1628          * correctly in the face of concurrent writes and truncates.
1629          */
1630         inode_init_lvb(inode, &lvb);
1631         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1632         kms = lvb.lvb_size;
1633         if (*ppos + count - 1 > kms) {
1634                 /* A glimpse is necessary to determine whether we return a
1635                  * short read (B) or some zeroes at the end of the buffer (C) */
1636                 ll_inode_size_unlock(inode, 1);
1637                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1638                 if (retval)
1639                         goto out;
1640         } else {
1641                 /* region is within kms and, hence, within real file size (A) */
1642                 inode->i_size = kms;
1643                 ll_inode_size_unlock(inode, 1);
1644         }
1645
1646         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1647                inode->i_ino, count, *ppos, inode->i_size);
1648
1649         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1650         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1651         ll_ra_read_in(in_file, &bead);
1652         /* BUG: 5972 */
1653         file_accessed(in_file);
1654         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1655         ll_ra_read_ex(in_file, &bead);
1656
1657  out:
1658         ll_tree_unlock(&tree);
1659         RETURN(retval);
1660 }
1661 #endif
1662
1663 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1664                                unsigned long arg)
1665 {
1666         struct ll_inode_info *lli = ll_i2info(inode);
1667         struct obd_export *exp = ll_i2dtexp(inode);
1668         struct ll_recreate_obj ucreatp;
1669         struct obd_trans_info oti = { 0 };
1670         struct obdo *oa = NULL;
1671         int lsm_size;
1672         int rc = 0;
1673         struct lov_stripe_md *lsm, *lsm2;
1674         ENTRY;
1675
1676         if (!capable (CAP_SYS_ADMIN))
1677                 RETURN(-EPERM);
1678
1679         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1680                             sizeof(struct ll_recreate_obj));
1681         if (rc) {
1682                 RETURN(-EFAULT);
1683         }
1684         OBDO_ALLOC(oa);
1685         if (oa == NULL)
1686                 RETURN(-ENOMEM);
1687
1688         down(&lli->lli_size_sem);
1689         lsm = lli->lli_smd;
1690         if (lsm == NULL)
1691                 GOTO(out, rc = -ENOENT);
1692         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1693                    (lsm->lsm_stripe_count));
1694
1695         OBD_ALLOC(lsm2, lsm_size);
1696         if (lsm2 == NULL)
1697                 GOTO(out, rc = -ENOMEM);
1698
1699         oa->o_id = ucreatp.lrc_id;
1700         oa->o_gr = ucreatp.lrc_group;
1701         oa->o_nlink = ucreatp.lrc_ost_idx;
1702         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1703         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1704         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1705                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1706
1707         oti.oti_objid = NULL;
1708         memcpy(lsm2, lsm, lsm_size);
1709         rc = obd_create(exp, oa, &lsm2, &oti);
1710
1711         OBD_FREE(lsm2, lsm_size);
1712         GOTO(out, rc);
1713 out:
1714         up(&lli->lli_size_sem);
1715         OBDO_FREE(oa);
1716         return rc;
1717 }
1718
1719 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1720                              int flags, struct lov_user_md *lum, int lum_size)
1721 {
1722         struct ll_inode_info *lli = ll_i2info(inode);
1723         struct lov_stripe_md *lsm;
1724         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1725         int rc = 0;
1726         ENTRY;
1727
1728         down(&lli->lli_size_sem);
1729         lsm = lli->lli_smd;
1730         if (lsm) {
1731                 up(&lli->lli_size_sem);
1732                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1733                        inode->i_ino);
1734                 RETURN(-EEXIST);
1735         }
1736
1737         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1738         if (rc)
1739                 GOTO(out, rc);
1740         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1741                 GOTO(out_req_free, rc = -ENOENT);
1742         rc = oit.d.lustre.it_status;
1743         if (rc < 0)
1744                 GOTO(out_req_free, rc);
1745
1746         ll_release_openhandle(file->f_dentry, &oit);
1747
1748  out:
1749         up(&lli->lli_size_sem);
1750         ll_intent_release(&oit);
1751         RETURN(rc);
1752 out_req_free:
1753         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1754         goto out;
1755 }
1756
1757 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1758                              struct lov_mds_md **lmmp, int *lmm_size, 
1759                              struct ptlrpc_request **request)
1760 {
1761         struct ll_sb_info *sbi = ll_i2sbi(inode);
1762         struct mdt_body  *body;
1763         struct lov_mds_md *lmm = NULL;
1764         struct ptlrpc_request *req = NULL;
1765         struct obd_capa *oc;
1766         int rc, lmmsize;
1767
1768         rc = ll_get_max_mdsize(sbi, &lmmsize);
1769         if (rc)
1770                 RETURN(rc);
1771
1772         oc = ll_mdscapa_get(inode);
1773         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1774                              oc, filename, strlen(filename) + 1,
1775                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1776         capa_put(oc);
1777         if (rc < 0) {
1778                 CDEBUG(D_INFO, "md_getattr_name failed "
1779                        "on %s: rc %d\n", filename, rc);
1780                 GOTO(out, rc);
1781         }
1782
1783         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1784         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1785         /* swabbed by mdc_getattr_name */
1786         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1787
1788         lmmsize = body->eadatasize;
1789
1790         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1791                         lmmsize == 0) {
1792                 GOTO(out, rc = -ENODATA);
1793         }
1794
1795         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1796         LASSERT(lmm != NULL);
1797         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1798
1799         /*
1800          * This is coming from the MDS, so is probably in
1801          * little endian.  We convert it to host endian before
1802          * passing it to userspace.
1803          */
1804         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1805                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1806                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1807         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1808                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1809         }
1810
1811         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1812                 struct lov_stripe_md *lsm;
1813                 struct lov_user_md_join *lmj;
1814                 int lmj_size, i, aindex = 0;
1815
1816                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1817                 if (rc < 0)
1818                         GOTO(out, rc = -ENOMEM);
1819                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1820                 if (rc)
1821                         GOTO(out_free_memmd, rc);
1822
1823                 lmj_size = sizeof(struct lov_user_md_join) +
1824                            lsm->lsm_stripe_count *
1825                            sizeof(struct lov_user_ost_data_join);
1826                 OBD_ALLOC(lmj, lmj_size);
1827                 if (!lmj)
1828                         GOTO(out_free_memmd, rc = -ENOMEM);
1829
1830                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1831                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1832                         struct lov_extent *lex =
1833                                 &lsm->lsm_array->lai_ext_array[aindex];
1834
1835                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1836                                 aindex ++;
1837                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1838                                         LPU64" len %d\n", aindex, i,
1839                                         lex->le_start, (int)lex->le_len);
1840                         lmj->lmm_objects[i].l_extent_start =
1841                                 lex->le_start;
1842
1843                         if ((int)lex->le_len == -1)
1844                                 lmj->lmm_objects[i].l_extent_end = -1;
1845                         else
1846                                 lmj->lmm_objects[i].l_extent_end =
1847                                         lex->le_start + lex->le_len;
1848                         lmj->lmm_objects[i].l_object_id =
1849                                 lsm->lsm_oinfo[i]->loi_id;
1850                         lmj->lmm_objects[i].l_object_gr =
1851                                 lsm->lsm_oinfo[i]->loi_gr;
1852                         lmj->lmm_objects[i].l_ost_gen =
1853                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1854                         lmj->lmm_objects[i].l_ost_idx =
1855                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1856                 }
1857                 lmm = (struct lov_mds_md *)lmj;
1858                 lmmsize = lmj_size;
1859 out_free_memmd:
1860                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1861         }
1862 out:
1863         *lmmp = lmm;
1864         *lmm_size = lmmsize;
1865         *request = req;
1866         return rc;
1867 }
1868
1869 static int ll_lov_setea(struct inode *inode, struct file *file,
1870                             unsigned long arg)
1871 {
1872         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1873         struct lov_user_md  *lump;
1874         int lum_size = sizeof(struct lov_user_md) +
1875                        sizeof(struct lov_user_ost_data);
1876         int rc;
1877         ENTRY;
1878
1879         if (!capable (CAP_SYS_ADMIN))
1880                 RETURN(-EPERM);
1881
1882         OBD_ALLOC(lump, lum_size);
1883         if (lump == NULL) {
1884                 RETURN(-ENOMEM);
1885         }
1886         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1887         if (rc) {
1888                 OBD_FREE(lump, lum_size);
1889                 RETURN(-EFAULT);
1890         }
1891
1892         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1893
1894         OBD_FREE(lump, lum_size);
1895         RETURN(rc);
1896 }
1897
1898 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1899                             unsigned long arg)
1900 {
1901         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1902         int rc;
1903         int flags = FMODE_WRITE;
1904         ENTRY;
1905
1906         /* Bug 1152: copy properly when this is no longer true */
1907         LASSERT(sizeof(lum) == sizeof(*lump));
1908         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1909         rc = copy_from_user(&lum, lump, sizeof(lum));
1910         if (rc)
1911                 RETURN(-EFAULT);
1912
1913         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1914         if (rc == 0) {
1915                  put_user(0, &lump->lmm_stripe_count);
1916                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1917                                     0, ll_i2info(inode)->lli_smd, lump);
1918         }
1919         RETURN(rc);
1920 }
1921
1922 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1923 {
1924         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1925
1926         if (!lsm)
1927                 RETURN(-ENODATA);
1928
1929         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1930                             (void *)arg);
1931 }
1932
1933 static int ll_get_grouplock(struct inode *inode, struct file *file,
1934                             unsigned long arg)
1935 {
1936         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1937         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1938                                                     .end = OBD_OBJECT_EOF}};
1939         struct lustre_handle lockh = { 0 };
1940         struct ll_inode_info *lli = ll_i2info(inode);
1941         struct lov_stripe_md *lsm = lli->lli_smd;
1942         int flags = 0, rc;
1943         ENTRY;
1944
1945         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1946                 RETURN(-EINVAL);
1947         }
1948
1949         policy.l_extent.gid = arg;
1950         if (file->f_flags & O_NONBLOCK)
1951                 flags = LDLM_FL_BLOCK_NOWAIT;
1952
1953         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1954         if (rc)
1955                 RETURN(rc);
1956
1957         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1958         fd->fd_gid = arg;
1959         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1960
1961         RETURN(0);
1962 }
1963
1964 static int ll_put_grouplock(struct inode *inode, struct file *file,
1965                             unsigned long arg)
1966 {
1967         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1968         struct ll_inode_info *lli = ll_i2info(inode);
1969         struct lov_stripe_md *lsm = lli->lli_smd;
1970         int rc;
1971         ENTRY;
1972
1973         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1974                 /* Ugh, it's already unlocked. */
1975                 RETURN(-EINVAL);
1976         }
1977
1978         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1979                 RETURN(-EINVAL);
1980
1981         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1982
1983         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1984         if (rc)
1985                 RETURN(rc);
1986
1987         fd->fd_gid = 0;
1988         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1989
1990         RETURN(0);
1991 }
1992
1993 static int join_sanity_check(struct inode *head, struct inode *tail)
1994 {
1995         ENTRY;
1996         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1997                 CERROR("server do not support join \n");
1998                 RETURN(-EINVAL);
1999         }
2000         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2001                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2002                        head->i_ino, tail->i_ino);
2003                 RETURN(-EINVAL);
2004         }
2005         if (head->i_ino == tail->i_ino) {
2006                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2007                 RETURN(-EINVAL);
2008         }
2009         if (head->i_size % JOIN_FILE_ALIGN) {
2010                 CERROR("hsize %llu must be times of 64K\n", head->i_size);
2011                 RETURN(-EINVAL);
2012         }
2013         RETURN(0);
2014 }
2015
2016 static int join_file(struct inode *head_inode, struct file *head_filp,
2017                      struct file *tail_filp)
2018 {
2019         struct inode *tail_inode, *tail_parent;
2020         struct dentry *tail_dentry = tail_filp->f_dentry;
2021         struct lookup_intent oit = {.it_op = IT_OPEN,
2022                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2023         struct lustre_handle lockh;
2024         struct md_op_data *op_data;
2025         int    rc;
2026         ENTRY;
2027
2028         tail_dentry = tail_filp->f_dentry;
2029         tail_inode = tail_dentry->d_inode;
2030         tail_parent = tail_dentry->d_parent->d_inode;
2031
2032         op_data = ll_prep_md_op_data(NULL, head_inode, tail_parent,
2033                                      tail_dentry->d_name.name,
2034                                      tail_dentry->d_name.len, 0,
2035                                      LUSTRE_OPC_ANY, &head_inode->i_size);
2036         if (IS_ERR(op_data))
2037                 RETURN(PTR_ERR(op_data));
2038
2039         rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW,
2040                         op_data, &lockh, NULL, 0, ldlm_completion_ast,
2041                         ll_md_blocking_ast, NULL, 0);
2042
2043         ll_finish_md_op_data(op_data);
2044         if (rc < 0)
2045                 GOTO(out, rc);
2046
2047         rc = oit.d.lustre.it_status;
2048
2049         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2050                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2051                 ptlrpc_req_finished((struct ptlrpc_request *)
2052                                     oit.d.lustre.it_data);
2053                 GOTO(out, rc);
2054         }
2055
2056         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2057                                            * away */
2058                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2059                 oit.d.lustre.it_lock_mode = 0;
2060         }
2061         ll_release_openhandle(head_filp->f_dentry, &oit);
2062 out:
2063         ll_intent_release(&oit);
2064         RETURN(rc);
2065 }
2066
2067 static int ll_file_join(struct inode *head, struct file *filp,
2068                         char *filename_tail)
2069 {
2070         struct inode *tail = NULL, *first = NULL, *second = NULL;
2071         struct dentry *tail_dentry;
2072         struct file *tail_filp, *first_filp, *second_filp;
2073         struct ll_lock_tree first_tree, second_tree;
2074         struct ll_lock_tree_node *first_node, *second_node;
2075         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2076         int rc = 0, cleanup_phase = 0;
2077         ENTRY;
2078
2079         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2080                head->i_ino, head->i_generation, head, filename_tail);
2081
2082         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2083         if (IS_ERR(tail_filp)) {
2084                 CERROR("Can not open tail file %s", filename_tail);
2085                 rc = PTR_ERR(tail_filp);
2086                 GOTO(cleanup, rc);
2087         }
2088         tail = igrab(tail_filp->f_dentry->d_inode);
2089
2090         tlli = ll_i2info(tail);
2091         tail_dentry = tail_filp->f_dentry;
2092         LASSERT(tail_dentry);
2093         cleanup_phase = 1;
2094
2095         /*reorder the inode for lock sequence*/
2096         first = head->i_ino > tail->i_ino ? head : tail;
2097         second = head->i_ino > tail->i_ino ? tail : head;
2098         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2099         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2100
2101         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2102                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2103         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2104         if (IS_ERR(first_node)){
2105                 rc = PTR_ERR(first_node);
2106                 GOTO(cleanup, rc);
2107         }
2108         first_tree.lt_fd = first_filp->private_data;
2109         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2110         if (rc != 0)
2111                 GOTO(cleanup, rc);
2112         cleanup_phase = 2;
2113
2114         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2115         if (IS_ERR(second_node)){
2116                 rc = PTR_ERR(second_node);
2117                 GOTO(cleanup, rc);
2118         }
2119         second_tree.lt_fd = second_filp->private_data;
2120         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2121         if (rc != 0)
2122                 GOTO(cleanup, rc);
2123         cleanup_phase = 3;
2124
2125         rc = join_sanity_check(head, tail);
2126         if (rc)
2127                 GOTO(cleanup, rc);
2128
2129         rc = join_file(head, filp, tail_filp);
2130         if (rc)
2131                 GOTO(cleanup, rc);
2132 cleanup:
2133         switch (cleanup_phase) {
2134         case 3:
2135                 ll_tree_unlock(&second_tree);
2136                 obd_cancel_unused(ll_i2dtexp(second),
2137                                   ll_i2info(second)->lli_smd, 0, NULL);
2138         case 2:
2139                 ll_tree_unlock(&first_tree);
2140                 obd_cancel_unused(ll_i2dtexp(first),
2141                                   ll_i2info(first)->lli_smd, 0, NULL);
2142         case 1:
2143                 filp_close(tail_filp, 0);
2144                 if (tail)
2145                         iput(tail);
2146                 if (head && rc == 0) {
2147                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2148                                        &hlli->lli_smd);
2149                         hlli->lli_smd = NULL;
2150                 }
2151         case 0:
2152                 break;
2153         default:
2154                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2155                 LBUG();
2156         }
2157         RETURN(rc);
2158 }
2159
2160 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2161 {
2162         struct inode *inode = dentry->d_inode;
2163         struct obd_client_handle *och;
2164         int rc;
2165         ENTRY;
2166
2167         LASSERT(inode);
2168
2169         /* Root ? Do nothing. */
2170         if (dentry->d_inode->i_sb->s_root == dentry)
2171                 RETURN(0);
2172
2173         /* No open handle to close? Move away */
2174         if (!it_disposition(it, DISP_OPEN_OPEN))
2175                 RETURN(0);
2176
2177         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2178
2179         OBD_ALLOC(och, sizeof(*och));
2180         if (!och)
2181                 GOTO(out, rc = -ENOMEM);
2182
2183         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2184                     ll_i2info(inode), it, och);
2185
2186         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2187                                        inode, och);
2188  out:
2189         /* this one is in place of ll_file_open */
2190         ptlrpc_req_finished(it->d.lustre.it_data);
2191         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2192         RETURN(rc);
2193 }
2194
2195 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2196                   unsigned long arg)
2197 {
2198         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2199         int flags;
2200         ENTRY;
2201
2202         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2203                inode->i_generation, inode, cmd);
2204         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2205
2206         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2207         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2208                 RETURN(-ENOTTY);
2209
2210         switch(cmd) {
2211         case LL_IOC_GETFLAGS:
2212                 /* Get the current value of the file flags */
2213                 return put_user(fd->fd_flags, (int *)arg);
2214         case LL_IOC_SETFLAGS:
2215         case LL_IOC_CLRFLAGS:
2216                 /* Set or clear specific file flags */
2217                 /* XXX This probably needs checks to ensure the flags are
2218                  *     not abused, and to handle any flag side effects.
2219                  */
2220                 if (get_user(flags, (int *) arg))
2221                         RETURN(-EFAULT);
2222
2223                 if (cmd == LL_IOC_SETFLAGS) {
2224                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2225                             !(file->f_flags & O_DIRECT)) {
2226                                 CERROR("%s: unable to disable locking on "
2227                                        "non-O_DIRECT file\n", current->comm);
2228                                 RETURN(-EINVAL);
2229                         }
2230
2231                         fd->fd_flags |= flags;
2232                 } else {
2233                         fd->fd_flags &= ~flags;
2234                 }
2235                 RETURN(0);
2236         case LL_IOC_LOV_SETSTRIPE:
2237                 RETURN(ll_lov_setstripe(inode, file, arg));
2238         case LL_IOC_LOV_SETEA:
2239                 RETURN(ll_lov_setea(inode, file, arg));
2240         case LL_IOC_LOV_GETSTRIPE:
2241                 RETURN(ll_lov_getstripe(inode, arg));
2242         case LL_IOC_RECREATE_OBJ:
2243                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2244         case EXT3_IOC_GETFLAGS:
2245         case EXT3_IOC_SETFLAGS:
2246                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2247         case EXT3_IOC_GETVERSION_OLD:
2248         case EXT3_IOC_GETVERSION:
2249                 RETURN(put_user(inode->i_generation, (int *)arg));
2250         case LL_IOC_JOIN: {
2251                 char *ftail;
2252                 int rc;
2253
2254                 ftail = getname((const char *)arg);
2255                 if (IS_ERR(ftail))
2256                         RETURN(PTR_ERR(ftail));
2257                 rc = ll_file_join(inode, file, ftail);
2258                 putname(ftail);
2259                 RETURN(rc);
2260         }
2261         case LL_IOC_GROUP_LOCK:
2262                 RETURN(ll_get_grouplock(inode, file, arg));
2263         case LL_IOC_GROUP_UNLOCK:
2264                 RETURN(ll_put_grouplock(inode, file, arg));
2265         case IOC_OBD_STATFS:
2266                 RETURN(ll_obd_statfs(inode, (void *)arg));
2267
2268         /* We need to special case any other ioctls we want to handle,
2269          * to send them to the MDS/OST as appropriate and to properly
2270          * network encode the arg field.
2271         case EXT3_IOC_SETVERSION_OLD:
2272         case EXT3_IOC_SETVERSION:
2273         */
2274         case LL_IOC_FLUSHCTX:
2275                 RETURN(ll_flush_ctx(inode));
2276         case LL_IOC_GETFACL: {
2277                 struct rmtacl_ioctl_data ioc;
2278
2279                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2280                         RETURN(-EFAULT);
2281
2282                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2283         }
2284         case LL_IOC_SETFACL: {
2285                 struct rmtacl_ioctl_data ioc;
2286
2287                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2288                         RETURN(-EFAULT);
2289
2290                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2291         }
2292         default:
2293                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2294                                      (void *)arg));
2295         }
2296 }
2297
2298 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2299 {
2300         struct inode *inode = file->f_dentry->d_inode;
2301         struct ll_inode_info *lli = ll_i2info(inode);
2302         struct lov_stripe_md *lsm = lli->lli_smd;
2303         loff_t retval;
2304         ENTRY;
2305         retval = offset + ((origin == 2) ? inode->i_size :
2306                            (origin == 1) ? file->f_pos : 0);
2307         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2308                inode->i_ino, inode->i_generation, inode, retval, retval,
2309                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2310         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2311         
2312         if (origin == 2) { /* SEEK_END */
2313                 int nonblock = 0, rc;
2314
2315                 if (file->f_flags & O_NONBLOCK)
2316                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2317
2318                 if (lsm != NULL) {
2319                         rc = ll_glimpse_size(inode, nonblock);
2320                         if (rc != 0)
2321                                 RETURN(rc);
2322                 }
2323
2324                 ll_inode_size_lock(inode, 0);
2325                 offset += inode->i_size;
2326                 ll_inode_size_unlock(inode, 0);
2327         } else if (origin == 1) { /* SEEK_CUR */
2328                 offset += file->f_pos;
2329         }
2330
2331         retval = -EINVAL;
2332         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2333                 if (offset != file->f_pos) {
2334                         file->f_pos = offset;
2335 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2336                         file->f_reada = 0;
2337                         file->f_version = ++event;
2338 #endif
2339                 }
2340                 retval = offset;
2341         }
2342         
2343         RETURN(retval);
2344 }
2345
2346 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2347 {
2348         struct inode *inode = dentry->d_inode;
2349         struct ll_inode_info *lli = ll_i2info(inode);
2350         struct lov_stripe_md *lsm = lli->lli_smd;
2351         struct ptlrpc_request *req;
2352         struct obd_capa *oc;
2353         int rc, err;
2354         ENTRY;
2355         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2356                inode->i_generation, inode);
2357         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2358
2359         /* fsync's caller has already called _fdata{sync,write}, we want
2360          * that IO to finish before calling the osc and mdc sync methods */
2361         rc = filemap_fdatawait(inode->i_mapping);
2362
2363         /* catch async errors that were recorded back when async writeback
2364          * failed for pages in this mapping. */
2365         err = lli->lli_async_rc;
2366         lli->lli_async_rc = 0;
2367         if (rc == 0)
2368                 rc = err;
2369         if (lsm) {
2370                 err = lov_test_and_clear_async_rc(lsm);
2371                 if (rc == 0)
2372                         rc = err;
2373         }
2374
2375         oc = ll_mdscapa_get(inode);
2376         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2377                       &req);
2378         capa_put(oc);
2379         if (!rc)
2380                 rc = err;
2381         if (!err)
2382                 ptlrpc_req_finished(req);
2383
2384         if (data && lsm) {
2385                 struct obdo *oa;
2386                 
2387                 OBDO_ALLOC(oa);
2388                 if (!oa)
2389                         RETURN(rc ? rc : -ENOMEM);
2390
2391                 oa->o_id = lsm->lsm_object_id;
2392                 oa->o_gr = lsm->lsm_object_gr;
2393                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2394                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2395                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2396                                            OBD_MD_FLGROUP);
2397
2398                 oc = ll_osscapa_get(inode, 0, CAPA_OPC_OSS_WRITE);
2399                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2400                                0, OBD_OBJECT_EOF, oc);
2401                 capa_put(oc);
2402                 if (!rc)
2403                         rc = err;
2404                 OBDO_FREE(oa);
2405         }
2406
2407         RETURN(rc);
2408 }
2409
2410 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2411 {
2412         struct inode *inode = file->f_dentry->d_inode;
2413         struct ll_sb_info *sbi = ll_i2sbi(inode);
2414         struct ldlm_res_id res_id =
2415                 { .name = { fid_seq(ll_inode2fid(inode)),
2416                             fid_oid(ll_inode2fid(inode)),
2417                             fid_ver(ll_inode2fid(inode)),
2418                             LDLM_FLOCK} };
2419         struct lustre_handle lockh = {0};
2420         ldlm_policy_data_t flock;
2421         ldlm_mode_t mode = 0;
2422         int flags = 0;
2423         int rc;
2424         ENTRY;
2425
2426         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2427                inode->i_ino, file_lock);
2428
2429         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2430  
2431         if (file_lock->fl_flags & FL_FLOCK) {
2432                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2433                 /* set missing params for flock() calls */
2434                 file_lock->fl_end = OFFSET_MAX;
2435                 file_lock->fl_pid = current->tgid;
2436         }
2437         flock.l_flock.pid = file_lock->fl_pid;
2438         flock.l_flock.start = file_lock->fl_start;
2439         flock.l_flock.end = file_lock->fl_end;
2440
2441         switch (file_lock->fl_type) {
2442         case F_RDLCK:
2443                 mode = LCK_PR;
2444                 break;
2445         case F_UNLCK:
2446                 /* An unlock request may or may not have any relation to
2447                  * existing locks so we may not be able to pass a lock handle
2448                  * via a normal ldlm_lock_cancel() request. The request may even
2449                  * unlock a byte range in the middle of an existing lock. In
2450                  * order to process an unlock request we need all of the same
2451                  * information that is given with a normal read or write record
2452                  * lock request. To avoid creating another ldlm unlock (cancel)
2453                  * message we'll treat a LCK_NL flock request as an unlock. */
2454                 mode = LCK_NL;
2455                 break;
2456         case F_WRLCK:
2457                 mode = LCK_PW;
2458                 break;
2459         default:
2460                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2461                 LBUG();
2462         }
2463
2464         switch (cmd) {
2465         case F_SETLKW:
2466 #ifdef F_SETLKW64
2467         case F_SETLKW64:
2468 #endif
2469                 flags = 0;
2470                 break;
2471         case F_SETLK:
2472 #ifdef F_SETLK64
2473         case F_SETLK64:
2474 #endif
2475                 flags = LDLM_FL_BLOCK_NOWAIT;
2476                 break;
2477         case F_GETLK:
2478 #ifdef F_GETLK64
2479         case F_GETLK64:
2480 #endif
2481                 flags = LDLM_FL_TEST_LOCK;
2482                 /* Save the old mode so that if the mode in the lock changes we
2483                  * can decrement the appropriate reader or writer refcount. */
2484                 file_lock->fl_type = mode;
2485                 break;
2486         default:
2487                 CERROR("unknown fcntl lock command: %d\n", cmd);
2488                 LBUG();
2489         }
2490
2491         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2492                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2493                flags, mode, flock.l_flock.start, flock.l_flock.end);
2494
2495         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &res_id,
2496                               LDLM_FLOCK, &flock, mode, &flags, NULL,
2497                               ldlm_flock_completion_ast, NULL, file_lock,
2498                               NULL, 0, NULL, &lockh, 0);
2499         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2500                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2501 #ifdef HAVE_F_OP_FLOCK
2502         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2503             !(flags & LDLM_FL_TEST_LOCK))
2504                 posix_lock_file_wait(file, file_lock);
2505 #endif
2506
2507         RETURN(rc);
2508 }
2509
2510 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2511 {
2512         ENTRY;
2513
2514         RETURN(-ENOSYS);
2515 }
2516
2517 int ll_have_md_lock(struct inode *inode, __u64 bits)
2518 {
2519         struct lustre_handle lockh;
2520         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2521         struct lu_fid *fid;
2522         int flags;
2523         ENTRY;
2524
2525         if (!inode)
2526                RETURN(0);
2527
2528         fid = &ll_i2info(inode)->lli_fid;
2529         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2530
2531         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2532         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2533                           LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2534                 RETURN(1);
2535         }
2536
2537         RETURN(0);
2538 }
2539
2540 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2541         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2542                               * and return success */
2543                 inode->i_nlink = 0;
2544                 /* This path cannot be hit for regular files unless in
2545                  * case of obscure races, so no need to to validate
2546                  * size. */
2547                 if (!S_ISREG(inode->i_mode) &&
2548                     !S_ISDIR(inode->i_mode))
2549                         return 0;
2550         }
2551
2552         if (rc) {
2553                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2554                 return -abs(rc);
2555
2556         }
2557
2558         return 0;
2559 }
2560
2561 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2562 {
2563         struct inode *inode = dentry->d_inode;
2564         struct ptlrpc_request *req = NULL;
2565         struct ll_sb_info *sbi;
2566         struct obd_export *exp;
2567         int rc;
2568         ENTRY;
2569
2570         if (!inode) {
2571                 CERROR("REPORT THIS LINE TO PETER\n");
2572                 RETURN(0);
2573         }
2574         sbi = ll_i2sbi(inode);
2575
2576         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2577                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2578 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2579         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
2580 #endif
2581
2582         exp = ll_i2mdexp(inode);
2583
2584         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2585                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2586                 struct md_op_data *op_data;
2587
2588                 /* Call getattr by fid, so do not provide name at all. */
2589                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2590                                              dentry->d_inode, NULL, 0, 0,
2591                                              LUSTRE_OPC_ANY, NULL);
2592                 if (IS_ERR(op_data))
2593                         RETURN(PTR_ERR(op_data));
2594
2595                 oit.it_flags |= O_CHECK_STALE;
2596                 rc = md_intent_lock(exp, op_data, NULL, 0,
2597                                     /* we are not interested in name
2598                                        based lookup */
2599                                     &oit, 0, &req,
2600                                     ll_md_blocking_ast, 0);
2601                 ll_finish_md_op_data(op_data);
2602                 oit.it_flags &= ~O_CHECK_STALE;
2603                 if (rc < 0) {
2604                         rc = ll_inode_revalidate_fini(inode, rc);
2605                         GOTO (out, rc);
2606                 }
2607
2608                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2609                 if (rc != 0) {
2610                         ll_intent_release(&oit);
2611                         GOTO(out, rc);
2612                 }
2613
2614                 /* Unlinked? Unhash dentry, so it is not picked up later by
2615                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2616                    here to preserve get_cwd functionality on 2.6.
2617                    Bug 10503 */
2618                 if (!dentry->d_inode->i_nlink) {
2619                         spin_lock(&dcache_lock);
2620                         ll_drop_dentry(dentry);
2621                         spin_unlock(&dcache_lock);
2622                 }
2623
2624                 ll_lookup_finish_locks(&oit, dentry);
2625         } else if (!ll_have_md_lock(dentry->d_inode,
2626                                     MDS_INODELOCK_UPDATE)) {
2627                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2628                 obd_valid valid = OBD_MD_FLGETATTR;
2629                 struct obd_capa *oc;
2630                 int ealen = 0;
2631
2632                 if (S_ISREG(inode->i_mode)) {
2633                         rc = ll_get_max_mdsize(sbi, &ealen);
2634                         if (rc)
2635                                 RETURN(rc);
2636                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2637                 }
2638                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2639                  * capa for this inode. Because we only keep capas of dirs
2640                  * fresh. */
2641                 oc = ll_mdscapa_get(inode);
2642                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2643                                 ealen, &req);
2644                 capa_put(oc);
2645                 if (rc) {
2646                         rc = ll_inode_revalidate_fini(inode, rc);
2647                         RETURN(rc);
2648                 }
2649
2650                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2651                                    NULL);
2652                 if (rc)
2653                         GOTO(out, rc);
2654         }
2655
2656         /* if object not yet allocated, don't validate size */
2657         if (ll_i2info(inode)->lli_smd == NULL)
2658                 GOTO(out, rc = 0);
2659
2660         /* ll_glimpse_size will prefer locally cached writes if they extend
2661          * the file */
2662         rc = ll_glimpse_size(inode, 0);
2663         EXIT;
2664 out:
2665         ptlrpc_req_finished(req);
2666         return rc;
2667 }
2668
2669 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2670 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2671                   struct lookup_intent *it, struct kstat *stat)
2672 {
2673         struct inode *inode = de->d_inode;
2674         int res = 0;
2675
2676         res = ll_inode_revalidate_it(de, it);
2677         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2678
2679         if (res)
2680                 return res;
2681
2682         stat->dev = inode->i_sb->s_dev;
2683         stat->ino = inode->i_ino;
2684         stat->mode = inode->i_mode;
2685         stat->nlink = inode->i_nlink;
2686         stat->uid = inode->i_uid;
2687         stat->gid = inode->i_gid;
2688         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2689         stat->atime = inode->i_atime;
2690         stat->mtime = inode->i_mtime;
2691         stat->ctime = inode->i_ctime;
2692 #ifdef HAVE_INODE_BLKSIZE
2693         stat->blksize = inode->i_blksize;
2694 #else
2695         stat->blksize = 1 << inode->i_blkbits;
2696 #endif
2697
2698         ll_inode_size_lock(inode, 0);
2699         stat->size = inode->i_size;
2700         stat->blocks = inode->i_blocks;
2701         ll_inode_size_unlock(inode, 0);
2702
2703         return 0;
2704 }
2705 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2706 {
2707         struct lookup_intent it = { .it_op = IT_GETATTR };
2708
2709         return ll_getattr_it(mnt, de, &it, stat);
2710 }
2711 #endif
2712
2713 static
2714 int lustre_check_acl(struct inode *inode, int mask)
2715 {
2716 #ifdef CONFIG_FS_POSIX_ACL
2717         struct ll_inode_info *lli = ll_i2info(inode);
2718         struct posix_acl *acl;
2719         int rc;
2720         ENTRY;
2721
2722         spin_lock(&lli->lli_lock);
2723         acl = posix_acl_dup(lli->lli_posix_acl);
2724         spin_unlock(&lli->lli_lock);
2725
2726         if (!acl)
2727                 RETURN(-EAGAIN);
2728
2729         rc = posix_acl_permission(inode, acl, mask);
2730         posix_acl_release(acl);
2731
2732         RETURN(rc);
2733 #else
2734         return -EAGAIN;
2735 #endif
2736 }
2737
2738 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2739 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2740 {
2741         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2742                inode->i_ino, inode->i_generation, inode, mask);
2743         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2744                 return lustre_check_remote_perm(inode, mask);
2745         
2746         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2747         return generic_permission(inode, mask, lustre_check_acl);
2748 }
2749 #else
2750 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2751 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2752 #else
2753 int ll_inode_permission(struct inode *inode, int mask)
2754 #endif
2755 {
2756         int mode = inode->i_mode;
2757         int rc;
2758
2759         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2760                inode->i_ino, inode->i_generation, inode, mask);
2761
2762         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2763                 return lustre_check_remote_perm(inode, mask);
2764
2765         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2766
2767         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2768             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2769                 return -EROFS;
2770         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2771                 return -EACCES;
2772         if (current->fsuid == inode->i_uid) {
2773                 mode >>= 6;
2774         } else if (1) {
2775                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2776                         goto check_groups;
2777                 rc = lustre_check_acl(inode, mask);
2778                 if (rc == -EAGAIN)
2779                         goto check_groups;
2780                 if (rc == -EACCES)
2781                         goto check_capabilities;
2782                 return rc;
2783         } else {
2784 check_groups:
2785                 if (in_group_p(inode->i_gid))
2786                         mode >>= 3;
2787         }
2788         if ((mode & mask & S_IRWXO) == mask)
2789                 return 0;
2790
2791 check_capabilities:
2792         if (!(mask & MAY_EXEC) ||
2793             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2794                 if (capable(CAP_DAC_OVERRIDE))
2795                         return 0;
2796
2797         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2798             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2799                 return 0;
2800         
2801         return -EACCES;
2802 }
2803 #endif
2804
2805 /* -o localflock - only provides locally consistent flock locks */
2806 struct file_operations ll_file_operations = {
2807         .read           = ll_file_read,
2808         .write          = ll_file_write,
2809         .ioctl          = ll_file_ioctl,
2810         .open           = ll_file_open,
2811         .release        = ll_file_release,
2812         .mmap           = ll_file_mmap,
2813         .llseek         = ll_file_seek,
2814 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2815         .sendfile       = ll_file_sendfile,
2816 #endif
2817         .fsync          = ll_fsync,
2818 };
2819
2820 struct file_operations ll_file_operations_flock = {
2821         .read           = ll_file_read,
2822         .write          = ll_file_write,
2823         .ioctl          = ll_file_ioctl,
2824         .open           = ll_file_open,
2825         .release        = ll_file_release,
2826         .mmap           = ll_file_mmap,
2827         .llseek         = ll_file_seek,
2828 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2829         .sendfile       = ll_file_sendfile,
2830 #endif
2831         .fsync          = ll_fsync,
2832 #ifdef HAVE_F_OP_FLOCK
2833         .flock          = ll_file_flock,
2834 #endif
2835         .lock           = ll_file_flock
2836 };
2837
2838 /* These are for -o noflock - to return ENOSYS on flock calls */
2839 struct file_operations ll_file_operations_noflock = {
2840         .read           = ll_file_read,
2841         .write          = ll_file_write,
2842         .ioctl          = ll_file_ioctl,
2843         .open           = ll_file_open,
2844         .release        = ll_file_release,
2845         .mmap           = ll_file_mmap,
2846         .llseek         = ll_file_seek,
2847 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2848         .sendfile       = ll_file_sendfile,
2849 #endif
2850         .fsync          = ll_fsync,
2851 #ifdef HAVE_F_OP_FLOCK
2852         .flock          = ll_file_noflock,
2853 #endif
2854         .lock           = ll_file_noflock
2855 };
2856
2857 struct inode_operations ll_file_inode_operations = {
2858 #ifdef LUSTRE_KERNEL_VERSION
2859         .setattr_raw    = ll_setattr_raw,
2860 #endif
2861         .setattr        = ll_setattr,
2862         .truncate       = ll_truncate,
2863 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2864         .getattr        = ll_getattr,
2865 #else
2866         .revalidate_it  = ll_inode_revalidate_it,
2867 #endif
2868         .permission     = ll_inode_permission,
2869         .setxattr       = ll_setxattr,
2870         .getxattr       = ll_getxattr,
2871         .listxattr      = ll_listxattr,
2872         .removexattr    = ll_removexattr,
2873 };
2874