Whamcloud - gitweb
741bbb6201eb4c10129f40cbb08f1529afde4f6e
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35
36 /* also used by llite/special.c:ll_special_open() */
37 struct ll_file_data *ll_file_data_get(void)
38 {
39         struct ll_file_data *fd;
40
41         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
42         return fd;
43 }
44
45 static void ll_file_data_put(struct ll_file_data *fd)
46 {
47         if (fd != NULL)
48                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
49 }
50
51 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
52                           struct lustre_handle *fh)
53 {
54         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
55         op_data->op_attr.ia_mode = inode->i_mode;
56         op_data->op_attr.ia_atime = inode->i_atime;
57         op_data->op_attr.ia_mtime = inode->i_mtime;
58         op_data->op_attr.ia_ctime = inode->i_ctime;
59         op_data->op_attr.ia_size = i_size_read(inode);
60         op_data->op_attr_blocks = inode->i_blocks;
61         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
62         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
63         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
64         op_data->op_capa1 = ll_mdscapa_get(inode);
65 }
66
67 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
68                              struct obd_client_handle *och)
69 {
70         ENTRY;
71
72         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
73                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
74
75         if (!(och->och_flags & FMODE_WRITE))
76                 goto out;
77
78         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
79             !S_ISREG(inode->i_mode))
80                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
81         else
82                 ll_epoch_close(inode, op_data, &och, 0);
83
84 out:
85         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
86         EXIT;
87 }
88
89 static int ll_close_inode_openhandle(struct obd_export *md_exp,
90                                      struct inode *inode,
91                                      struct obd_client_handle *och)
92 {
93         struct obd_export *exp = ll_i2mdexp(inode);
94         struct md_op_data *op_data;
95         struct ptlrpc_request *req = NULL;
96         struct obd_device *obd = class_exp2obd(exp);
97         int epoch_close = 1;
98         int seq_end = 0, rc;
99         ENTRY;
100
101         if (obd == NULL) {
102                 /*
103                  * XXX: in case of LMV, is this correct to access
104                  * ->exp_handle?
105                  */
106                 CERROR("Invalid MDC connection handle "LPX64"\n",
107                        ll_i2mdexp(inode)->exp_handle.h_cookie);
108                 GOTO(out, rc = 0);
109         }
110
111         /*
112          * here we check if this is forced umount. If so this is called on
113          * canceling "open lock" and we do not call md_close() in this case, as
114          * it will not be successful, as import is already deactivated.
115          */
116         if (obd->obd_force)
117                 GOTO(out, rc = 0);
118
119         OBD_ALLOC_PTR(op_data);
120         if (op_data == NULL)
121                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
122
123         ll_prepare_close(inode, op_data, och);
124         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
125         rc = md_close(md_exp, op_data, och->och_mod, &req);
126         if (rc != -EAGAIN)
127                 seq_end = 1;
128
129         if (rc == -EAGAIN) {
130                 /* This close must have the epoch closed. */
131                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
132                 LASSERT(epoch_close);
133                 /* MDS has instructed us to obtain Size-on-MDS attribute from
134                  * OSTs and send setattr to back to MDS. */
135                 rc = ll_sizeonmds_update(inode, och->och_mod,
136                                          &och->och_fh, op_data->op_ioepoch);
137                 if (rc) {
138                         CERROR("inode %lu mdc Size-on-MDS update failed: "
139                                "rc = %d\n", inode->i_ino, rc);
140                         rc = 0;
141                 }
142         } else if (rc) {
143                 CERROR("inode %lu mdc close failed: rc = %d\n",
144                        inode->i_ino, rc);
145         }
146         ll_finish_md_op_data(op_data);
147
148         if (rc == 0) {
149                 rc = ll_objects_destroy(req, inode);
150                 if (rc)
151                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
152                                inode->i_ino, rc);
153         }
154
155         EXIT;
156 out:
157       
158         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
159             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
160                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
161         } else {
162                 if (seq_end)
163                         ptlrpc_close_replay_seq(req);
164                 md_clear_open_replay_data(md_exp, och);
165                 /* Free @och if it is not waiting for DONE_WRITING. */
166                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
167                 OBD_FREE_PTR(och);
168         }
169         if (req) /* This is close request */
170                 ptlrpc_req_finished(req);
171         return rc;
172 }
173
174 int ll_md_real_close(struct inode *inode, int flags)
175 {
176         struct ll_inode_info *lli = ll_i2info(inode);
177         struct obd_client_handle **och_p;
178         struct obd_client_handle *och;
179         __u64 *och_usecount;
180         int rc = 0;
181         ENTRY;
182
183         if (flags & FMODE_WRITE) {
184                 och_p = &lli->lli_mds_write_och;
185                 och_usecount = &lli->lli_open_fd_write_count;
186         } else if (flags & FMODE_EXEC) {
187                 och_p = &lli->lli_mds_exec_och;
188                 och_usecount = &lli->lli_open_fd_exec_count;
189         } else {
190                 LASSERT(flags & FMODE_READ);
191                 och_p = &lli->lli_mds_read_och;
192                 och_usecount = &lli->lli_open_fd_read_count;
193         }
194
195         down(&lli->lli_och_sem);
196         if (*och_usecount) { /* There are still users of this handle, so
197                                 skip freeing it. */
198                 up(&lli->lli_och_sem);
199                 RETURN(0);
200         }
201         och=*och_p;
202         *och_p = NULL;
203         up(&lli->lli_och_sem);
204
205         if (och) { /* There might be a race and somebody have freed this och
206                       already */
207                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
208                                                inode, och);
209         }
210
211         RETURN(rc);
212 }
213
214 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
215                 struct file *file)
216 {
217         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
218         struct ll_inode_info *lli = ll_i2info(inode);
219         int rc = 0;
220         ENTRY;
221
222         /* clear group lock, if present */
223         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
224                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
225                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
226                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
227                                       &fd->fd_cwlockh);
228         }
229
230         /* Let's see if we have good enough OPEN lock on the file and if
231            we can skip talking to MDS */
232         if (file->f_dentry->d_inode) { /* Can this ever be false? */
233                 int lockmode;
234                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
235                 struct lustre_handle lockh;
236                 struct inode *inode = file->f_dentry->d_inode;
237                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
238
239                 down(&lli->lli_och_sem);
240                 if (fd->fd_omode & FMODE_WRITE) {
241                         lockmode = LCK_CW;
242                         LASSERT(lli->lli_open_fd_write_count);
243                         lli->lli_open_fd_write_count--;
244                 } else if (fd->fd_omode & FMODE_EXEC) {
245                         lockmode = LCK_PR;
246                         LASSERT(lli->lli_open_fd_exec_count);
247                         lli->lli_open_fd_exec_count--;
248                 } else {
249                         lockmode = LCK_CR;
250                         LASSERT(lli->lli_open_fd_read_count);
251                         lli->lli_open_fd_read_count--;
252                 }
253                 up(&lli->lli_och_sem);
254
255                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
256                                    LDLM_IBITS, &policy, lockmode,
257                                    &lockh)) {
258                         rc = ll_md_real_close(file->f_dentry->d_inode,
259                                               fd->fd_omode);
260                 }
261         } else {
262                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
263                        file, file->f_dentry, file->f_dentry->d_name.name);
264         }
265
266         LUSTRE_FPRIVATE(file) = NULL;
267         ll_file_data_put(fd);
268         ll_capa_close(inode);
269
270         RETURN(rc);
271 }
272
273 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
274
275 /* While this returns an error code, fput() the caller does not, so we need
276  * to make every effort to clean up all of our state here.  Also, applications
277  * rarely check close errors and even if an error is returned they will not
278  * re-try the close call.
279  */
280 int ll_file_release(struct inode *inode, struct file *file)
281 {
282         struct ll_file_data *fd;
283         struct ll_sb_info *sbi = ll_i2sbi(inode);
284         struct ll_inode_info *lli = ll_i2info(inode);
285         struct lov_stripe_md *lsm = lli->lli_smd;
286         int rc;
287
288         ENTRY;
289         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
290                inode->i_generation, inode);
291
292         /* don't do anything for / */
293         if (inode->i_sb->s_root == file->f_dentry)
294                 RETURN(0);
295
296         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
297         fd = LUSTRE_FPRIVATE(file);
298         LASSERT(fd != NULL);
299
300         /* don't do anything for / */
301         if (inode->i_sb->s_root == file->f_dentry) {
302                 LUSTRE_FPRIVATE(file) = NULL;
303                 ll_file_data_put(fd);
304                 RETURN(0);
305         }
306         
307         if (lsm)
308                 lov_test_and_clear_async_rc(lsm);
309         lli->lli_async_rc = 0;
310
311         rc = ll_md_close(sbi->ll_md_exp, inode, file);
312         RETURN(rc);
313 }
314
315 static int ll_intent_file_open(struct file *file, void *lmm,
316                                int lmmsize, struct lookup_intent *itp)
317 {
318         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
319         struct dentry *parent = file->f_dentry->d_parent;
320         const char *name = file->f_dentry->d_name.name;
321         const int len = file->f_dentry->d_name.len;
322         struct md_op_data *op_data;
323         struct ptlrpc_request *req;
324         int rc;
325
326         if (!parent)
327                 RETURN(-ENOENT);
328
329         /* Usually we come here only for NFSD, and we want open lock.
330            But we can also get here with pre 2.6.15 patchless kernels, and in
331            that case that lock is also ok */
332         /* We can also get here if there was cached open handle in revalidate_it
333          * but it disappeared while we were getting from there to ll_file_open.
334          * But this means this file was closed and immediatelly opened which
335          * makes a good candidate for using OPEN lock */
336         /* If lmmsize & lmm are not 0, we are just setting stripe info
337          * parameters. No need for the open lock */
338         if (!lmm && !lmmsize)
339                 itp->it_flags |= MDS_OPEN_LOCK;
340
341         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
342                                       file->f_dentry->d_inode, name, len,
343                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
344         if (IS_ERR(op_data))
345                 RETURN(PTR_ERR(op_data));
346
347         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
348                             0 /*unused */, &req, ll_md_blocking_ast, 0);
349         ll_finish_md_op_data(op_data);
350         if (rc == -ESTALE) {
351                 /* reason for keep own exit path - don`t flood log
352                 * with messages with -ESTALE errors.
353                 */
354                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
355                      it_open_error(DISP_OPEN_OPEN, itp))
356                         GOTO(out, rc);
357                 ll_release_openhandle(file->f_dentry, itp);
358                 GOTO(out_stale, rc);
359         }
360
361         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
362                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
363                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
364                 GOTO(out, rc);
365         }
366
367         if (itp->d.lustre.it_lock_mode)
368                 md_set_lock_data(sbi->ll_md_exp,
369                                  &itp->d.lustre.it_lock_handle, 
370                                  file->f_dentry->d_inode);
371
372         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
373                            NULL);
374 out:
375         ptlrpc_req_finished(itp->d.lustre.it_data);
376
377 out_stale:
378         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
379         ll_intent_drop_lock(itp);
380
381         RETURN(rc);
382 }
383
384 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
385                        struct lookup_intent *it, struct obd_client_handle *och)
386 {
387         struct ptlrpc_request *req = it->d.lustre.it_data;
388         struct mdt_body *body;
389
390         LASSERT(och);
391
392         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
393         LASSERT(body != NULL);                      /* reply already checked out */
394         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
395
396         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
397         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
398         och->och_fid = lli->lli_fid;
399         och->och_flags = it->it_flags;
400         lli->lli_ioepoch = body->ioepoch;
401
402         return md_set_open_replay_data(md_exp, och, req);
403 }
404
405 int ll_local_open(struct file *file, struct lookup_intent *it,
406                   struct ll_file_data *fd, struct obd_client_handle *och)
407 {
408         struct inode *inode = file->f_dentry->d_inode;
409         struct ll_inode_info *lli = ll_i2info(inode);
410         ENTRY;
411
412         LASSERT(!LUSTRE_FPRIVATE(file));
413
414         LASSERT(fd != NULL);
415
416         if (och) {
417                 struct ptlrpc_request *req = it->d.lustre.it_data;
418                 struct mdt_body *body;
419                 int rc;
420
421                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
422                 if (rc)
423                         RETURN(rc);
424
425                 body = lustre_msg_buf(req->rq_repmsg,
426                                       DLM_REPLY_REC_OFF, sizeof(*body));
427
428                 if ((it->it_flags & FMODE_WRITE) &&
429                     (body->valid & OBD_MD_FLSIZE))
430                 {
431                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
432                                lli->lli_ioepoch, PFID(&lli->lli_fid));
433                 }
434         }
435
436         LUSTRE_FPRIVATE(file) = fd;
437         ll_readahead_init(inode, &fd->fd_ras);
438         fd->fd_omode = it->it_flags;
439         RETURN(0);
440 }
441
442 /* Open a file, and (for the very first open) create objects on the OSTs at
443  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
444  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
445  * lli_open_sem to ensure no other process will create objects, send the
446  * stripe MD to the MDS, or try to destroy the objects if that fails.
447  *
448  * If we already have the stripe MD locally then we don't request it in
449  * md_open(), by passing a lmm_size = 0.
450  *
451  * It is up to the application to ensure no other processes open this file
452  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
453  * used.  We might be able to avoid races of that sort by getting lli_open_sem
454  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
455  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
456  */
457 int ll_file_open(struct inode *inode, struct file *file)
458 {
459         struct ll_inode_info *lli = ll_i2info(inode);
460         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
461                                           .it_flags = file->f_flags };
462         struct lov_stripe_md *lsm;
463         struct ptlrpc_request *req = NULL;
464         struct obd_client_handle **och_p;
465         __u64 *och_usecount;
466         struct ll_file_data *fd;
467         int rc = 0;
468         ENTRY;
469
470         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
471                inode->i_generation, inode, file->f_flags);
472
473         /* don't do anything for / */
474         if (inode->i_sb->s_root == file->f_dentry)
475                 RETURN(0);
476
477 #ifdef LUSTRE_KERNEL_VERSION
478         it = file->f_it;
479 #else
480         it = file->private_data; /* XXX: compat macro */
481         file->private_data = NULL; /* prevent ll_local_open assertion */
482 #endif
483
484         fd = ll_file_data_get();
485         if (fd == NULL)
486                 RETURN(-ENOMEM);
487
488         /* don't do anything for / */
489         if (inode->i_sb->s_root == file->f_dentry) {
490                 LUSTRE_FPRIVATE(file) = fd;
491                 RETURN(0);
492         }
493
494         if (!it || !it->d.lustre.it_disposition) {
495                 /* Convert f_flags into access mode. We cannot use file->f_mode,
496                  * because everything but O_ACCMODE mask was stripped from
497                  * there */
498                 if ((oit.it_flags + 1) & O_ACCMODE)
499                         oit.it_flags++;
500                 if (file->f_flags & O_TRUNC)
501                         oit.it_flags |= FMODE_WRITE;
502
503                 /* kernel only call f_op->open in dentry_open.  filp_open calls
504                  * dentry_open after call to open_namei that checks permissions.
505                  * Only nfsd_open call dentry_open directly without checking
506                  * permissions and because of that this code below is safe. */
507                 if (oit.it_flags & FMODE_WRITE)
508                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
509
510                 /* We do not want O_EXCL here, presumably we opened the file
511                  * already? XXX - NFS implications? */
512                 oit.it_flags &= ~O_EXCL;
513
514                 it = &oit;
515         }
516
517         /* Let's see if we have file open on MDS already. */
518         if (it->it_flags & FMODE_WRITE) {
519                 och_p = &lli->lli_mds_write_och;
520                 och_usecount = &lli->lli_open_fd_write_count;
521         } else if (it->it_flags & FMODE_EXEC) {
522                 och_p = &lli->lli_mds_exec_och;
523                 och_usecount = &lli->lli_open_fd_exec_count;
524          } else {
525                 och_p = &lli->lli_mds_read_och;
526                 och_usecount = &lli->lli_open_fd_read_count;
527         }
528         
529         down(&lli->lli_och_sem);
530         if (*och_p) { /* Open handle is present */
531                 if (it_disposition(it, DISP_OPEN_OPEN)) {
532                         /* Well, there's extra open request that we do not need,
533                            let's close it somehow. This will decref request. */
534                         rc = it_open_error(DISP_OPEN_OPEN, it);
535                         if (rc) {
536                                 ll_file_data_put(fd);
537                                 GOTO(out_och_free, rc);
538                         }       
539                         ll_release_openhandle(file->f_dentry, it);
540                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
541                                              LPROC_LL_OPEN);
542                 }
543                 (*och_usecount)++;
544
545                 rc = ll_local_open(file, it, fd, NULL);
546                 if (rc) {
547                         up(&lli->lli_och_sem);
548                         ll_file_data_put(fd);
549                         RETURN(rc);
550                 }
551         } else {
552                 LASSERT(*och_usecount == 0);
553                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
554                 if (!*och_p) {
555                         ll_file_data_put(fd);
556                         GOTO(out_och_free, rc = -ENOMEM);
557                 }
558                 (*och_usecount)++;
559                 if (!it->d.lustre.it_disposition) {
560                         it->it_flags |= O_CHECK_STALE;
561                         rc = ll_intent_file_open(file, NULL, 0, it);
562                         it->it_flags &= ~O_CHECK_STALE;
563                         if (rc) {
564                                 ll_file_data_put(fd);
565                                 GOTO(out_och_free, rc);
566                         }
567
568                         /* Got some error? Release the request */
569                         if (it->d.lustre.it_status < 0) {
570                                 req = it->d.lustre.it_data;
571                                 ptlrpc_req_finished(req);
572                         }
573                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
574                                          &it->d.lustre.it_lock_handle,
575                                          file->f_dentry->d_inode);
576                 }
577                 req = it->d.lustre.it_data;
578
579                 /* md_intent_lock() didn't get a request ref if there was an
580                  * open error, so don't do cleanup on the request here
581                  * (bug 3430) */
582                 /* XXX (green): Should not we bail out on any error here, not
583                  * just open error? */
584                 rc = it_open_error(DISP_OPEN_OPEN, it);
585                 if (rc) {
586                         ll_file_data_put(fd);
587                         GOTO(out_och_free, rc);
588                 }
589
590                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
591                 rc = ll_local_open(file, it, fd, *och_p);
592                 if (rc) {
593                         up(&lli->lli_och_sem);
594                         ll_file_data_put(fd);
595                         GOTO(out_och_free, rc);
596                 }
597         }
598         up(&lli->lli_och_sem);
599
600         /* Must do this outside lli_och_sem lock to prevent deadlock where
601            different kind of OPEN lock for this same inode gets cancelled
602            by ldlm_cancel_lru */
603         if (!S_ISREG(inode->i_mode))
604                 GOTO(out, rc);
605
606         ll_capa_open(inode);
607
608         lsm = lli->lli_smd;
609         if (lsm == NULL) {
610                 if (file->f_flags & O_LOV_DELAY_CREATE ||
611                     !(file->f_mode & FMODE_WRITE)) {
612                         CDEBUG(D_INODE, "object creation was delayed\n");
613                         GOTO(out, rc);
614                 }
615         }
616         file->f_flags &= ~O_LOV_DELAY_CREATE;
617         GOTO(out, rc);
618 out:
619         ptlrpc_req_finished(req);
620         if (req)
621                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
622 out_och_free:
623         if (rc) {
624                 if (*och_p) {
625                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
626                         *och_p = NULL; /* OBD_FREE writes some magic there */
627                         (*och_usecount)--;
628                 }
629                 up(&lli->lli_och_sem);
630         }
631
632         return rc;
633 }
634
635 /* Fills the obdo with the attributes for the inode defined by lsm */
636 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
637 {
638         struct ptlrpc_request_set *set;
639         struct ll_inode_info *lli = ll_i2info(inode);
640         struct lov_stripe_md *lsm = lli->lli_smd;
641
642         struct obd_info oinfo = { { { 0 } } };
643         int rc;
644         ENTRY;
645
646         LASSERT(lsm != NULL);
647
648         oinfo.oi_md = lsm;
649         oinfo.oi_oa = obdo;
650         oinfo.oi_oa->o_id = lsm->lsm_object_id;
651         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
652         oinfo.oi_oa->o_mode = S_IFREG;
653         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
654                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
655                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
656                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
657                                OBD_MD_FLGROUP;
658         oinfo.oi_capa = ll_mdscapa_get(inode);
659
660         set = ptlrpc_prep_set();
661         if (set == NULL) {
662                 CERROR("can't allocate ptlrpc set\n");
663                 rc = -ENOMEM;
664         } else {
665                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
666                 if (rc == 0)
667                         rc = ptlrpc_set_wait(set);
668                 ptlrpc_set_destroy(set);
669         }
670         capa_put(oinfo.oi_capa);
671         if (rc)
672                 RETURN(rc);
673
674         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
675                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
676                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
677
678         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
679         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
680                lli->lli_smd->lsm_object_id, i_size_read(inode),
681                inode->i_blocks, inode->i_blksize);
682         RETURN(0);
683 }
684
685 static inline void ll_remove_suid(struct inode *inode)
686 {
687         unsigned int mode;
688
689         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
690         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
691
692         /* was any of the uid bits set? */
693         mode &= inode->i_mode;
694         if (mode && !capable(CAP_FSETID)) {
695                 inode->i_mode &= ~mode;
696                 // XXX careful here - we cannot change the size
697         }
698 }
699
700 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
701 {
702         struct ll_inode_info *lli = ll_i2info(inode);
703         struct lov_stripe_md *lsm = lli->lli_smd;
704         struct obd_export *exp = ll_i2dtexp(inode);
705         struct {
706                 char name[16];
707                 struct ldlm_lock *lock;
708                 struct lov_stripe_md *lsm;
709         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
710         __u32 stripe, vallen = sizeof(stripe);
711         int rc;
712         ENTRY;
713
714         if (lsm->lsm_stripe_count == 1)
715                 GOTO(check, stripe = 0);
716
717         /* get our offset in the lov */
718         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
719         if (rc != 0) {
720                 CERROR("obd_get_info: rc = %d\n", rc);
721                 RETURN(rc);
722         }
723         LASSERT(stripe < lsm->lsm_stripe_count);
724
725 check:
726         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
727             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
728                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
729                            lsm->lsm_oinfo[stripe]->loi_id,
730                            lsm->lsm_oinfo[stripe]->loi_gr);
731                 RETURN(-ELDLM_NO_LOCK_DATA);
732         }
733
734         RETURN(stripe);
735 }
736
737 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
738  * we get a lock cancellation for each stripe, so we have to map the obd's
739  * region back onto the stripes in the file that it held.
740  *
741  * No one can dirty the extent until we've finished our work and they can
742  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
743  * but other kernel actors could have pages locked.
744  *
745  * Called with the DLM lock held. */
746 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
747                               struct ldlm_lock *lock, __u32 stripe)
748 {
749         ldlm_policy_data_t tmpex;
750         unsigned long start, end, count, skip, i, j;
751         struct page *page;
752         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
753         struct lustre_handle lockh;
754         ENTRY;
755
756         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
757         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
758                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
759                i_size_read(inode));
760
761         /* our locks are page granular thanks to osc_enqueue, we invalidate the
762          * whole page. */
763         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
764             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
765                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
766                            CFS_PAGE_SIZE);
767         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
768         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
769
770         count = ~0;
771         skip = 0;
772         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
773         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
774         if (lsm->lsm_stripe_count > 1) {
775                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
776                 skip = (lsm->lsm_stripe_count - 1) * count;
777                 start += start/count * skip + stripe * count;
778                 if (end != ~0)
779                         end += end/count * skip + stripe * count;
780         }
781         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
782                 end = ~0;
783
784         i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
785             CFS_PAGE_SHIFT : 0;
786         if (i < end)
787                 end = i;
788
789         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
790                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
791                count, skip, end, discard ? " (DISCARDING)" : "");
792
793         /* walk through the vmas on the inode and tear down mmaped pages that
794          * intersect with the lock.  this stops immediately if there are no
795          * mmap()ed regions of the file.  This is not efficient at all and
796          * should be short lived. We'll associate mmap()ed pages with the lock
797          * and will be able to find them directly */
798         for (i = start; i <= end; i += (j + skip)) {
799                 j = min(count - (i % count), end - i + 1);
800                 LASSERT(j > 0);
801                 LASSERT(inode->i_mapping);
802                 if (ll_teardown_mmaps(inode->i_mapping,
803                                       (__u64)i << CFS_PAGE_SHIFT,
804                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
805                         break;
806         }
807
808         /* this is the simplistic implementation of page eviction at
809          * cancelation.  It is careful to get races with other page
810          * lockers handled correctly.  fixes from bug 20 will make it
811          * more efficient by associating locks with pages and with
812          * batching writeback under the lock explicitly. */
813         for (i = start, j = start % count; i <= end;
814              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
815                 if (j == count) {
816                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
817                         i += skip;
818                         j = 0;
819                         if (i > end)
820                                 break;
821                 }
822                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
823                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
824                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
825                          start, i, end);
826
827                 if (!mapping_has_pages(inode->i_mapping)) {
828                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
829                         break;
830                 }
831
832                 cond_resched();
833
834                 page = find_get_page(inode->i_mapping, i);
835                 if (page == NULL)
836                         continue;
837                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
838                                i, tmpex.l_extent.start);
839                 lock_page(page);
840
841                 /* page->mapping to check with racing against teardown */
842                 if (!discard && clear_page_dirty_for_io(page)) {
843                         rc = ll_call_writepage(inode, page);
844                         if (rc != 0)
845                                 CERROR("writepage inode %lu(%p) of page %p "
846                                        "failed: %d\n", inode->i_ino, inode,
847                                        page, rc);
848                         /* either waiting for io to complete or reacquiring
849                          * the lock that the failed writepage released */
850                         lock_page(page);
851                 }
852
853                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
854                 /* check to see if another DLM lock covers this page b=2765 */
855                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
856                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
857                                       LDLM_FL_TEST_LOCK,
858                                       &lock->l_resource->lr_name, LDLM_EXTENT,
859                                       &tmpex, LCK_PR | LCK_PW, &lockh);
860
861                 if (rc2 <= 0 && page->mapping != NULL) {
862                         struct ll_async_page *llap = llap_cast_private(page);
863                         /* checking again to account for writeback's
864                          * lock_page() */
865                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
866                         if (llap)
867                                 ll_ra_accounting(llap, inode->i_mapping);
868                         ll_truncate_complete_page(page);
869                 }
870                 unlock_page(page);
871                 page_cache_release(page);
872         }
873         LASSERTF(tmpex.l_extent.start <=
874                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
875                   lock->l_policy_data.l_extent.end + 1),
876                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
877                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
878                  start, i, end);
879         EXIT;
880 }
881
882 static int ll_extent_lock_callback(struct ldlm_lock *lock,
883                                    struct ldlm_lock_desc *new, void *data,
884                                    int flag)
885 {
886         struct lustre_handle lockh = { 0 };
887         int rc;
888         ENTRY;
889
890         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
891                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
892                 LBUG();
893         }
894
895         switch (flag) {
896         case LDLM_CB_BLOCKING:
897                 ldlm_lock2handle(lock, &lockh);
898                 rc = ldlm_cli_cancel(&lockh);
899                 if (rc != ELDLM_OK)
900                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
901                 break;
902         case LDLM_CB_CANCELING: {
903                 struct inode *inode;
904                 struct ll_inode_info *lli;
905                 struct lov_stripe_md *lsm;
906                 int stripe;
907                 __u64 kms;
908
909                 /* This lock wasn't granted, don't try to evict pages */
910                 if (lock->l_req_mode != lock->l_granted_mode)
911                         RETURN(0);
912
913                 inode = ll_inode_from_lock(lock);
914                 if (inode == NULL)
915                         RETURN(0);
916                 lli = ll_i2info(inode);
917                 if (lli == NULL)
918                         goto iput;
919                 if (lli->lli_smd == NULL)
920                         goto iput;
921                 lsm = lli->lli_smd;
922
923                 stripe = ll_lock_to_stripe_offset(inode, lock);
924                 if (stripe < 0)
925                         goto iput;
926
927                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
928
929                 lov_stripe_lock(lsm);
930                 lock_res_and_lock(lock);
931                 kms = ldlm_extent_shift_kms(lock,
932                                             lsm->lsm_oinfo[stripe]->loi_kms);
933
934                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
935                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
936                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
937                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
938                 unlock_res_and_lock(lock);
939                 lov_stripe_unlock(lsm);
940         iput:
941                 iput(inode);
942                 break;
943         }
944         default:
945                 LBUG();
946         }
947
948         RETURN(0);
949 }
950
951 #if 0
952 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
953 {
954         /* XXX ALLOCATE - 160 bytes */
955         struct inode *inode = ll_inode_from_lock(lock);
956         struct ll_inode_info *lli = ll_i2info(inode);
957         struct lustre_handle lockh = { 0 };
958         struct ost_lvb *lvb;
959         int stripe;
960         ENTRY;
961
962         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
963                      LDLM_FL_BLOCK_CONV)) {
964                 LBUG(); /* not expecting any blocked async locks yet */
965                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
966                            "lock, returning");
967                 ldlm_lock_dump(D_OTHER, lock, 0);
968                 ldlm_reprocess_all(lock->l_resource);
969                 RETURN(0);
970         }
971
972         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
973
974         stripe = ll_lock_to_stripe_offset(inode, lock);
975         if (stripe < 0)
976                 goto iput;
977
978         if (lock->l_lvb_len) {
979                 struct lov_stripe_md *lsm = lli->lli_smd;
980                 __u64 kms;
981                 lvb = lock->l_lvb_data;
982                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
983
984                 lock_res_and_lock(lock);
985                 ll_inode_size_lock(inode, 1);
986                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
987                 kms = ldlm_extent_shift_kms(NULL, kms);
988                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
989                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
990                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
991                 lsm->lsm_oinfo[stripe].loi_kms = kms;
992                 ll_inode_size_unlock(inode, 1);
993                 unlock_res_and_lock(lock);
994         }
995
996 iput:
997         iput(inode);
998         wake_up(&lock->l_waitq);
999
1000         ldlm_lock2handle(lock, &lockh);
1001         ldlm_lock_decref(&lockh, LCK_PR);
1002         RETURN(0);
1003 }
1004 #endif
1005
1006 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1007 {
1008         struct ptlrpc_request *req = reqp;
1009         struct inode *inode = ll_inode_from_lock(lock);
1010         struct ll_inode_info *lli;
1011         struct lov_stripe_md *lsm;
1012         struct ost_lvb *lvb;
1013         int rc, stripe;
1014         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1015         ENTRY;
1016
1017         if (inode == NULL)
1018                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1019         lli = ll_i2info(inode);
1020         if (lli == NULL)
1021                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1022         lsm = lli->lli_smd;
1023         if (lsm == NULL)
1024                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1025
1026         /* First, find out which stripe index this lock corresponds to. */
1027         stripe = ll_lock_to_stripe_offset(inode, lock);
1028         if (stripe < 0)
1029                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1030
1031         rc = lustre_pack_reply(req, 2, size, NULL);
1032         if (rc) {
1033                 CERROR("lustre_pack_reply: %d\n", rc);
1034                 GOTO(iput, rc);
1035         }
1036
1037         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1038         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1039         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1040         lvb->lvb_atime = LTIME_S(inode->i_atime);
1041         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1042
1043         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1044                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1045                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1046                    lvb->lvb_atime, lvb->lvb_ctime);
1047  iput:
1048         iput(inode);
1049
1050  out:
1051         /* These errors are normal races, so we don't want to fill the console
1052          * with messages by calling ptlrpc_error() */
1053         if (rc == -ELDLM_NO_LOCK_DATA)
1054                 lustre_pack_reply(req, 1, NULL, NULL);
1055
1056         req->rq_status = rc;
1057         return rc;
1058 }
1059
1060 static void ll_merge_lvb(struct inode *inode)
1061 {
1062         struct ll_inode_info *lli = ll_i2info(inode);
1063         struct ll_sb_info *sbi = ll_i2sbi(inode);
1064         struct ost_lvb lvb;
1065         ENTRY;
1066
1067         ll_inode_size_lock(inode, 1);
1068         inode_init_lvb(inode, &lvb);
1069         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1070         i_size_write(inode, lvb.lvb_size);
1071         inode->i_blocks = lvb.lvb_blocks;
1072         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1073         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1074         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1075         ll_inode_size_unlock(inode, 1);
1076         EXIT;
1077 }
1078
1079 int ll_local_size(struct inode *inode)
1080 {
1081         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1082         struct ll_inode_info *lli = ll_i2info(inode);
1083         struct ll_sb_info *sbi = ll_i2sbi(inode);
1084         struct lustre_handle lockh = { 0 };
1085         int flags = 0;
1086         int rc;
1087         ENTRY;
1088
1089         if (lli->lli_smd->lsm_stripe_count == 0)
1090                 RETURN(0);
1091
1092         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1093                        &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
1094         if (rc < 0)
1095                 RETURN(rc);
1096         else if (rc == 0)
1097                 RETURN(-ENODATA);
1098
1099         ll_merge_lvb(inode);
1100         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
1101         RETURN(0);
1102 }
1103
1104 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1105                      lstat_t *st)
1106 {
1107         struct lustre_handle lockh = { 0 };
1108         struct ldlm_enqueue_info einfo = { 0 };
1109         struct obd_info oinfo = { { { 0 } } };
1110         struct ost_lvb lvb;
1111         int rc;
1112
1113         ENTRY;
1114
1115         einfo.ei_type = LDLM_EXTENT;
1116         einfo.ei_mode = LCK_PR;
1117         einfo.ei_cb_bl = ll_extent_lock_callback;
1118         einfo.ei_cb_cp = ldlm_completion_ast;
1119         einfo.ei_cb_gl = ll_glimpse_callback;
1120         einfo.ei_cbdata = NULL;
1121
1122         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1123         oinfo.oi_lockh = &lockh;
1124         oinfo.oi_md = lsm;
1125         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1126
1127         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1128         if (rc == -ENOENT)
1129                 RETURN(rc);
1130         if (rc != 0) {
1131                 CERROR("obd_enqueue returned rc %d, "
1132                        "returning -EIO\n", rc);
1133                 RETURN(rc > 0 ? -EIO : rc);
1134         }
1135
1136         lov_stripe_lock(lsm);
1137         memset(&lvb, 0, sizeof(lvb));
1138         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1139         st->st_size = lvb.lvb_size;
1140         st->st_blocks = lvb.lvb_blocks;
1141         st->st_mtime = lvb.lvb_mtime;
1142         st->st_atime = lvb.lvb_atime;
1143         st->st_ctime = lvb.lvb_ctime;
1144         lov_stripe_unlock(lsm);
1145
1146         RETURN(rc);
1147 }
1148
1149 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1150  * file (because it prefers KMS over RSS when larger) */
1151 int ll_glimpse_size(struct inode *inode, int ast_flags)
1152 {
1153         struct ll_inode_info *lli = ll_i2info(inode);
1154         struct ll_sb_info *sbi = ll_i2sbi(inode);
1155         struct lustre_handle lockh = { 0 };
1156         struct ldlm_enqueue_info einfo = { 0 };
1157         struct obd_info oinfo = { { { 0 } } };
1158         int rc;
1159         ENTRY;
1160
1161         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1162                 RETURN(0);
1163
1164         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1165
1166         if (!lli->lli_smd) {
1167                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1168                 RETURN(0);
1169         }
1170
1171         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1172          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1173          *       won't revoke any conflicting DLM locks held. Instead,
1174          *       ll_glimpse_callback() will be called on each client
1175          *       holding a DLM lock against this file, and resulting size
1176          *       will be returned for each stripe. DLM lock on [0, EOF] is
1177          *       acquired only if there were no conflicting locks. */
1178         einfo.ei_type = LDLM_EXTENT;
1179         einfo.ei_mode = LCK_PR;
1180         einfo.ei_cb_bl = ll_extent_lock_callback;
1181         einfo.ei_cb_cp = ldlm_completion_ast;
1182         einfo.ei_cb_gl = ll_glimpse_callback;
1183         einfo.ei_cbdata = inode;
1184
1185         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1186         oinfo.oi_lockh = &lockh;
1187         oinfo.oi_md = lli->lli_smd;
1188         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1189
1190         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1191         if (rc == -ENOENT)
1192                 RETURN(rc);
1193         if (rc != 0) {
1194                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1195                 RETURN(rc > 0 ? -EIO : rc);
1196         }
1197
1198         ll_merge_lvb(inode);
1199
1200         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1201                i_size_read(inode), inode->i_blocks);
1202
1203         RETURN(rc);
1204 }
1205
1206 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1207                    struct lov_stripe_md *lsm, int mode,
1208                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1209                    int ast_flags)
1210 {
1211         struct ll_sb_info *sbi = ll_i2sbi(inode);
1212         struct ost_lvb lvb;
1213         struct ldlm_enqueue_info einfo = { 0 };
1214         struct obd_info oinfo = { { { 0 } } };
1215         int rc;
1216         ENTRY;
1217
1218         LASSERT(!lustre_handle_is_used(lockh));
1219         LASSERT(lsm != NULL);
1220
1221         /* don't drop the mmapped file to LRU */
1222         if (mapping_mapped(inode->i_mapping))
1223                 ast_flags |= LDLM_FL_NO_LRU;
1224
1225         /* XXX phil: can we do this?  won't it screw the file size up? */
1226         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1227             (sbi->ll_flags & LL_SBI_NOLCK))
1228                 RETURN(0);
1229
1230         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1231                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1232
1233         einfo.ei_type = LDLM_EXTENT;
1234         einfo.ei_mode = mode;
1235         einfo.ei_cb_bl = ll_extent_lock_callback;
1236         einfo.ei_cb_cp = ldlm_completion_ast;
1237         einfo.ei_cb_gl = ll_glimpse_callback;
1238         einfo.ei_cbdata = inode;
1239
1240         oinfo.oi_policy = *policy;
1241         oinfo.oi_lockh = lockh;
1242         oinfo.oi_md = lsm;
1243         oinfo.oi_flags = ast_flags;
1244
1245         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1246         *policy = oinfo.oi_policy;
1247         if (rc > 0)
1248                 rc = -EIO;
1249
1250         ll_inode_size_lock(inode, 1);
1251         inode_init_lvb(inode, &lvb);
1252         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1253
1254         if (policy->l_extent.start == 0 &&
1255             policy->l_extent.end == OBD_OBJECT_EOF) {
1256                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1257                  * the kms under both a DLM lock and the
1258                  * ll_inode_size_lock().  If we don't get the
1259                  * ll_inode_size_lock() here we can match the DLM lock and
1260                  * reset i_size from the kms before the truncating path has
1261                  * updated the kms.  generic_file_write can then trust the
1262                  * stale i_size when doing appending writes and effectively
1263                  * cancel the result of the truncate.  Getting the
1264                  * ll_inode_size_lock() after the enqueue maintains the DLM
1265                  * -> ll_inode_size_lock() acquiring order. */
1266                 i_size_write(inode, lvb.lvb_size);
1267                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1268                        inode->i_ino, i_size_read(inode));
1269         }
1270
1271         if (rc == 0) {
1272                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1273                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1274                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1275         }
1276         ll_inode_size_unlock(inode, 1);
1277
1278         RETURN(rc);
1279 }
1280
1281 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1282                      struct lov_stripe_md *lsm, int mode,
1283                      struct lustre_handle *lockh)
1284 {
1285         struct ll_sb_info *sbi = ll_i2sbi(inode);
1286         int rc;
1287         ENTRY;
1288
1289         /* XXX phil: can we do this?  won't it screw the file size up? */
1290         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1291             (sbi->ll_flags & LL_SBI_NOLCK))
1292                 RETURN(0);
1293
1294         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1295
1296         RETURN(rc);
1297 }
1298
1299 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1300                             loff_t *ppos)
1301 {
1302         struct inode *inode = file->f_dentry->d_inode;
1303         struct ll_inode_info *lli = ll_i2info(inode);
1304         struct lov_stripe_md *lsm = lli->lli_smd;
1305         struct ll_sb_info *sbi = ll_i2sbi(inode);
1306         struct ll_lock_tree tree;
1307         struct ll_lock_tree_node *node;
1308         struct ost_lvb lvb;
1309         struct ll_ra_read bead;
1310         int rc, ra = 0;
1311         loff_t end;
1312         ssize_t retval, chunk, sum = 0;
1313
1314         __u64 kms;
1315         ENTRY;
1316         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1317                inode->i_ino, inode->i_generation, inode, count, *ppos);
1318         /* "If nbyte is 0, read() will return 0 and have no other results."
1319          *                      -- Single Unix Spec */
1320         if (count == 0)
1321                 RETURN(0);
1322
1323         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1324
1325         if (!lsm) {
1326                 /* Read on file with no objects should return zero-filled
1327                  * buffers up to file size (we can get non-zero sizes with
1328                  * mknod + truncate, then opening file for read. This is a
1329                  * common pattern in NFS case, it seems). Bug 6243 */
1330                 int notzeroed;
1331                 /* Since there are no objects on OSTs, we have nothing to get
1332                  * lock on and so we are forced to access inode->i_size
1333                  * unguarded */
1334
1335                 /* Read beyond end of file */
1336                 if (*ppos >= i_size_read(inode))
1337                         RETURN(0);
1338
1339                 if (count > i_size_read(inode) - *ppos)
1340                         count = i_size_read(inode) - *ppos;
1341                 /* Make sure to correctly adjust the file pos pointer for
1342                  * EFAULT case */
1343                 notzeroed = clear_user(buf, count);
1344                 count -= notzeroed;
1345                 *ppos += count;
1346                 if (!count)
1347                         RETURN(-EFAULT);
1348                 RETURN(count);
1349         }
1350
1351 repeat:
1352         if (sbi->ll_max_rw_chunk != 0) {
1353                 /* first, let's know the end of the current stripe */
1354                 end = *ppos;
1355                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1356                                 (obd_off *)&end);
1357
1358                 /* correct, the end is beyond the request */
1359                 if (end > *ppos + count - 1)
1360                         end = *ppos + count - 1;
1361
1362                 /* and chunk shouldn't be too large even if striping is wide */
1363                 if (end - *ppos > sbi->ll_max_rw_chunk)
1364                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1365         } else {
1366                 end = *ppos + count - 1;
1367         }
1368
1369         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1370         if (IS_ERR(node)){
1371                 GOTO(out, retval = PTR_ERR(node));
1372         }
1373
1374         tree.lt_fd = LUSTRE_FPRIVATE(file);
1375         rc = ll_tree_lock(&tree, node, buf, count,
1376                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1377         if (rc != 0)
1378                 GOTO(out, retval = rc);
1379
1380         ll_inode_size_lock(inode, 1);
1381         /*
1382          * Consistency guarantees: following possibilities exist for the
1383          * relation between region being read and real file size at this
1384          * moment:
1385          *
1386          *  (A): the region is completely inside of the file;
1387          *
1388          *  (B-x): x bytes of region are inside of the file, the rest is
1389          *  outside;
1390          *
1391          *  (C): the region is completely outside of the file.
1392          *
1393          * This classification is stable under DLM lock acquired by
1394          * ll_tree_lock() above, because to change class, other client has to
1395          * take DLM lock conflicting with our lock. Also, any updates to
1396          * ->i_size by other threads on this client are serialized by
1397          * ll_inode_size_lock(). This guarantees that short reads are handled
1398          * correctly in the face of concurrent writes and truncates.
1399          */
1400         inode_init_lvb(inode, &lvb);
1401         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1402         kms = lvb.lvb_size;
1403         if (*ppos + count - 1 > kms) {
1404                 /* A glimpse is necessary to determine whether we return a
1405                  * short read (B) or some zeroes at the end of the buffer (C) */
1406                 ll_inode_size_unlock(inode, 1);
1407                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1408                 if (retval) {
1409                         ll_tree_unlock(&tree);
1410                         goto out;
1411                 }
1412         } else {
1413                 /* region is within kms and, hence, within real file size (A).
1414                  * We need to increase i_size to cover the read region so that
1415                  * generic_file_read() will do its job, but that doesn't mean
1416                  * the kms size is _correct_, it is only the _minimum_ size.
1417                  * If someone does a stat they will get the correct size which
1418                  * will always be >= the kms value here.  b=11081 */
1419                 if (i_size_read(inode) < kms)
1420                         i_size_write(inode, kms);
1421                 ll_inode_size_unlock(inode, 1);
1422         }
1423
1424         chunk = end - *ppos + 1;
1425         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1426                inode->i_ino, chunk, *ppos, i_size_read(inode));
1427
1428         /* turn off the kernel's read-ahead */
1429 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1430         file->f_ramax = 0;
1431 #else
1432         file->f_ra.ra_pages = 0;
1433 #endif
1434         /* initialize read-ahead window once per syscall */
1435         if (ra == 0) {
1436                 ra = 1;
1437                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1438                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1439                 ll_ra_read_in(file, &bead);
1440         }
1441
1442         /* BUG: 5972 */
1443         file_accessed(file);
1444         retval = generic_file_read(file, buf, chunk, ppos);
1445         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1446
1447         ll_tree_unlock(&tree);
1448
1449         if (retval > 0) {
1450                 buf += retval;
1451                 count -= retval;
1452                 sum += retval;
1453                 if (retval == chunk && count > 0)
1454                         goto repeat;
1455         }
1456
1457  out:
1458         if (ra != 0)
1459                 ll_ra_read_ex(file, &bead);
1460         retval = (sum > 0) ? sum : retval;
1461         RETURN(retval);
1462 }
1463
1464 /*
1465  * Write to a file (through the page cache).
1466  */
1467 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1468                              loff_t *ppos)
1469 {
1470         struct inode *inode = file->f_dentry->d_inode;
1471         struct ll_sb_info *sbi = ll_i2sbi(inode);
1472         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1473         struct ll_lock_tree tree;
1474         struct ll_lock_tree_node *node;
1475         loff_t maxbytes = ll_file_maxbytes(inode);
1476         loff_t lock_start, lock_end, end;
1477         ssize_t retval, chunk, sum = 0;
1478         int rc;
1479         ENTRY;
1480
1481         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1482                inode->i_ino, inode->i_generation, inode, count, *ppos);
1483
1484         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1485
1486         /* POSIX, but surprised the VFS doesn't check this already */
1487         if (count == 0)
1488                 RETURN(0);
1489
1490         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1491          * called on the file, don't fail the below assertion (bug 2388). */
1492         if (file->f_flags & O_LOV_DELAY_CREATE &&
1493             ll_i2info(inode)->lli_smd == NULL)
1494                 RETURN(-EBADF);
1495
1496         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1497
1498         down(&ll_i2info(inode)->lli_write_sem);
1499
1500 repeat:
1501         chunk = 0; /* just to fix gcc's warning */
1502         end = *ppos + count - 1;
1503
1504         if (file->f_flags & O_APPEND) {
1505                 lock_start = 0;
1506                 lock_end = OBD_OBJECT_EOF;
1507         } else if (sbi->ll_max_rw_chunk != 0) {
1508                 /* first, let's know the end of the current stripe */
1509                 end = *ppos;
1510                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1511                                 (obd_off *)&end);
1512
1513                 /* correct, the end is beyond the request */
1514                 if (end > *ppos + count - 1)
1515                         end = *ppos + count - 1;
1516
1517                 /* and chunk shouldn't be too large even if striping is wide */
1518                 if (end - *ppos > sbi->ll_max_rw_chunk)
1519                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1520                 lock_start = *ppos;
1521                 lock_end = end;
1522         } else {
1523                 lock_start = *ppos;
1524                 lock_end = *ppos + count - 1;
1525         }
1526         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1527
1528         if (IS_ERR(node))
1529                 GOTO(out, retval = PTR_ERR(node));
1530
1531         tree.lt_fd = LUSTRE_FPRIVATE(file);
1532         rc = ll_tree_lock(&tree, node, buf, count,
1533                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1534         if (rc != 0)
1535                 GOTO(out, retval = rc);
1536
1537         /* This is ok, g_f_w will overwrite this under i_sem if it races
1538          * with a local truncate, it just makes our maxbyte checking easier.
1539          * The i_size value gets updated in ll_extent_lock() as a consequence
1540          * of the [0,EOF] extent lock we requested above. */
1541         if (file->f_flags & O_APPEND) {
1542                 *ppos = i_size_read(inode);
1543                 end = *ppos + count - 1;
1544         }
1545
1546         if (*ppos >= maxbytes) {
1547                 send_sig(SIGXFSZ, current, 0);
1548                 GOTO(out_unlock, retval = -EFBIG);
1549         }
1550         if (*ppos + count > maxbytes)
1551                 count = maxbytes - *ppos;
1552
1553         /* generic_file_write handles O_APPEND after getting i_mutex */
1554         chunk = end - *ppos + 1;
1555         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1556                inode->i_ino, chunk, *ppos);
1557         retval = generic_file_write(file, buf, chunk, ppos);
1558         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1559
1560 out_unlock:
1561         ll_tree_unlock(&tree);
1562
1563 out:
1564         if (retval > 0) {
1565                 buf += retval;
1566                 count -= retval;
1567                 sum += retval;
1568                 if (retval == chunk && count > 0)
1569                         goto repeat;
1570         }
1571
1572         up(&ll_i2info(inode)->lli_write_sem);
1573
1574         retval = (sum > 0) ? sum : retval;
1575         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1576                            retval > 0 ? retval : 0);
1577         RETURN(retval);
1578 }
1579
1580 /*
1581  * Send file content (through pagecache) somewhere with helper
1582  */
1583 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1584 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1585                                 read_actor_t actor, void *target)
1586 {
1587         struct inode *inode = in_file->f_dentry->d_inode;
1588         struct ll_inode_info *lli = ll_i2info(inode);
1589         struct lov_stripe_md *lsm = lli->lli_smd;
1590         struct ll_lock_tree tree;
1591         struct ll_lock_tree_node *node;
1592         struct ost_lvb lvb;
1593         struct ll_ra_read bead;
1594         int rc;
1595         ssize_t retval;
1596         __u64 kms;
1597         ENTRY;
1598         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1599                inode->i_ino, inode->i_generation, inode, count, *ppos);
1600
1601         /* "If nbyte is 0, read() will return 0 and have no other results."
1602          *                      -- Single Unix Spec */
1603         if (count == 0)
1604                 RETURN(0);
1605
1606         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1607         /* turn off the kernel's read-ahead */
1608         in_file->f_ra.ra_pages = 0;
1609
1610         /* File with no objects, nothing to lock */
1611         if (!lsm)
1612                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1613
1614         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1615         if (IS_ERR(node))
1616                 RETURN(PTR_ERR(node));
1617
1618         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1619         rc = ll_tree_lock(&tree, node, NULL, count,
1620                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1621         if (rc != 0)
1622                 RETURN(rc);
1623
1624         ll_inode_size_lock(inode, 1);
1625         /*
1626          * Consistency guarantees: following possibilities exist for the
1627          * relation between region being read and real file size at this
1628          * moment:
1629          *
1630          *  (A): the region is completely inside of the file;
1631          *
1632          *  (B-x): x bytes of region are inside of the file, the rest is
1633          *  outside;
1634          *
1635          *  (C): the region is completely outside of the file.
1636          *
1637          * This classification is stable under DLM lock acquired by
1638          * ll_tree_lock() above, because to change class, other client has to
1639          * take DLM lock conflicting with our lock. Also, any updates to
1640          * ->i_size by other threads on this client are serialized by
1641          * ll_inode_size_lock(). This guarantees that short reads are handled
1642          * correctly in the face of concurrent writes and truncates.
1643          */
1644         inode_init_lvb(inode, &lvb);
1645         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1646         kms = lvb.lvb_size;
1647         if (*ppos + count - 1 > kms) {
1648                 /* A glimpse is necessary to determine whether we return a
1649                  * short read (B) or some zeroes at the end of the buffer (C) */
1650                 ll_inode_size_unlock(inode, 1);
1651                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1652                 if (retval)
1653                         goto out;
1654         } else {
1655                 /* region is within kms and, hence, within real file size (A) */
1656                 i_size_write(inode, kms);
1657                 ll_inode_size_unlock(inode, 1);
1658         }
1659
1660         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1661                inode->i_ino, count, *ppos, i_size_read(inode));
1662
1663         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1664         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1665         ll_ra_read_in(in_file, &bead);
1666         /* BUG: 5972 */
1667         file_accessed(in_file);
1668         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1669         ll_ra_read_ex(in_file, &bead);
1670
1671  out:
1672         ll_tree_unlock(&tree);
1673         RETURN(retval);
1674 }
1675 #endif
1676
1677 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1678                                unsigned long arg)
1679 {
1680         struct ll_inode_info *lli = ll_i2info(inode);
1681         struct obd_export *exp = ll_i2dtexp(inode);
1682         struct ll_recreate_obj ucreatp;
1683         struct obd_trans_info oti = { 0 };
1684         struct obdo *oa = NULL;
1685         int lsm_size;
1686         int rc = 0;
1687         struct lov_stripe_md *lsm, *lsm2;
1688         ENTRY;
1689
1690         if (!capable (CAP_SYS_ADMIN))
1691                 RETURN(-EPERM);
1692
1693         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1694                             sizeof(struct ll_recreate_obj));
1695         if (rc) {
1696                 RETURN(-EFAULT);
1697         }
1698         OBDO_ALLOC(oa);
1699         if (oa == NULL)
1700                 RETURN(-ENOMEM);
1701
1702         down(&lli->lli_size_sem);
1703         lsm = lli->lli_smd;
1704         if (lsm == NULL)
1705                 GOTO(out, rc = -ENOENT);
1706         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1707                    (lsm->lsm_stripe_count));
1708
1709         OBD_ALLOC(lsm2, lsm_size);
1710         if (lsm2 == NULL)
1711                 GOTO(out, rc = -ENOMEM);
1712
1713         oa->o_id = ucreatp.lrc_id;
1714         oa->o_gr = ucreatp.lrc_group;
1715         oa->o_nlink = ucreatp.lrc_ost_idx;
1716         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1717         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1718         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1719                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1720
1721         oti.oti_objid = NULL;
1722         memcpy(lsm2, lsm, lsm_size);
1723         rc = obd_create(exp, oa, &lsm2, &oti);
1724
1725         OBD_FREE(lsm2, lsm_size);
1726         GOTO(out, rc);
1727 out:
1728         up(&lli->lli_size_sem);
1729         OBDO_FREE(oa);
1730         return rc;
1731 }
1732
1733 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1734                              int flags, struct lov_user_md *lum, int lum_size)
1735 {
1736         struct ll_inode_info *lli = ll_i2info(inode);
1737         struct lov_stripe_md *lsm;
1738         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1739         int rc = 0;
1740         ENTRY;
1741
1742         down(&lli->lli_size_sem);
1743         lsm = lli->lli_smd;
1744         if (lsm) {
1745                 up(&lli->lli_size_sem);
1746                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1747                        inode->i_ino);
1748                 RETURN(-EEXIST);
1749         }
1750
1751         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1752         if (rc)
1753                 GOTO(out, rc);
1754         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1755                 GOTO(out_req_free, rc = -ENOENT);
1756         rc = oit.d.lustre.it_status;
1757         if (rc < 0)
1758                 GOTO(out_req_free, rc);
1759
1760         ll_release_openhandle(file->f_dentry, &oit);
1761
1762  out:
1763         up(&lli->lli_size_sem);
1764         ll_intent_release(&oit);
1765         RETURN(rc);
1766 out_req_free:
1767         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1768         goto out;
1769 }
1770
1771 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1772                              struct lov_mds_md **lmmp, int *lmm_size, 
1773                              struct ptlrpc_request **request)
1774 {
1775         struct ll_sb_info *sbi = ll_i2sbi(inode);
1776         struct mdt_body  *body;
1777         struct lov_mds_md *lmm = NULL;
1778         struct ptlrpc_request *req = NULL;
1779         struct obd_capa *oc;
1780         int rc, lmmsize;
1781
1782         rc = ll_get_max_mdsize(sbi, &lmmsize);
1783         if (rc)
1784                 RETURN(rc);
1785
1786         oc = ll_mdscapa_get(inode);
1787         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1788                              oc, filename, strlen(filename) + 1,
1789                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1790         capa_put(oc);
1791         if (rc < 0) {
1792                 CDEBUG(D_INFO, "md_getattr_name failed "
1793                        "on %s: rc %d\n", filename, rc);
1794                 GOTO(out, rc);
1795         }
1796
1797         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1798         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1799         /* swabbed by mdc_getattr_name */
1800         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1801
1802         lmmsize = body->eadatasize;
1803
1804         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1805                         lmmsize == 0) {
1806                 GOTO(out, rc = -ENODATA);
1807         }
1808
1809         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1810         LASSERT(lmm != NULL);
1811         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1812
1813         /*
1814          * This is coming from the MDS, so is probably in
1815          * little endian.  We convert it to host endian before
1816          * passing it to userspace.
1817          */
1818         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1819                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1820                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1821         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1822                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1823         }
1824
1825         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1826                 struct lov_stripe_md *lsm;
1827                 struct lov_user_md_join *lmj;
1828                 int lmj_size, i, aindex = 0;
1829
1830                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1831                 if (rc < 0)
1832                         GOTO(out, rc = -ENOMEM);
1833                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1834                 if (rc)
1835                         GOTO(out_free_memmd, rc);
1836
1837                 lmj_size = sizeof(struct lov_user_md_join) +
1838                            lsm->lsm_stripe_count *
1839                            sizeof(struct lov_user_ost_data_join);
1840                 OBD_ALLOC(lmj, lmj_size);
1841                 if (!lmj)
1842                         GOTO(out_free_memmd, rc = -ENOMEM);
1843
1844                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1845                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1846                         struct lov_extent *lex =
1847                                 &lsm->lsm_array->lai_ext_array[aindex];
1848
1849                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1850                                 aindex ++;
1851                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1852                                         LPU64" len %d\n", aindex, i,
1853                                         lex->le_start, (int)lex->le_len);
1854                         lmj->lmm_objects[i].l_extent_start =
1855                                 lex->le_start;
1856
1857                         if ((int)lex->le_len == -1)
1858                                 lmj->lmm_objects[i].l_extent_end = -1;
1859                         else
1860                                 lmj->lmm_objects[i].l_extent_end =
1861                                         lex->le_start + lex->le_len;
1862                         lmj->lmm_objects[i].l_object_id =
1863                                 lsm->lsm_oinfo[i]->loi_id;
1864                         lmj->lmm_objects[i].l_object_gr =
1865                                 lsm->lsm_oinfo[i]->loi_gr;
1866                         lmj->lmm_objects[i].l_ost_gen =
1867                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1868                         lmj->lmm_objects[i].l_ost_idx =
1869                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1870                 }
1871                 lmm = (struct lov_mds_md *)lmj;
1872                 lmmsize = lmj_size;
1873 out_free_memmd:
1874                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1875         }
1876 out:
1877         *lmmp = lmm;
1878         *lmm_size = lmmsize;
1879         *request = req;
1880         return rc;
1881 }
1882
1883 static int ll_lov_setea(struct inode *inode, struct file *file,
1884                             unsigned long arg)
1885 {
1886         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1887         struct lov_user_md  *lump;
1888         int lum_size = sizeof(struct lov_user_md) +
1889                        sizeof(struct lov_user_ost_data);
1890         int rc;
1891         ENTRY;
1892
1893         if (!capable (CAP_SYS_ADMIN))
1894                 RETURN(-EPERM);
1895
1896         OBD_ALLOC(lump, lum_size);
1897         if (lump == NULL) {
1898                 RETURN(-ENOMEM);
1899         }
1900         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1901         if (rc) {
1902                 OBD_FREE(lump, lum_size);
1903                 RETURN(-EFAULT);
1904         }
1905
1906         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1907
1908         OBD_FREE(lump, lum_size);
1909         RETURN(rc);
1910 }
1911
1912 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1913                             unsigned long arg)
1914 {
1915         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1916         int rc;
1917         int flags = FMODE_WRITE;
1918         ENTRY;
1919
1920         /* Bug 1152: copy properly when this is no longer true */
1921         LASSERT(sizeof(lum) == sizeof(*lump));
1922         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1923         rc = copy_from_user(&lum, lump, sizeof(lum));
1924         if (rc)
1925                 RETURN(-EFAULT);
1926
1927         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1928         if (rc == 0) {
1929                  put_user(0, &lump->lmm_stripe_count);
1930                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1931                                     0, ll_i2info(inode)->lli_smd, lump);
1932         }
1933         RETURN(rc);
1934 }
1935
1936 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1937 {
1938         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1939
1940         if (!lsm)
1941                 RETURN(-ENODATA);
1942
1943         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1944                             (void *)arg);
1945 }
1946
1947 static int ll_get_grouplock(struct inode *inode, struct file *file,
1948                             unsigned long arg)
1949 {
1950         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1951         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1952                                                     .end = OBD_OBJECT_EOF}};
1953         struct lustre_handle lockh = { 0 };
1954         struct ll_inode_info *lli = ll_i2info(inode);
1955         struct lov_stripe_md *lsm = lli->lli_smd;
1956         int flags = 0, rc;
1957         ENTRY;
1958
1959         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1960                 RETURN(-EINVAL);
1961         }
1962
1963         policy.l_extent.gid = arg;
1964         if (file->f_flags & O_NONBLOCK)
1965                 flags = LDLM_FL_BLOCK_NOWAIT;
1966
1967         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1968         if (rc)
1969                 RETURN(rc);
1970
1971         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1972         fd->fd_gid = arg;
1973         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1974
1975         RETURN(0);
1976 }
1977
1978 static int ll_put_grouplock(struct inode *inode, struct file *file,
1979                             unsigned long arg)
1980 {
1981         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1982         struct ll_inode_info *lli = ll_i2info(inode);
1983         struct lov_stripe_md *lsm = lli->lli_smd;
1984         int rc;
1985         ENTRY;
1986
1987         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1988                 /* Ugh, it's already unlocked. */
1989                 RETURN(-EINVAL);
1990         }
1991
1992         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1993                 RETURN(-EINVAL);
1994
1995         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1996
1997         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1998         if (rc)
1999                 RETURN(rc);
2000
2001         fd->fd_gid = 0;
2002         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2003
2004         RETURN(0);
2005 }
2006
2007 static int join_sanity_check(struct inode *head, struct inode *tail)
2008 {
2009         ENTRY;
2010         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2011                 CERROR("server do not support join \n");
2012                 RETURN(-EINVAL);
2013         }
2014         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2015                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2016                        head->i_ino, tail->i_ino);
2017                 RETURN(-EINVAL);
2018         }
2019         if (head->i_ino == tail->i_ino) {
2020                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2021                 RETURN(-EINVAL);
2022         }
2023         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2024                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2025                 RETURN(-EINVAL);
2026         }
2027         RETURN(0);
2028 }
2029
2030 static int join_file(struct inode *head_inode, struct file *head_filp,
2031                      struct file *tail_filp)
2032 {
2033         struct dentry *tail_dentry = tail_filp->f_dentry;
2034         struct lookup_intent oit = {.it_op = IT_OPEN,
2035                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2036         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2037                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2038
2039         struct lustre_handle lockh;
2040         struct md_op_data *op_data;
2041         int    rc;
2042         loff_t data;
2043         ENTRY;
2044
2045         tail_dentry = tail_filp->f_dentry;
2046
2047         data = i_size_read(head_inode);
2048         op_data = ll_prep_md_op_data(NULL, head_inode,
2049                                      tail_dentry->d_parent->d_inode,
2050                                      tail_dentry->d_name.name,
2051                                      tail_dentry->d_name.len, 0,
2052                                      LUSTRE_OPC_ANY, &data);
2053         if (IS_ERR(op_data))
2054                 RETURN(PTR_ERR(op_data));
2055
2056         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2057                          op_data, &lockh, NULL, 0, 0);
2058
2059         ll_finish_md_op_data(op_data);
2060         if (rc < 0)
2061                 GOTO(out, rc);
2062
2063         rc = oit.d.lustre.it_status;
2064
2065         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2066                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2067                 ptlrpc_req_finished((struct ptlrpc_request *)
2068                                     oit.d.lustre.it_data);
2069                 GOTO(out, rc);
2070         }
2071
2072         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2073                                            * away */
2074                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2075                 oit.d.lustre.it_lock_mode = 0;
2076         }
2077         ll_release_openhandle(head_filp->f_dentry, &oit);
2078 out:
2079         ll_intent_release(&oit);
2080         RETURN(rc);
2081 }
2082
2083 static int ll_file_join(struct inode *head, struct file *filp,
2084                         char *filename_tail)
2085 {
2086         struct inode *tail = NULL, *first = NULL, *second = NULL;
2087         struct dentry *tail_dentry;
2088         struct file *tail_filp, *first_filp, *second_filp;
2089         struct ll_lock_tree first_tree, second_tree;
2090         struct ll_lock_tree_node *first_node, *second_node;
2091         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2092         int rc = 0, cleanup_phase = 0;
2093         ENTRY;
2094
2095         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2096                head->i_ino, head->i_generation, head, filename_tail);
2097
2098         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2099         if (IS_ERR(tail_filp)) {
2100                 CERROR("Can not open tail file %s", filename_tail);
2101                 rc = PTR_ERR(tail_filp);
2102                 GOTO(cleanup, rc);
2103         }
2104         tail = igrab(tail_filp->f_dentry->d_inode);
2105
2106         tlli = ll_i2info(tail);
2107         tail_dentry = tail_filp->f_dentry;
2108         LASSERT(tail_dentry);
2109         cleanup_phase = 1;
2110
2111         /*reorder the inode for lock sequence*/
2112         first = head->i_ino > tail->i_ino ? head : tail;
2113         second = head->i_ino > tail->i_ino ? tail : head;
2114         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2115         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2116
2117         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2118                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2119         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2120         if (IS_ERR(first_node)){
2121                 rc = PTR_ERR(first_node);
2122                 GOTO(cleanup, rc);
2123         }
2124         first_tree.lt_fd = first_filp->private_data;
2125         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2126         if (rc != 0)
2127                 GOTO(cleanup, rc);
2128         cleanup_phase = 2;
2129
2130         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2131         if (IS_ERR(second_node)){
2132                 rc = PTR_ERR(second_node);
2133                 GOTO(cleanup, rc);
2134         }
2135         second_tree.lt_fd = second_filp->private_data;
2136         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2137         if (rc != 0)
2138                 GOTO(cleanup, rc);
2139         cleanup_phase = 3;
2140
2141         rc = join_sanity_check(head, tail);
2142         if (rc)
2143                 GOTO(cleanup, rc);
2144
2145         rc = join_file(head, filp, tail_filp);
2146         if (rc)
2147                 GOTO(cleanup, rc);
2148 cleanup:
2149         switch (cleanup_phase) {
2150         case 3:
2151                 ll_tree_unlock(&second_tree);
2152                 obd_cancel_unused(ll_i2dtexp(second),
2153                                   ll_i2info(second)->lli_smd, 0, NULL);
2154         case 2:
2155                 ll_tree_unlock(&first_tree);
2156                 obd_cancel_unused(ll_i2dtexp(first),
2157                                   ll_i2info(first)->lli_smd, 0, NULL);
2158         case 1:
2159                 filp_close(tail_filp, 0);
2160                 if (tail)
2161                         iput(tail);
2162                 if (head && rc == 0) {
2163                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2164                                        &hlli->lli_smd);
2165                         hlli->lli_smd = NULL;
2166                 }
2167         case 0:
2168                 break;
2169         default:
2170                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2171                 LBUG();
2172         }
2173         RETURN(rc);
2174 }
2175
2176 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2177 {
2178         struct inode *inode = dentry->d_inode;
2179         struct obd_client_handle *och;
2180         int rc;
2181         ENTRY;
2182
2183         LASSERT(inode);
2184
2185         /* Root ? Do nothing. */
2186         if (dentry->d_inode->i_sb->s_root == dentry)
2187                 RETURN(0);
2188
2189         /* No open handle to close? Move away */
2190         if (!it_disposition(it, DISP_OPEN_OPEN))
2191                 RETURN(0);
2192
2193         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2194
2195         OBD_ALLOC(och, sizeof(*och));
2196         if (!och)
2197                 GOTO(out, rc = -ENOMEM);
2198
2199         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2200                     ll_i2info(inode), it, och);
2201
2202         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2203                                        inode, och);
2204  out:
2205         /* this one is in place of ll_file_open */
2206         ptlrpc_req_finished(it->d.lustre.it_data);
2207         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2208         RETURN(rc);
2209 }
2210
2211 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2212                   unsigned long arg)
2213 {
2214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2215         int flags;
2216         ENTRY;
2217
2218         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2219                inode->i_generation, inode, cmd);
2220         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2221
2222         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2223         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2224                 RETURN(-ENOTTY);
2225
2226         switch(cmd) {
2227         case LL_IOC_GETFLAGS:
2228                 /* Get the current value of the file flags */
2229                 return put_user(fd->fd_flags, (int *)arg);
2230         case LL_IOC_SETFLAGS:
2231         case LL_IOC_CLRFLAGS:
2232                 /* Set or clear specific file flags */
2233                 /* XXX This probably needs checks to ensure the flags are
2234                  *     not abused, and to handle any flag side effects.
2235                  */
2236                 if (get_user(flags, (int *) arg))
2237                         RETURN(-EFAULT);
2238
2239                 if (cmd == LL_IOC_SETFLAGS) {
2240                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2241                             !(file->f_flags & O_DIRECT)) {
2242                                 CERROR("%s: unable to disable locking on "
2243                                        "non-O_DIRECT file\n", current->comm);
2244                                 RETURN(-EINVAL);
2245                         }
2246
2247                         fd->fd_flags |= flags;
2248                 } else {
2249                         fd->fd_flags &= ~flags;
2250                 }
2251                 RETURN(0);
2252         case LL_IOC_LOV_SETSTRIPE:
2253                 RETURN(ll_lov_setstripe(inode, file, arg));
2254         case LL_IOC_LOV_SETEA:
2255                 RETURN(ll_lov_setea(inode, file, arg));
2256         case LL_IOC_LOV_GETSTRIPE:
2257                 RETURN(ll_lov_getstripe(inode, arg));
2258         case LL_IOC_RECREATE_OBJ:
2259                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2260         case EXT3_IOC_GETFLAGS:
2261         case EXT3_IOC_SETFLAGS:
2262                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2263         case EXT3_IOC_GETVERSION_OLD:
2264         case EXT3_IOC_GETVERSION:
2265                 RETURN(put_user(inode->i_generation, (int *)arg));
2266         case LL_IOC_JOIN: {
2267                 char *ftail;
2268                 int rc;
2269
2270                 ftail = getname((const char *)arg);
2271                 if (IS_ERR(ftail))
2272                         RETURN(PTR_ERR(ftail));
2273                 rc = ll_file_join(inode, file, ftail);
2274                 putname(ftail);
2275                 RETURN(rc);
2276         }
2277         case LL_IOC_GROUP_LOCK:
2278                 RETURN(ll_get_grouplock(inode, file, arg));
2279         case LL_IOC_GROUP_UNLOCK:
2280                 RETURN(ll_put_grouplock(inode, file, arg));
2281         case IOC_OBD_STATFS:
2282                 RETURN(ll_obd_statfs(inode, (void *)arg));
2283
2284         /* We need to special case any other ioctls we want to handle,
2285          * to send them to the MDS/OST as appropriate and to properly
2286          * network encode the arg field.
2287         case EXT3_IOC_SETVERSION_OLD:
2288         case EXT3_IOC_SETVERSION:
2289         */
2290         case LL_IOC_FLUSHCTX:
2291                 RETURN(ll_flush_ctx(inode));
2292         case LL_IOC_GETFACL: {
2293                 struct rmtacl_ioctl_data ioc;
2294
2295                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2296                         RETURN(-EFAULT);
2297
2298                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2299         }
2300         case LL_IOC_SETFACL: {
2301                 struct rmtacl_ioctl_data ioc;
2302
2303                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2304                         RETURN(-EFAULT);
2305
2306                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2307         }
2308         default:
2309                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2310                                      (void *)arg));
2311         }
2312 }
2313
2314 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2315 {
2316         struct inode *inode = file->f_dentry->d_inode;
2317         struct ll_inode_info *lli = ll_i2info(inode);
2318         struct lov_stripe_md *lsm = lli->lli_smd;
2319         loff_t retval;
2320         ENTRY;
2321         retval = offset + ((origin == 2) ? i_size_read(inode) :
2322                            (origin == 1) ? file->f_pos : 0);
2323         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2324                inode->i_ino, inode->i_generation, inode, retval, retval,
2325                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2326         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2327
2328         if (origin == 2) { /* SEEK_END */
2329                 int nonblock = 0, rc;
2330
2331                 if (file->f_flags & O_NONBLOCK)
2332                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2333
2334                 if (lsm != NULL) {
2335                         rc = ll_glimpse_size(inode, nonblock);
2336                         if (rc != 0)
2337                                 RETURN(rc);
2338                 }
2339
2340                 ll_inode_size_lock(inode, 0);
2341                 offset += i_size_read(inode);
2342                 ll_inode_size_unlock(inode, 0);
2343         } else if (origin == 1) { /* SEEK_CUR */
2344                 offset += file->f_pos;
2345         }
2346
2347         retval = -EINVAL;
2348         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2349                 if (offset != file->f_pos) {
2350                         file->f_pos = offset;
2351 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2352                         file->f_reada = 0;
2353                         file->f_version = ++event;
2354 #endif
2355                 }
2356                 retval = offset;
2357         }
2358         
2359         RETURN(retval);
2360 }
2361
2362 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2363 {
2364         struct inode *inode = dentry->d_inode;
2365         struct ll_inode_info *lli = ll_i2info(inode);
2366         struct lov_stripe_md *lsm = lli->lli_smd;
2367         struct ptlrpc_request *req;
2368         struct obd_capa *oc;
2369         int rc, err;
2370         ENTRY;
2371         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2372                inode->i_generation, inode);
2373         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2374
2375         /* fsync's caller has already called _fdata{sync,write}, we want
2376          * that IO to finish before calling the osc and mdc sync methods */
2377         rc = filemap_fdatawait(inode->i_mapping);
2378
2379         /* catch async errors that were recorded back when async writeback
2380          * failed for pages in this mapping. */
2381         err = lli->lli_async_rc;
2382         lli->lli_async_rc = 0;
2383         if (rc == 0)
2384                 rc = err;
2385         if (lsm) {
2386                 err = lov_test_and_clear_async_rc(lsm);
2387                 if (rc == 0)
2388                         rc = err;
2389         }
2390
2391         oc = ll_mdscapa_get(inode);
2392         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2393                       &req);
2394         capa_put(oc);
2395         if (!rc)
2396                 rc = err;
2397         if (!err)
2398                 ptlrpc_req_finished(req);
2399
2400         if (data && lsm) {
2401                 struct obdo *oa;
2402                 
2403                 OBDO_ALLOC(oa);
2404                 if (!oa)
2405                         RETURN(rc ? rc : -ENOMEM);
2406
2407                 oa->o_id = lsm->lsm_object_id;
2408                 oa->o_gr = lsm->lsm_object_gr;
2409                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2410                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2411                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2412                                            OBD_MD_FLGROUP);
2413
2414                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2415                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2416                                0, OBD_OBJECT_EOF, oc);
2417                 capa_put(oc);
2418                 if (!rc)
2419                         rc = err;
2420                 OBDO_FREE(oa);
2421         }
2422
2423         RETURN(rc);
2424 }
2425
2426 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2427 {
2428         struct inode *inode = file->f_dentry->d_inode;
2429         struct ll_sb_info *sbi = ll_i2sbi(inode);
2430         struct ldlm_res_id res_id =
2431                 { .name = { fid_seq(ll_inode2fid(inode)),
2432                             fid_oid(ll_inode2fid(inode)),
2433                             fid_ver(ll_inode2fid(inode)),
2434                             LDLM_FLOCK} };
2435         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2436                 ldlm_flock_completion_ast, NULL, file_lock };
2437         struct lustre_handle lockh = {0};
2438         ldlm_policy_data_t flock;
2439         int flags = 0;
2440         int rc;
2441         ENTRY;
2442
2443         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2444                inode->i_ino, file_lock);
2445
2446         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2447  
2448         if (file_lock->fl_flags & FL_FLOCK) {
2449                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2450                 /* set missing params for flock() calls */
2451                 file_lock->fl_end = OFFSET_MAX;
2452                 file_lock->fl_pid = current->tgid;
2453         }
2454         flock.l_flock.pid = file_lock->fl_pid;
2455         flock.l_flock.start = file_lock->fl_start;
2456         flock.l_flock.end = file_lock->fl_end;
2457
2458         switch (file_lock->fl_type) {
2459         case F_RDLCK:
2460                 einfo.ei_mode = LCK_PR;
2461                 break;
2462         case F_UNLCK:
2463                 /* An unlock request may or may not have any relation to
2464                  * existing locks so we may not be able to pass a lock handle
2465                  * via a normal ldlm_lock_cancel() request. The request may even
2466                  * unlock a byte range in the middle of an existing lock. In
2467                  * order to process an unlock request we need all of the same
2468                  * information that is given with a normal read or write record
2469                  * lock request. To avoid creating another ldlm unlock (cancel)
2470                  * message we'll treat a LCK_NL flock request as an unlock. */
2471                 einfo.ei_mode = LCK_NL;
2472                 break;
2473         case F_WRLCK:
2474                 einfo.ei_mode = LCK_PW;
2475                 break;
2476         default:
2477                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2478                 LBUG();
2479         }
2480
2481         switch (cmd) {
2482         case F_SETLKW:
2483 #ifdef F_SETLKW64
2484         case F_SETLKW64:
2485 #endif
2486                 flags = 0;
2487                 break;
2488         case F_SETLK:
2489 #ifdef F_SETLK64
2490         case F_SETLK64:
2491 #endif
2492                 flags = LDLM_FL_BLOCK_NOWAIT;
2493                 break;
2494         case F_GETLK:
2495 #ifdef F_GETLK64
2496         case F_GETLK64:
2497 #endif
2498                 flags = LDLM_FL_TEST_LOCK;
2499                 /* Save the old mode so that if the mode in the lock changes we
2500                  * can decrement the appropriate reader or writer refcount. */
2501                 file_lock->fl_type = einfo.ei_mode;
2502                 break;
2503         default:
2504                 CERROR("unknown fcntl lock command: %d\n", cmd);
2505                 LBUG();
2506         }
2507
2508         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2509                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2510                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2511
2512         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2513                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2514         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2515                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2516 #ifdef HAVE_F_OP_FLOCK
2517         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2518             !(flags & LDLM_FL_TEST_LOCK))
2519                 posix_lock_file_wait(file, file_lock);
2520 #endif
2521
2522         RETURN(rc);
2523 }
2524
2525 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2526 {
2527         ENTRY;
2528
2529         RETURN(-ENOSYS);
2530 }
2531
2532 int ll_have_md_lock(struct inode *inode, __u64 bits)
2533 {
2534         struct lustre_handle lockh;
2535         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2536         struct lu_fid *fid;
2537         int flags;
2538         ENTRY;
2539
2540         if (!inode)
2541                RETURN(0);
2542
2543         fid = &ll_i2info(inode)->lli_fid;
2544         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2545
2546         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2547         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2548                           LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2549                 RETURN(1);
2550         }
2551
2552         RETURN(0);
2553 }
2554
2555 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2556         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2557                               * and return success */
2558                 inode->i_nlink = 0;
2559                 /* This path cannot be hit for regular files unless in
2560                  * case of obscure races, so no need to to validate
2561                  * size. */
2562                 if (!S_ISREG(inode->i_mode) &&
2563                     !S_ISDIR(inode->i_mode))
2564                         return 0;
2565         }
2566
2567         if (rc) {
2568                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2569                 return -abs(rc);
2570
2571         }
2572
2573         return 0;
2574 }
2575
2576 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2577 {
2578         struct inode *inode = dentry->d_inode;
2579         struct ptlrpc_request *req = NULL;
2580         struct ll_sb_info *sbi;
2581         struct obd_export *exp;
2582         int rc;
2583         ENTRY;
2584
2585         if (!inode) {
2586                 CERROR("REPORT THIS LINE TO PETER\n");
2587                 RETURN(0);
2588         }
2589         sbi = ll_i2sbi(inode);
2590
2591         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2592                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2593 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2594         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
2595 #endif
2596
2597         exp = ll_i2mdexp(inode);
2598
2599         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2600                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2601                 struct md_op_data *op_data;
2602
2603                 /* Call getattr by fid, so do not provide name at all. */
2604                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2605                                              dentry->d_inode, NULL, 0, 0,
2606                                              LUSTRE_OPC_ANY, NULL);
2607                 if (IS_ERR(op_data))
2608                         RETURN(PTR_ERR(op_data));
2609
2610                 oit.it_flags |= O_CHECK_STALE;
2611                 rc = md_intent_lock(exp, op_data, NULL, 0,
2612                                     /* we are not interested in name
2613                                        based lookup */
2614                                     &oit, 0, &req,
2615                                     ll_md_blocking_ast, 0);
2616                 ll_finish_md_op_data(op_data);
2617                 oit.it_flags &= ~O_CHECK_STALE;
2618                 if (rc < 0) {
2619                         rc = ll_inode_revalidate_fini(inode, rc);
2620                         GOTO (out, rc);
2621                 }
2622
2623                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2624                 if (rc != 0) {
2625                         ll_intent_release(&oit);
2626                         GOTO(out, rc);
2627                 }
2628
2629                 /* Unlinked? Unhash dentry, so it is not picked up later by
2630                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2631                    here to preserve get_cwd functionality on 2.6.
2632                    Bug 10503 */
2633                 if (!dentry->d_inode->i_nlink) {
2634                         spin_lock(&dcache_lock);
2635                         ll_drop_dentry(dentry);
2636                         spin_unlock(&dcache_lock);
2637                 }
2638
2639                 ll_lookup_finish_locks(&oit, dentry);
2640         } else if (!ll_have_md_lock(dentry->d_inode,
2641                                     MDS_INODELOCK_UPDATE)) {
2642                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2643                 obd_valid valid = OBD_MD_FLGETATTR;
2644                 struct obd_capa *oc;
2645                 int ealen = 0;
2646
2647                 if (S_ISREG(inode->i_mode)) {
2648                         rc = ll_get_max_mdsize(sbi, &ealen);
2649                         if (rc)
2650                                 RETURN(rc);
2651                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2652                 }
2653                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2654                  * capa for this inode. Because we only keep capas of dirs
2655                  * fresh. */
2656                 oc = ll_mdscapa_get(inode);
2657                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2658                                 ealen, &req);
2659                 capa_put(oc);
2660                 if (rc) {
2661                         rc = ll_inode_revalidate_fini(inode, rc);
2662                         RETURN(rc);
2663                 }
2664
2665                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2666                                    NULL);
2667                 if (rc)
2668                         GOTO(out, rc);
2669         }
2670
2671         /* if object not yet allocated, don't validate size */
2672         if (ll_i2info(inode)->lli_smd == NULL)
2673                 GOTO(out, rc = 0);
2674
2675         /* ll_glimpse_size will prefer locally cached writes if they extend
2676          * the file */
2677         rc = ll_glimpse_size(inode, 0);
2678         EXIT;
2679 out:
2680         ptlrpc_req_finished(req);
2681         return rc;
2682 }
2683
2684 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2685 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2686                   struct lookup_intent *it, struct kstat *stat)
2687 {
2688         struct inode *inode = de->d_inode;
2689         int res = 0;
2690
2691         res = ll_inode_revalidate_it(de, it);
2692         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2693
2694         if (res)
2695                 return res;
2696
2697         stat->dev = inode->i_sb->s_dev;
2698         stat->ino = inode->i_ino;
2699         stat->mode = inode->i_mode;
2700         stat->nlink = inode->i_nlink;
2701         stat->uid = inode->i_uid;
2702         stat->gid = inode->i_gid;
2703         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2704         stat->atime = inode->i_atime;
2705         stat->mtime = inode->i_mtime;
2706         stat->ctime = inode->i_ctime;
2707 #ifdef HAVE_INODE_BLKSIZE
2708         stat->blksize = inode->i_blksize;
2709 #else
2710         stat->blksize = 1 << inode->i_blkbits;
2711 #endif
2712
2713         ll_inode_size_lock(inode, 0);
2714         stat->size = i_size_read(inode);
2715         stat->blocks = inode->i_blocks;
2716         ll_inode_size_unlock(inode, 0);
2717
2718         return 0;
2719 }
2720 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2721 {
2722         struct lookup_intent it = { .it_op = IT_GETATTR };
2723
2724         return ll_getattr_it(mnt, de, &it, stat);
2725 }
2726 #endif
2727
2728 static
2729 int lustre_check_acl(struct inode *inode, int mask)
2730 {
2731 #ifdef CONFIG_FS_POSIX_ACL
2732         struct ll_inode_info *lli = ll_i2info(inode);
2733         struct posix_acl *acl;
2734         int rc;
2735         ENTRY;
2736
2737         spin_lock(&lli->lli_lock);
2738         acl = posix_acl_dup(lli->lli_posix_acl);
2739         spin_unlock(&lli->lli_lock);
2740
2741         if (!acl)
2742                 RETURN(-EAGAIN);
2743
2744         rc = posix_acl_permission(inode, acl, mask);
2745         posix_acl_release(acl);
2746
2747         RETURN(rc);
2748 #else
2749         return -EAGAIN;
2750 #endif
2751 }
2752
2753 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2754 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2755 {
2756         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2757                inode->i_ino, inode->i_generation, inode, mask);
2758         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2759                 return lustre_check_remote_perm(inode, mask);
2760         
2761         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2762         return generic_permission(inode, mask, lustre_check_acl);
2763 }
2764 #else
2765 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2766 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2767 #else
2768 int ll_inode_permission(struct inode *inode, int mask)
2769 #endif
2770 {
2771         int mode = inode->i_mode;
2772         int rc;
2773
2774         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2775                inode->i_ino, inode->i_generation, inode, mask);
2776
2777         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2778                 return lustre_check_remote_perm(inode, mask);
2779
2780         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2781
2782         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2783             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2784                 return -EROFS;
2785         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2786                 return -EACCES;
2787         if (current->fsuid == inode->i_uid) {
2788                 mode >>= 6;
2789         } else if (1) {
2790                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2791                         goto check_groups;
2792                 rc = lustre_check_acl(inode, mask);
2793                 if (rc == -EAGAIN)
2794                         goto check_groups;
2795                 if (rc == -EACCES)
2796                         goto check_capabilities;
2797                 return rc;
2798         } else {
2799 check_groups:
2800                 if (in_group_p(inode->i_gid))
2801                         mode >>= 3;
2802         }
2803         if ((mode & mask & S_IRWXO) == mask)
2804                 return 0;
2805
2806 check_capabilities:
2807         if (!(mask & MAY_EXEC) ||
2808             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2809                 if (capable(CAP_DAC_OVERRIDE))
2810                         return 0;
2811
2812         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2813             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2814                 return 0;
2815         
2816         return -EACCES;
2817 }
2818 #endif
2819
2820 /* -o localflock - only provides locally consistent flock locks */
2821 struct file_operations ll_file_operations = {
2822         .read           = ll_file_read,
2823         .write          = ll_file_write,
2824         .ioctl          = ll_file_ioctl,
2825         .open           = ll_file_open,
2826         .release        = ll_file_release,
2827         .mmap           = ll_file_mmap,
2828         .llseek         = ll_file_seek,
2829 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2830         .sendfile       = ll_file_sendfile,
2831 #endif
2832         .fsync          = ll_fsync,
2833 };
2834
2835 struct file_operations ll_file_operations_flock = {
2836         .read           = ll_file_read,
2837         .write          = ll_file_write,
2838         .ioctl          = ll_file_ioctl,
2839         .open           = ll_file_open,
2840         .release        = ll_file_release,
2841         .mmap           = ll_file_mmap,
2842         .llseek         = ll_file_seek,
2843 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2844         .sendfile       = ll_file_sendfile,
2845 #endif
2846         .fsync          = ll_fsync,
2847 #ifdef HAVE_F_OP_FLOCK
2848         .flock          = ll_file_flock,
2849 #endif
2850         .lock           = ll_file_flock
2851 };
2852
2853 /* These are for -o noflock - to return ENOSYS on flock calls */
2854 struct file_operations ll_file_operations_noflock = {
2855         .read           = ll_file_read,
2856         .write          = ll_file_write,
2857         .ioctl          = ll_file_ioctl,
2858         .open           = ll_file_open,
2859         .release        = ll_file_release,
2860         .mmap           = ll_file_mmap,
2861         .llseek         = ll_file_seek,
2862 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2863         .sendfile       = ll_file_sendfile,
2864 #endif
2865         .fsync          = ll_fsync,
2866 #ifdef HAVE_F_OP_FLOCK
2867         .flock          = ll_file_noflock,
2868 #endif
2869         .lock           = ll_file_noflock
2870 };
2871
2872 struct inode_operations ll_file_inode_operations = {
2873 #ifdef LUSTRE_KERNEL_VERSION
2874         .setattr_raw    = ll_setattr_raw,
2875 #endif
2876         .setattr        = ll_setattr,
2877         .truncate       = ll_truncate,
2878 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2879         .getattr        = ll_getattr,
2880 #else
2881         .revalidate_it  = ll_inode_revalidate_it,
2882 #endif
2883         .permission     = ll_inode_permission,
2884         .setxattr       = ll_setxattr,
2885         .getxattr       = ll_getxattr,
2886         .listxattr      = ll_listxattr,
2887         .removexattr    = ll_removexattr,
2888 };
2889