Whamcloud - gitweb
82a6108292ba7c1bc48baf2460e065a632427c26
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35
36 /* also used by llite/special.c:ll_special_open() */
37 struct ll_file_data *ll_file_data_get(void)
38 {
39         struct ll_file_data *fd;
40
41         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
42         return fd;
43 }
44
45 static void ll_file_data_put(struct ll_file_data *fd)
46 {
47         if (fd != NULL)
48                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
49 }
50
51 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
52                           struct lustre_handle *fh)
53 {
54         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
55         op_data->op_attr.ia_mode = inode->i_mode;
56         op_data->op_attr.ia_atime = inode->i_atime;
57         op_data->op_attr.ia_mtime = inode->i_mtime;
58         op_data->op_attr.ia_ctime = inode->i_ctime;
59         op_data->op_attr.ia_size = i_size_read(inode);
60         op_data->op_attr_blocks = inode->i_blocks;
61         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
62         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
63         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
64         op_data->op_capa1 = ll_mdscapa_get(inode);
65 }
66
/* Prepare @op_data for an MDS close of open handle @och on @inode.
 *
 * Mode and timestamps are always sent.  For write opens on regular files:
 * if the server does not support Size-on-MDS (SOM), size/blocks are packed
 * directly; otherwise the IO epoch is closed via ll_epoch_close() so the
 * MDS can obtain the attributes through the DONE_WRITING protocol. */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
                             struct obd_client_handle *och)
{
        ENTRY;

        op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
                                 ATTR_MTIME_SET | ATTR_CTIME_SET;

        /* Read/exec opens never modified size; nothing more to pack. */
        if (!(och->och_flags & FMODE_WRITE))
                goto out;

        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
            !S_ISREG(inode->i_mode))
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
        else
                /* NOTE(review): &och (handle **) is passed — presumably
                 * ll_epoch_close() may consume or replace the handle
                 * pointer; confirm against its prototype. */
                ll_epoch_close(inode, op_data, &och, 0);

out:
        ll_pack_inode2opdata(inode, op_data, &och->och_fh);
        EXIT;
}
88
/* Close the MDS open handle @och for @inode and dispose of @och.
 *
 * On a forced umount or a dead MDC connection the md_close() RPC is
 * skipped entirely.  If the close ends an IO epoch the MDS may answer
 * -EAGAIN, asking the client to gather Size-on-MDS attributes from the
 * OSTs and send them back via setattr.
 *
 * Ownership: @och is either freed here, or handed to the DONE_WRITING
 * machinery (ll_queue_done_writing) when the epoch is still open for a
 * SOM-enabled write handle. */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                     struct inode *inode,
                                     struct obd_client_handle *och)
{
        struct obd_export *exp = ll_i2mdexp(inode);
        struct md_op_data *op_data;
        struct ptlrpc_request *req = NULL;
        struct obd_device *obd = class_exp2obd(exp);
        int epoch_close = 1;
        int rc;
        ENTRY;

        if (obd == NULL) {
                /*
                 * XXX: in case of LMV, is this correct to access
                 * ->exp_handle?
                 */
                CERROR("Invalid MDC connection handle "LPX64"\n",
                       ll_i2mdexp(inode)->exp_handle.h_cookie);
                GOTO(out, rc = 0);
        }

        /*
         * here we check if this is forced umount. If so this is called on
         * canceling "open lock" and we do not call md_close() in this case, as
         * it will not be successful, as import is already deactivated.
         */
        if (obd->obd_force)
                GOTO(out, rc = 0);

        OBD_ALLOC_PTR(op_data);
        if (op_data == NULL)
                GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

        ll_prepare_close(inode, op_data, och);
        /* ll_prepare_close() may set MF_EPOCH_CLOSE when SOM is active */
        epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
        rc = md_close(md_exp, op_data, och, &req);

        if (rc == -EAGAIN) {
                /* This close must have the epoch closed. */
                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
                LASSERT(epoch_close);
                /* MDS has instructed us to obtain Size-on-MDS attribute from
                 * OSTs and send setattr to back to MDS. */
                rc = ll_sizeonmds_update(inode, &och->och_fh,
                                         op_data->op_ioepoch);
                if (rc) {
                        CERROR("inode %lu mdc Size-on-MDS update failed: "
                               "rc = %d\n", inode->i_ino, rc);
                        /* close errors are not propagated; see comment in
                         * ll_file_release() about applications ignoring
                         * close return values */
                        rc = 0;
                }
        } else if (rc) {
                CERROR("inode %lu mdc close failed: rc = %d\n",
                       inode->i_ino, rc);
        }
        ll_finish_md_op_data(op_data);

        if (rc == 0) {
                /* destroy OST objects listed in the close reply (unlinked
                 * file whose last reference just went away) */
                rc = ll_objects_destroy(req, inode);
                if (rc)
                        CERROR("inode %lu ll_objects destroy: rc = %d\n",
                               inode->i_ino, rc);
        }

        ptlrpc_req_finished(req); /* This is close request */
        EXIT;
out:

        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
            S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                /* epoch still open: keep @och alive for DONE_WRITING */
                ll_queue_done_writing(inode, LLIF_DONE_WRITING);
        } else {
                md_clear_open_replay_data(md_exp, och);
                /* Free @och if it is not waiting for DONE_WRITING. */
                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
                OBD_FREE_PTR(och);
        }

        return rc;
}
169
170 int ll_md_real_close(struct inode *inode, int flags)
171 {
172         struct ll_inode_info *lli = ll_i2info(inode);
173         struct obd_client_handle **och_p;
174         struct obd_client_handle *och;
175         __u64 *och_usecount;
176         int rc = 0;
177         ENTRY;
178
179         if (flags & FMODE_WRITE) {
180                 och_p = &lli->lli_mds_write_och;
181                 och_usecount = &lli->lli_open_fd_write_count;
182         } else if (flags & FMODE_EXEC) {
183                 och_p = &lli->lli_mds_exec_och;
184                 och_usecount = &lli->lli_open_fd_exec_count;
185         } else {
186                 LASSERT(flags & FMODE_READ);
187                 och_p = &lli->lli_mds_read_och;
188                 och_usecount = &lli->lli_open_fd_read_count;
189         }
190
191         down(&lli->lli_och_sem);
192         if (*och_usecount) { /* There are still users of this handle, so
193                                 skip freeing it. */
194                 up(&lli->lli_och_sem);
195                 RETURN(0);
196         }
197         och=*och_p;
198         *och_p = NULL;
199         up(&lli->lli_och_sem);
200
201         if (och) { /* There might be a race and somebody have freed this och
202                       already */
203                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
204                                                inode, och);
205         }
206
207         RETURN(rc);
208 }
209
/* Per-file-descriptor close: drop group lock if held, decrement the
 * open-mode use count on the shared MDS handle and, when no OPEN DLM
 * lock remains to cache the handle, really close it on the MDS via
 * ll_md_real_close().  Frees the ll_file_data on all paths. */
int ll_md_close(struct obd_export *md_exp, struct inode *inode,
                struct file *file)
{
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct ll_inode_info *lli = ll_i2info(inode);
        int rc = 0;
        ENTRY;

        /* clear group lock, if present */
        if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
                struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
                fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
                rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
                                      &fd->fd_cwlockh);
        }

        /* Let's see if we have good enough OPEN lock on the file and if
           we can skip talking to MDS */
        if (file->f_dentry->d_inode) { /* Can this ever be false? */
                int lockmode;
                /* TEST_LOCK: only probe for a granted lock, don't take a
                 * reference on it */
                int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
                struct lustre_handle lockh;
                struct inode *inode = file->f_dentry->d_inode;
                ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};

                /* drop this fd's reference on the mode-specific handle;
                 * lock mode mirrors the one taken at open (CW/PR/CR) */
                down(&lli->lli_och_sem);
                if (fd->fd_omode & FMODE_WRITE) {
                        lockmode = LCK_CW;
                        LASSERT(lli->lli_open_fd_write_count);
                        lli->lli_open_fd_write_count--;
                } else if (fd->fd_omode & FMODE_EXEC) {
                        lockmode = LCK_PR;
                        LASSERT(lli->lli_open_fd_exec_count);
                        lli->lli_open_fd_exec_count--;
                } else {
                        lockmode = LCK_CR;
                        LASSERT(lli->lli_open_fd_read_count);
                        lli->lli_open_fd_read_count--;
                }
                up(&lli->lli_och_sem);

                /* no cached OPEN lock -> the handle cannot be revalidated
                 * later, close it on the MDS now */
                if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
                                   LDLM_IBITS, &policy, lockmode,
                                   &lockh)) {
                        rc = ll_md_real_close(file->f_dentry->d_inode,
                                              fd->fd_omode);
                }
        } else {
                CERROR("Releasing a file %p with negative dentry %p. Name %s",
                       file, file->f_dentry, file->f_dentry->d_name.name);
        }

        LUSTRE_FPRIVATE(file) = NULL;
        ll_file_data_put(fd);
        ll_capa_close(inode);

        RETURN(rc);
}
268
269 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
270
271 /* While this returns an error code, fput() the caller does not, so we need
272  * to make every effort to clean up all of our state here.  Also, applications
273  * rarely check close errors and even if an error is returned they will not
274  * re-try the close call.
275  */
276 int ll_file_release(struct inode *inode, struct file *file)
277 {
278         struct ll_file_data *fd;
279         struct ll_sb_info *sbi = ll_i2sbi(inode);
280         struct ll_inode_info *lli = ll_i2info(inode);
281         struct lov_stripe_md *lsm = lli->lli_smd;
282         int rc;
283
284         ENTRY;
285         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
286                inode->i_generation, inode);
287
288         /* don't do anything for / */
289         if (inode->i_sb->s_root == file->f_dentry)
290                 RETURN(0);
291
292         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
293         fd = LUSTRE_FPRIVATE(file);
294         LASSERT(fd != NULL);
295
296         /* don't do anything for / */
297         if (inode->i_sb->s_root == file->f_dentry) {
298                 LUSTRE_FPRIVATE(file) = NULL;
299                 ll_file_data_put(fd);
300                 RETURN(0);
301         }
302         
303         if (lsm)
304                 lov_test_and_clear_async_rc(lsm);
305         lli->lli_async_rc = 0;
306
307         rc = ll_md_close(sbi->ll_md_exp, inode, file);
308         RETURN(rc);
309 }
310
/* Send an IT_OPEN intent to the MDS for @file (NFSD / patchless-kernel
 * path, or a reopen after a cached handle raced away).
 *
 * @lmm/@lmmsize: striping info to set, or NULL/0 for a plain open (in
 * which case an OPEN DLM lock is also requested so the handle can be
 * cached).  On -ESTALE the just-obtained open handle, if any, is
 * released and -ESTALE is returned quietly (no console noise). */
static int ll_intent_file_open(struct file *file, void *lmm,
                               int lmmsize, struct lookup_intent *itp)
{
        struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
        struct dentry *parent = file->f_dentry->d_parent;
        const char *name = file->f_dentry->d_name.name;
        const int len = file->f_dentry->d_name.len;
        struct md_op_data *op_data;
        struct ptlrpc_request *req;
        int rc;

        if (!parent)
                RETURN(-ENOENT);

        /* Usually we come here only for NFSD, and we want open lock.
           But we can also get here with pre 2.6.15 patchless kernels, and in
           that case that lock is also ok */
        /* We can also get here if there was cached open handle in revalidate_it
         * but it disappeared while we were getting from there to ll_file_open.
         * But this means this file was closed and immediatelly opened which
         * makes a good candidate for using OPEN lock */
        /* If lmmsize & lmm are not 0, we are just setting stripe info
         * parameters. No need for the open lock */
        if (!lmm && !lmmsize)
                itp->it_flags |= MDS_OPEN_LOCK;

        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
                                      file->f_dentry->d_inode, name, len,
                                      O_RDWR, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
                            0 /*unused */, &req, ll_md_blocking_ast, 0);
        ll_finish_md_op_data(op_data);
        if (rc == -ESTALE) {
                /* reason for keep own exit path - don`t flood log
                * with messages with -ESTALE errors.
                */
                if (!it_disposition(itp, DISP_OPEN_OPEN) || 
                     it_open_error(DISP_OPEN_OPEN, itp))
                        GOTO(out, rc);
                /* open succeeded but lookup was stale: drop the handle */
                ll_release_openhandle(file->f_dentry, itp);
                GOTO(out_stale, rc);
        }

        if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
                GOTO(out, rc);
        }

        /* attach the inode to the lock so blocking ASTs find it */
        if (itp->d.lustre.it_lock_mode)
                md_set_lock_data(sbi->ll_md_exp,
                                 &itp->d.lustre.it_lock_handle, 
                                 file->f_dentry->d_inode);

        rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
                           NULL);
out:
        /* drop the intent's reference on the enqueue reply */
        ptlrpc_req_finished(itp->d.lustre.it_data);

out_stale:
        it_clear_disposition(itp, DISP_ENQ_COMPLETE);
        ll_intent_drop_lock(itp);

        RETURN(rc);
}
379
380 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
381                        struct lookup_intent *it, struct obd_client_handle *och)
382 {
383         struct ptlrpc_request *req = it->d.lustre.it_data;
384         struct mdt_body *body;
385
386         LASSERT(och);
387
388         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
389         LASSERT(body != NULL);                      /* reply already checked out */
390         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
391
392         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
393         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
394         och->och_fid = lli->lli_fid;
395         och->och_flags = it->it_flags;
396         lli->lli_ioepoch = body->ioepoch;
397
398         return md_set_open_replay_data(md_exp, och, req);
399 }
400
/* Finish a local open: optionally fill @och from the intent reply
 * (@och != NULL only for the first opener that created the handle),
 * then attach @fd to the file and initialize readahead state.
 *
 * Caller holds lli_och_sem (see ll_file_open()). */
int ll_local_open(struct file *file, struct lookup_intent *it,
                  struct ll_file_data *fd, struct obd_client_handle *och)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        ENTRY;

        LASSERT(!LUSTRE_FPRIVATE(file));

        LASSERT(fd != NULL);

        if (och) {
                struct ptlrpc_request *req = it->d.lustre.it_data;
                struct mdt_body *body;
                int rc;

                rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
                if (rc)
                        RETURN(rc);

                /* NOTE(review): body is dereferenced below without a NULL
                 * check — presumably guaranteed non-NULL because
                 * ll_och_fill() just LASSERTed the same buffer; confirm. */
                body = lustre_msg_buf(req->rq_repmsg,
                                      DLM_REPLY_REC_OFF, sizeof(*body));

                if ((it->it_flags & FMODE_WRITE) &&
                    (body->valid & OBD_MD_FLSIZE))
                {
                        CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
                               lli->lli_ioepoch, PFID(&lli->lli_fid));
                }
        }

        LUSTRE_FPRIVATE(file) = fd;
        ll_readahead_init(inode, &fd->fd_ras);
        /* remember the open mode for the matching close */
        fd->fd_omode = it->it_flags;
        RETURN(0);
}
437
/* Open a file, and (for the very first open) create objects on the OSTs at
 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
 * creation or open until ll_lov_setstripe() ioctl is called.  We grab
 * lli_open_sem to ensure no other process will create objects, send the
 * stripe MD to the MDS, or try to destroy the objects if that fails.
 *
 * If we already have the stripe MD locally then we don't request it in
 * md_open(), by passing a lmm_size = 0.
 *
 * It is up to the application to ensure no other processes open this file
 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
 * used.  We might be able to avoid races of that sort by getting lli_open_sem
 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
 */
int ll_file_open(struct inode *inode, struct file *file)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lookup_intent *it, oit = { .it_op = IT_OPEN,
                                          .it_flags = file->f_flags };
        struct lov_stripe_md *lsm;
        struct ptlrpc_request *req = NULL;
        struct obd_client_handle **och_p;
        __u64 *och_usecount;
        struct ll_file_data *fd;
        int rc = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
               inode->i_generation, inode, file->f_flags);

        /* don't do anything for / */
        if (inode->i_sb->s_root == file->f_dentry)
                RETURN(0);

#ifdef LUSTRE_KERNEL_VERSION
        it = file->f_it;
#else
        it = file->private_data; /* XXX: compat macro */
        file->private_data = NULL; /* prevent ll_local_open assertion */
#endif

        fd = ll_file_data_get();
        if (fd == NULL)
                RETURN(-ENOMEM);

        /* don't do anything for / */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = fd;
                RETURN(0);
        }

        /* no intent from the caller (or one without a disposition):
         * build our own IT_OPEN intent in @oit */
        if (!it || !it->d.lustre.it_disposition) {
                /* Convert f_flags into access mode. We cannot use file->f_mode,
                 * because everything but O_ACCMODE mask was stripped from
                 * there */
                if ((oit.it_flags + 1) & O_ACCMODE)
                        oit.it_flags++;
                if (file->f_flags & O_TRUNC)
                        oit.it_flags |= FMODE_WRITE;

                /* kernel only call f_op->open in dentry_open.  filp_open calls
                 * dentry_open after call to open_namei that checks permissions.
                 * Only nfsd_open call dentry_open directly without checking
                 * permissions and because of that this code below is safe. */
                if (oit.it_flags & FMODE_WRITE)
                        oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

                /* We do not want O_EXCL here, presumably we opened the file
                 * already? XXX - NFS implications? */
                oit.it_flags &= ~O_EXCL;

                it = &oit;
        }

        /* Let's see if we have file open on MDS already. */
        if (it->it_flags & FMODE_WRITE) {
                och_p = &lli->lli_mds_write_och;
                och_usecount = &lli->lli_open_fd_write_count;
        } else if (it->it_flags & FMODE_EXEC) {
                och_p = &lli->lli_mds_exec_och;
                och_usecount = &lli->lli_open_fd_exec_count;
         } else {
                och_p = &lli->lli_mds_read_och;
                och_usecount = &lli->lli_open_fd_read_count;
        }

        down(&lli->lli_och_sem);
        if (*och_p) { /* Open handle is present */
                if (it_disposition(it, DISP_OPEN_OPEN)) {
                        /* Well, there's extra open request that we do not need,
                           let's close it somehow. This will decref request. */
                        rc = it_open_error(DISP_OPEN_OPEN, it);
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_och_free, rc);
                        }
                        ll_release_openhandle(file->f_dentry, it);
                        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
                                             LPROC_LL_OPEN);
                }
                (*och_usecount)++;

                /* och == NULL: reuse the cached handle, don't refill it */
                rc = ll_local_open(file, it, fd, NULL);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        RETURN(rc);
                }
        } else {
                LASSERT(*och_usecount == 0);
                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
                if (!*och_p) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc = -ENOMEM);
                }
                (*och_usecount)++;
                if (!it->d.lustre.it_disposition) {
                        /* no usable intent: do the open RPC ourselves */
                        it->it_flags |= O_CHECK_STALE;
                        rc = ll_intent_file_open(file, NULL, 0, it);
                        it->it_flags &= ~O_CHECK_STALE;
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_och_free, rc);
                        }

                        /* Got some error? Release the request */
                        if (it->d.lustre.it_status < 0) {
                                req = it->d.lustre.it_data;
                                ptlrpc_req_finished(req);
                        }
                        md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
                                         &it->d.lustre.it_lock_handle,
                                         file->f_dentry->d_inode);
                }
                req = it->d.lustre.it_data;

                /* md_intent_lock() didn't get a request ref if there was an
                 * open error, so don't do cleanup on the request here
                 * (bug 3430) */
                /* XXX (green): Should not we bail out on any error here, not
                 * just open error? */
                rc = it_open_error(DISP_OPEN_OPEN, it);
                if (rc) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }

                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                rc = ll_local_open(file, it, fd, *och_p);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }
        }
        up(&lli->lli_och_sem);

        /* Must do this outside lli_och_sem lock to prevent deadlock where
           different kind of OPEN lock for this same inode gets cancelled
           by ldlm_cancel_lru */
        if (!S_ISREG(inode->i_mode))
                GOTO(out, rc);

        ll_capa_open(inode);

        lsm = lli->lli_smd;
        if (lsm == NULL) {
                if (file->f_flags & O_LOV_DELAY_CREATE ||
                    !(file->f_mode & FMODE_WRITE)) {
                        CDEBUG(D_INODE, "object creation was delayed\n");
                        GOTO(out, rc);
                }
        }
        file->f_flags &= ~O_LOV_DELAY_CREATE;
        GOTO(out, rc);
out:
        ptlrpc_req_finished(req);
        if (req)
                it_clear_disposition(it, DISP_ENQ_OPEN_REF);
out_och_free:
        if (rc) {
                /* error path: undo handle allocation/refcount taken above */
                if (*och_p) {
                        OBD_FREE(*och_p, sizeof (struct obd_client_handle));
                        *och_p = NULL; /* OBD_FREE writes some magic there */
                        (*och_usecount)--;
                }
                up(&lli->lli_och_sem);
        }

        return rc;
}
630
631 /* Fills the obdo with the attributes for the inode defined by lsm */
632 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
633 {
634         struct ptlrpc_request_set *set;
635         struct ll_inode_info *lli = ll_i2info(inode);
636         struct lov_stripe_md *lsm = lli->lli_smd;
637
638         struct obd_info oinfo = { { { 0 } } };
639         int rc;
640         ENTRY;
641
642         LASSERT(lsm != NULL);
643
644         oinfo.oi_md = lsm;
645         oinfo.oi_oa = obdo;
646         oinfo.oi_oa->o_id = lsm->lsm_object_id;
647         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
648         oinfo.oi_oa->o_mode = S_IFREG;
649         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
650                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
651                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
652                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
653                                OBD_MD_FLGROUP;
654         oinfo.oi_capa = ll_mdscapa_get(inode);
655
656         set = ptlrpc_prep_set();
657         if (set == NULL) {
658                 CERROR("can't allocate ptlrpc set\n");
659                 rc = -ENOMEM;
660         } else {
661                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
662                 if (rc == 0)
663                         rc = ptlrpc_set_wait(set);
664                 ptlrpc_set_destroy(set);
665         }
666         capa_put(oinfo.oi_capa);
667         if (rc)
668                 RETURN(rc);
669
670         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
671                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
672                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
673
674         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
675         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
676                lli->lli_smd->lsm_object_id, i_size_read(inode),
677                inode->i_blocks, inode->i_blksize);
678         RETURN(0);
679 }
680
681 static inline void ll_remove_suid(struct inode *inode)
682 {
683         unsigned int mode;
684
685         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
686         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
687
688         /* was any of the uid bits set? */
689         mode &= inode->i_mode;
690         if (mode && !capable(CAP_FSETID)) {
691                 inode->i_mode &= ~mode;
692                 // XXX careful here - we cannot change the size
693         }
694 }
695
/* Map an extent DLM @lock back to the stripe index of @inode's layout
 * that it covers.
 *
 * Returns the stripe number (>= 0) on success, -ELDLM_NO_LOCK_DATA when
 * the lock's resource does not match the stripe's object id/group, or a
 * negative error from obd_get_info(). */
static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = ll_i2dtexp(inode);
        /* ad-hoc key understood by the LOV's get_info handler */
        struct {
                char name[16];
                struct ldlm_lock *lock;
                struct lov_stripe_md *lsm;
        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
        __u32 stripe, vallen = sizeof(stripe);
        int rc;
        ENTRY;

        /* single stripe: no lookup needed, but still sanity-check below */
        if (lsm->lsm_stripe_count == 1)
                GOTO(check, stripe = 0);

        /* get our offset in the lov */
        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
        if (rc != 0) {
                CERROR("obd_get_info: rc = %d\n", rc);
                RETURN(rc);
        }
        LASSERT(stripe < lsm->lsm_stripe_count);

check:
        /* resource name[0]/name[2] carry the object id/group; they must
         * agree with the layout's entry for this stripe */
        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
                LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
                           lsm->lsm_oinfo[stripe]->loi_id,
                           lsm->lsm_oinfo[stripe]->loi_gr);
                RETURN(-ELDLM_NO_LOCK_DATA);
        }

        RETURN(stripe);
}
732
/* Flush the page cache for an extent as it's canceled.  When we're on an LOV,
 * we get a lock cancellation for each stripe, so we have to map the obd's
 * region back onto the stripes in the file that it held.
 *
 * No one can dirty the extent until we've finished our work and they can
 * enqueue another lock.  The DLM protects us from ll_file_read/write here,
 * but other kernel actors could have pages locked.
 *
 * Called with the DLM lock held. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                              struct ldlm_lock *lock, __u32 stripe)
{
        ldlm_policy_data_t tmpex;
        unsigned long start, end, count, skip, i, j;
        struct page *page;
        /* LDLM_FL_DISCARD_DATA means dirty pages are dropped without being
         * written back first */
        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
        struct lustre_handle lockh;
        ENTRY;

        memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
               i_size_read(inode));

        /* our locks are page granular thanks to osc_enqueue, we invalidate the
         * whole page. */
        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
                           CFS_PAGE_SIZE);
        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);

        /* Map the per-object extent onto file page indices.  With striping,
         * each run of "count" pages belonging to this stripe is followed by
         * "skip" pages that belong to the other stripes. */
        count = ~0;
        skip = 0;
        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
        if (lsm->lsm_stripe_count > 1) {
                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
                skip = (lsm->lsm_stripe_count - 1) * count;
                start += start/count * skip + stripe * count;
                if (end != ~0)
                        end += end/count * skip + stripe * count;
        }
        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
                end = ~0;

        /* never walk past the last page backing the current file size */
        i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
            CFS_PAGE_SHIFT : 0;
        if (i < end)
                end = i;

        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
               count, skip, end, discard ? " (DISCARDING)" : "");

        /* walk through the vmas on the inode and tear down mmaped pages that
         * intersect with the lock.  this stops immediately if there are no
         * mmap()ed regions of the file.  This is not efficient at all and
         * should be short lived. We'll associate mmap()ed pages with the lock
         * and will be able to find them directly */
        for (i = start; i <= end; i += (j + skip)) {
                j = min(count - (i % count), end - i + 1);
                LASSERT(j > 0);
                LASSERT(inode->i_mapping);
                if (ll_teardown_mmaps(inode->i_mapping,
                                      (__u64)i << CFS_PAGE_SHIFT,
                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                        break;
        }

        /* this is the simplistic implementation of page eviction at
         * cancelation.  It is careful to get races with other page
         * lockers handled correctly.  fixes from bug 20 will make it
         * more efficient by associating locks with pages and with
         * batching writeback under the lock explicitly. */
        for (i = start, j = start % count; i <= end;
             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
                /* j counts pages within this stripe's chunk; once a chunk
                 * is exhausted, jump over the other stripes' pages */
                if (j == count) {
                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
                        i += skip;
                        j = 0;
                        if (i > end)
                                break;
                }
                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                         start, i, end);

                if (!mapping_has_pages(inode->i_mapping)) {
                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                        break;
                }

                cond_resched();

                page = find_get_page(inode->i_mapping, i);
                if (page == NULL)
                        continue;
                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
                               i, tmpex.l_extent.start);
                lock_page(page);

                /* page->mapping to check with racing against teardown */
                if (!discard && clear_page_dirty_for_io(page)) {
                        rc = ll_call_writepage(inode, page);
                        if (rc != 0)
                                CERROR("writepage inode %lu(%p) of page %p "
                                       "failed: %d\n", inode->i_ino, inode,
                                       page, rc);
                        /* either waiting for io to complete or reacquiring
                         * the lock that the failed writepage released */
                        lock_page(page);
                }

                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
                /* check to see if another DLM lock covers this page b=2765 */
                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
                                      LDLM_FL_TEST_LOCK,
                                      &lock->l_resource->lr_name, LDLM_EXTENT,
                                      &tmpex, LCK_PR | LCK_PW, &lockh);

                if (rc2 <= 0 && page->mapping != NULL) {
                        struct ll_async_page *llap = llap_cast_private(page);
                        /* checking again to account for writeback's
                         * lock_page() */
                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                        if (llap)
                                ll_ra_accounting(llap, inode->i_mapping);
                        ll_truncate_complete_page(page);
                }
                unlock_page(page);
                page_cache_release(page);
        }
        LASSERTF(tmpex.l_extent.start <=
                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
                  lock->l_policy_data.l_extent.end + 1),
                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                 start, i, end);
        EXIT;
}
877
/* Blocking/cancellation AST for client-side extent locks.
 *
 * LDLM_CB_BLOCKING: another client requested a conflicting lock, so cancel
 * ours.  LDLM_CB_CANCELING: this lock is being torn down; flush or discard
 * the covered page cache and shrink the stripe's known minimum size (kms)
 * to what the remaining locks still guarantee. */
static int ll_extent_lock_callback(struct ldlm_lock *lock,
                                   struct ldlm_lock_desc *new, void *data,
                                   int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* a small non-NULL cbdata value is almost certainly a stray pointer */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                struct inode *inode;
                struct ll_inode_info *lli;
                struct lov_stripe_md *lsm;
                int stripe;
                __u64 kms;

                /* This lock wasn't granted, don't try to evict pages */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                inode = ll_inode_from_lock(lock);
                if (inode == NULL)
                        RETURN(0);
                lli = ll_i2info(inode);
                if (lli == NULL)
                        goto iput;
                if (lli->lli_smd == NULL)
                        goto iput;
                lsm = lli->lli_smd;

                stripe = ll_lock_to_stripe_offset(inode, lock);
                if (stripe < 0)
                        goto iput;

                ll_pgcache_remove_extent(inode, lsm, lock, stripe);

                /* recompute kms under both the stripe lock and the lock's
                 * resource lock so it can't race with concurrent updaters */
                lov_stripe_lock(lsm);
                lock_res_and_lock(lock);
                kms = ldlm_extent_shift_kms(lock,
                                            lsm->lsm_oinfo[stripe]->loi_kms);

                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
                lsm->lsm_oinfo[stripe]->loi_kms = kms;
                unlock_res_and_lock(lock);
                lov_stripe_unlock(lsm);
        iput:
                iput(inode);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
946
#if 0
/* NOTE(review): dead code, compiled out with #if 0; kept for reference
 * only.  Uses the old lsm_oinfo[stripe].field (non-pointer) layout, so it
 * would not build against the current lov_stripe_md if re-enabled. */
int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lustre_handle lockh = { 0 };
        struct ost_lvb *lvb;
        int stripe;
        ENTRY;

        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                     LDLM_FL_BLOCK_CONV)) {
                LBUG(); /* not expecting any blocked async locks yet */
                LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
                           "lock, returning");
                ldlm_lock_dump(D_OTHER, lock, 0);
                ldlm_reprocess_all(lock->l_resource);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");

        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                goto iput;

        if (lock->l_lvb_len) {
                struct lov_stripe_md *lsm = lli->lli_smd;
                __u64 kms;
                lvb = lock->l_lvb_data;
                lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;

                lock_res_and_lock(lock);
                ll_inode_size_lock(inode, 1);
                kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                kms = ldlm_extent_shift_kms(NULL, kms);
                if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe].loi_kms, kms);
                lsm->lsm_oinfo[stripe].loi_kms = kms;
                ll_inode_size_unlock(inode, 1);
                unlock_res_and_lock(lock);
        }

iput:
        iput(inode);
        wake_up(&lock->l_waitq);

        ldlm_lock2handle(lock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        RETURN(0);
}
#endif
1001
/* Glimpse AST: a server-forwarded request asking this lock holder for the
 * object's current attributes without cancelling the lock.  Packs an
 * ost_lvb reply carrying this stripe's known minimum size (kms) and the
 * inode timestamps.  -ELDLM_NO_LOCK_DATA paths are normal races (inode or
 * striping already gone), answered with an empty reply. */
static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
        struct ptlrpc_request *req = reqp;
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli;
        struct lov_stripe_md *lsm;
        struct ost_lvb *lvb;
        int rc, stripe;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
        ENTRY;

        if (inode == NULL)
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
        lli = ll_i2info(inode);
        if (lli == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* First, find out which stripe index this lock corresponds to. */
        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        rc = lustre_pack_reply(req, 2, size, NULL);
        if (rc) {
                CERROR("lustre_pack_reply: %d\n", rc);
                GOTO(iput, rc);
        }

        lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
        lvb->lvb_mtime = LTIME_S(inode->i_mtime);
        lvb->lvb_atime = LTIME_S(inode->i_atime);
        lvb->lvb_ctime = LTIME_S(inode->i_ctime);

        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                   " atime "LPU64", mtime "LPU64", ctime "LPU64,
                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                   lvb->lvb_atime, lvb->lvb_ctime);
 iput:
        iput(inode);

 out:
        /* These errors are normal races, so we don't want to fill the console
         * with messages by calling ptlrpc_error() */
        if (rc == -ELDLM_NO_LOCK_DATA)
                lustre_pack_reply(req, 1, NULL, NULL);

        req->rq_status = rc;
        return rc;
}
1055
1056 static void ll_merge_lvb(struct inode *inode)
1057 {
1058         struct ll_inode_info *lli = ll_i2info(inode);
1059         struct ll_sb_info *sbi = ll_i2sbi(inode);
1060         struct ost_lvb lvb;
1061         ENTRY;
1062
1063         ll_inode_size_lock(inode, 1);
1064         inode_init_lvb(inode, &lvb);
1065         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1066         i_size_write(inode, lvb.lvb_size);
1067         inode->i_blocks = lvb.lvb_blocks;
1068         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1069         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1070         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1071         ll_inode_size_unlock(inode, 1);
1072         EXIT;
1073 }
1074
1075 int ll_local_size(struct inode *inode)
1076 {
1077         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1078         struct ll_inode_info *lli = ll_i2info(inode);
1079         struct ll_sb_info *sbi = ll_i2sbi(inode);
1080         struct lustre_handle lockh = { 0 };
1081         int flags = 0;
1082         int rc;
1083         ENTRY;
1084
1085         if (lli->lli_smd->lsm_stripe_count == 0)
1086                 RETURN(0);
1087
1088         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1089                        &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
1090         if (rc < 0)
1091                 RETURN(rc);
1092         else if (rc == 0)
1093                 RETURN(-ENODATA);
1094
1095         ll_merge_lvb(inode);
1096         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
1097         RETURN(0);
1098 }
1099
1100 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1101                      lstat_t *st)
1102 {
1103         struct lustre_handle lockh = { 0 };
1104         struct obd_enqueue_info einfo = { 0 };
1105         struct obd_info oinfo = { { { 0 } } };
1106         struct ost_lvb lvb;
1107         int rc;
1108
1109         ENTRY;
1110
1111         einfo.ei_type = LDLM_EXTENT;
1112         einfo.ei_mode = LCK_PR;
1113         einfo.ei_flags = LDLM_FL_HAS_INTENT;
1114         einfo.ei_cb_bl = ll_extent_lock_callback;
1115         einfo.ei_cb_cp = ldlm_completion_ast;
1116         einfo.ei_cb_gl = ll_glimpse_callback;
1117         einfo.ei_cbdata = NULL;
1118
1119         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1120         oinfo.oi_lockh = &lockh;
1121         oinfo.oi_md = lsm;
1122
1123         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1124         if (rc == -ENOENT)
1125                 RETURN(rc);
1126         if (rc != 0) {
1127                 CERROR("obd_enqueue returned rc %d, "
1128                        "returning -EIO\n", rc);
1129                 RETURN(rc > 0 ? -EIO : rc);
1130         }
1131
1132         lov_stripe_lock(lsm);
1133         memset(&lvb, 0, sizeof(lvb));
1134         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1135         st->st_size = lvb.lvb_size;
1136         st->st_blocks = lvb.lvb_blocks;
1137         st->st_mtime = lvb.lvb_mtime;
1138         st->st_atime = lvb.lvb_atime;
1139         st->st_ctime = lvb.lvb_ctime;
1140         lov_stripe_unlock(lsm);
1141
1142         RETURN(rc);
1143 }
1144
/* NB: obd_merge_lvb will prefer locally cached writes if they extend the
 * file (because it prefers KMS over RSS when larger)
 *
 * Refresh this inode's cached size and timestamps by glimpsing the OSTs.
 * "ast_flags" is ORed into the DLM enqueue flags.  Returns 0 on success
 * (or when no glimpse is needed) or a negative errno. */
int ll_glimpse_size(struct inode *inode, int ast_flags)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lustre_handle lockh = { 0 };
        struct obd_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        /* the MDS holds an authoritative size lock; no OST glimpse needed */
        if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);

        if (!lli->lli_smd) {
                CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
                RETURN(0);
        }

        /* NOTE: this looks like DLM lock request, but it may not be one. Due
         *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
         *       won't revoke any conflicting DLM locks held. Instead,
         *       ll_glimpse_callback() will be called on each client
         *       holding a DLM lock against this file, and resulting size
         *       will be returned for each stripe. DLM lock on [0, EOF] is
         *       acquired only if there were no conflicting locks. */
        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = LCK_PR;
        einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
        einfo.ei_cb_bl = ll_extent_lock_callback;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
        oinfo.oi_lockh = &lockh;
        oinfo.oi_md = lli->lli_smd;

        rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
        if (rc == -ENOENT)
                RETURN(rc);
        if (rc != 0) {
                CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
                RETURN(rc > 0 ? -EIO : rc);
        }

        /* fold the per-stripe glimpse replies into the inode attributes */
        ll_merge_lvb(inode);

        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
               i_size_read(inode), inode->i_blocks);

        RETURN(rc);
}
1201
/* Acquire a client extent lock of the given mode over *policy, returning
 * its handle in *lockh.  After the enqueue the inode's timestamps (and,
 * for a [0, EOF] lock, i_size) are refreshed from the merged LVB under
 * ll_inode_size_lock().  "ast_flags" is ORed into the DLM enqueue flags.
 * Returns 0 on success (or when locking is suppressed for this fd/sb),
 * or a negative errno. */
int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                   struct lov_stripe_md *lsm, int mode,
                   ldlm_policy_data_t *policy, struct lustre_handle *lockh,
                   int ast_flags)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ost_lvb lvb;
        struct obd_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(!lustre_handle_is_used(lockh));
        LASSERT(lsm != NULL);

        /* don't drop the mmapped file to LRU */
        if (mapping_mapped(inode->i_mapping))
                ast_flags |= LDLM_FL_NO_LRU;

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               inode->i_ino, policy->l_extent.start, policy->l_extent.end);

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = mode;
        einfo.ei_flags = ast_flags;
        einfo.ei_cb_bl = ll_extent_lock_callback;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy = *policy;
        oinfo.oi_lockh = lockh;
        oinfo.oi_md = lsm;

        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
        /* the enqueue may have widened the locked extent; report it back */
        *policy = oinfo.oi_policy;
        if (rc > 0)
                rc = -EIO;

        ll_inode_size_lock(inode, 1);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);

        if (policy->l_extent.start == 0 &&
            policy->l_extent.end == OBD_OBJECT_EOF) {
                /* vmtruncate()->ll_truncate() first sets the i_size and then
                 * the kms under both a DLM lock and the
                 * ll_inode_size_lock().  If we don't get the
                 * ll_inode_size_lock() here we can match the DLM lock and
                 * reset i_size from the kms before the truncating path has
                 * updated the kms.  generic_file_write can then trust the
                 * stale i_size when doing appending writes and effectively
                 * cancel the result of the truncate.  Getting the
                 * ll_inode_size_lock() after the enqueue maintains the DLM
                 * -> ll_inode_size_lock() acquiring order. */
                i_size_write(inode, lvb.lvb_size);
                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
                       inode->i_ino, i_size_read(inode));
        }

        /* timestamps are only trustworthy when the enqueue succeeded */
        if (rc == 0) {
                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
                LTIME_S(inode->i_atime) = lvb.lvb_atime;
                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        }
        ll_inode_size_unlock(inode, 1);

        RETURN(rc);
}
1276
1277 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1278                      struct lov_stripe_md *lsm, int mode,
1279                      struct lustre_handle *lockh)
1280 {
1281         struct ll_sb_info *sbi = ll_i2sbi(inode);
1282         int rc;
1283         ENTRY;
1284
1285         /* XXX phil: can we do this?  won't it screw the file size up? */
1286         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1287             (sbi->ll_flags & LL_SBI_NOLCK))
1288                 RETURN(0);
1289
1290         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1291
1292         RETURN(rc);
1293 }
1294
/* Read from a file through the page cache.
 *
 * Takes a PR DLM extent lock over the requested region (chunked by
 * sbi->ll_max_rw_chunk when set, so wide-striped files don't hold one
 * huge lock), uses the known minimum size (kms) to decide between a
 * short read and trusting the locked region, then delegates the copy to
 * generic_file_read().  Loops back to "repeat" until the request is
 * satisfied or a chunk comes up short. */
static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                            loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        int rc, ra = 0;
        loff_t end;
        ssize_t retval, chunk, sum = 0;

        __u64 kms;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);
        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);

        if (!lsm) {
                /* Read on file with no objects should return zero-filled
                 * buffers up to file size (we can get non-zero sizes with
                 * mknod + truncate, then opening file for read. This is a
                 * common pattern in NFS case, it seems). Bug 6243 */
                int notzeroed;
                /* Since there are no objects on OSTs, we have nothing to get
                 * lock on and so we are forced to access inode->i_size
                 * unguarded */

                /* Read beyond end of file */
                if (*ppos >= i_size_read(inode))
                        RETURN(0);

                if (count > i_size_read(inode) - *ppos)
                        count = i_size_read(inode) - *ppos;
                /* Make sure to correctly adjust the file pos pointer for
                 * EFAULT case */
                notzeroed = clear_user(buf, count);
                count -= notzeroed;
                *ppos += count;
                if (!count)
                        RETURN(-EFAULT);
                RETURN(count);
        }

repeat:
        if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
                                (obd_off *)&end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
        } else {
                end = *ppos + count - 1;
        }

        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
        if (IS_ERR(node)){
                GOTO(out, retval = PTR_ERR(node));
        }

        tree.lt_fd = LUSTRE_FPRIVATE(file);
        rc = ll_tree_lock(&tree, node, buf, count,
                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
        if (rc != 0)
                GOTO(out, retval = rc);

        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (retval) {
                        ll_tree_unlock(&tree);
                        goto out;
                }
        } else {
                /* region is within kms and, hence, within real file size (A).
                 * We need to increase i_size to cover the read region so that
                 * generic_file_read() will do its job, but that doesn't mean
                 * the kms size is _correct_, it is only the _minimum_ size.
                 * If someone does a stat they will get the correct size which
                 * will always be >= the kms value here.  b=11081 */
                if (i_size_read(inode) < kms)
                        i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        chunk = end - *ppos + 1;
        CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, chunk, *ppos, i_size_read(inode));

        /* turn off the kernel's read-ahead */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        file->f_ramax = 0;
#else
        file->f_ra.ra_pages = 0;
#endif
        /* initialize read-ahead window once per syscall */
        if (ra == 0) {
                ra = 1;
                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
                ll_ra_read_in(file, &bead);
        }

        /* BUG: 5972 */
        file_accessed(file);
        retval = generic_file_read(file, buf, chunk, ppos);
        ll_rw_stats_tally(sbi, current->pid, file, count, 0);

        ll_tree_unlock(&tree);

        /* a full chunk with bytes remaining means there is more to read;
         * a short chunk means EOF (or error) and we stop */
        if (retval > 0) {
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

 out:
        if (ra != 0)
                ll_ra_read_ex(file, &bead);
        /* report total bytes read across chunks, else the last error */
        retval = (sum > 0) ? sum : retval;
        RETURN(retval);
}
1459
1460 /*
1461  * Write to a file (through the page cache).
1462  */
1463 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1464                              loff_t *ppos)
1465 {
1466         struct inode *inode = file->f_dentry->d_inode;
1467         struct ll_sb_info *sbi = ll_i2sbi(inode);
1468         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1469         struct ll_lock_tree tree;
1470         struct ll_lock_tree_node *node;
1471         loff_t maxbytes = ll_file_maxbytes(inode);
1472         loff_t lock_start, lock_end, end;
1473         ssize_t retval, chunk, sum = 0;
1474         int rc;
1475         ENTRY;
1476
1477         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1478                inode->i_ino, inode->i_generation, inode, count, *ppos);
1479
1480         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1481
1482         /* POSIX, but surprised the VFS doesn't check this already */
1483         if (count == 0)
1484                 RETURN(0);
1485
1486         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1487          * called on the file, don't fail the below assertion (bug 2388). */
1488         if (file->f_flags & O_LOV_DELAY_CREATE &&
1489             ll_i2info(inode)->lli_smd == NULL)
1490                 RETURN(-EBADF);
1491
1492         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1493
1494         down(&ll_i2info(inode)->lli_write_sem);
1495
1496 repeat:
1497         chunk = 0; /* just to fix gcc's warning */
1498         end = *ppos + count - 1;
1499
1500         if (file->f_flags & O_APPEND) {
1501                 lock_start = 0;
1502                 lock_end = OBD_OBJECT_EOF;
1503         } else if (sbi->ll_max_rw_chunk != 0) {
1504                 /* first, let's know the end of the current stripe */
1505                 end = *ppos;
1506                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1507                                 (obd_off *)&end);
1508
1509                 /* correct, the end is beyond the request */
1510                 if (end > *ppos + count - 1)
1511                         end = *ppos + count - 1;
1512
1513                 /* and chunk shouldn't be too large even if striping is wide */
1514                 if (end - *ppos > sbi->ll_max_rw_chunk)
1515                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1516                 lock_start = *ppos;
1517                 lock_end = end;
1518         } else {
1519                 lock_start = *ppos;
1520                 lock_end = *ppos + count - 1;
1521         }
1522         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1523
1524         if (IS_ERR(node))
1525                 GOTO(out, retval = PTR_ERR(node));
1526
1527         tree.lt_fd = LUSTRE_FPRIVATE(file);
1528         rc = ll_tree_lock(&tree, node, buf, count,
1529                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1530         if (rc != 0)
1531                 GOTO(out, retval = rc);
1532
1533         /* This is ok, g_f_w will overwrite this under i_sem if it races
1534          * with a local truncate, it just makes our maxbyte checking easier.
1535          * The i_size value gets updated in ll_extent_lock() as a consequence
1536          * of the [0,EOF] extent lock we requested above. */
1537         if (file->f_flags & O_APPEND) {
1538                 *ppos = i_size_read(inode);
1539                 end = *ppos + count - 1;
1540         }
1541
1542         if (*ppos >= maxbytes) {
1543                 send_sig(SIGXFSZ, current, 0);
1544                 GOTO(out_unlock, retval = -EFBIG);
1545         }
1546         if (*ppos + count > maxbytes)
1547                 count = maxbytes - *ppos;
1548
1549         /* generic_file_write handles O_APPEND after getting i_mutex */
1550         chunk = end - *ppos + 1;
1551         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1552                inode->i_ino, chunk, *ppos);
1553         retval = generic_file_write(file, buf, chunk, ppos);
1554         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1555
1556 out_unlock:
1557         ll_tree_unlock(&tree);
1558
1559 out:
1560         if (retval > 0) {
1561                 buf += retval;
1562                 count -= retval;
1563                 sum += retval;
1564                 if (retval == chunk && count > 0)
1565                         goto repeat;
1566         }
1567
1568         up(&ll_i2info(inode)->lli_write_sem);
1569
1570         retval = (sum > 0) ? sum : retval;
1571         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1572                            retval > 0 ? retval : 0);
1573         RETURN(retval);
1574 }
1575
1576 /*
1577  * Send file content (through pagecache) somewhere with helper
1578  */
1579 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
/* sendfile(2) entry point: take a PR extent lock over the region being
 * sent, make i_size coherent with the cluster-wide kms (possibly via a
 * glimpse), prime Lustre's own read-ahead, then hand off to
 * generic_file_sendfile().  Returns bytes sent or a negative errno. */
static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                                read_actor_t actor, void *target)
{
        struct inode *inode = in_file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        int rc;
        ssize_t retval;
        __u64 kms;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
        /* turn off the kernel's read-ahead */
        in_file->f_ra.ra_pages = 0;

        /* File with no objects, nothing to lock */
        if (!lsm)
                RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));

        /* PR DLM extent lock covering exactly the region being sent. */
        node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
        if (IS_ERR(node))
                RETURN(PTR_ERR(node));

        tree.lt_fd = LUSTRE_FPRIVATE(in_file);
        rc = ll_tree_lock(&tree, node, NULL, count,
                          in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
        if (rc != 0)
                RETURN(rc);

        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (retval)
                        goto out;
        } else {
                /* region is within kms and, hence, within real file size (A) */
                i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, count, *ppos, i_size_read(inode));

        /* Describe the region to Lustre's read-ahead state in pages. */
        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
        ll_ra_read_in(in_file, &bead);
        /* BUG: 5972 */
        file_accessed(in_file);
        retval = generic_file_sendfile(in_file, ppos, count, actor, target);
        ll_ra_read_ex(in_file, &bead);

 out:
        ll_tree_unlock(&tree);
        RETURN(retval);
}
1671 #endif
1672
/* LL_IOC_RECREATE_OBJ: administrator-only ioctl that asks the OST to
 * recreate one missing object of this file's layout.  The request
 * (object id, group, ost index) is copied in from userspace at @arg.
 * Returns 0 on success or a negative errno. */
static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                               unsigned long arg)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_export *exp = ll_i2dtexp(inode);
        struct ll_recreate_obj ucreatp;
        struct obd_trans_info oti = { 0 };
        struct obdo *oa = NULL;
        int lsm_size;
        int rc = 0;
        struct lov_stripe_md *lsm, *lsm2;
        ENTRY;

        if (!capable (CAP_SYS_ADMIN))
                RETURN(-EPERM);

        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
                            sizeof(struct ll_recreate_obj));
        if (rc) {
                RETURN(-EFAULT);
        }
        OBDO_ALLOC(oa);
        if (oa == NULL)
                RETURN(-ENOMEM);

        /* lli_size_sem pins lli_smd stable while we copy and use it. */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(out, rc = -ENOENT);
        lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
                   (lsm->lsm_stripe_count));

        OBD_ALLOC(lsm2, lsm_size);
        if (lsm2 == NULL)
                GOTO(out, rc = -ENOMEM);

        /* o_nlink carries the OST index for the recreate request. */
        oa->o_id = ucreatp.lrc_id;
        oa->o_gr = ucreatp.lrc_group;
        oa->o_nlink = ucreatp.lrc_ost_idx;
        oa->o_flags |= OBD_FL_RECREATE_OBJS;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        /* obd_create() works on a scratch copy of the stripe metadata so
         * the in-core lsm is never modified. */
        oti.oti_objid = NULL;
        memcpy(lsm2, lsm, lsm_size);
        rc = obd_create(exp, oa, &lsm2, &oti);

        OBD_FREE(lsm2, lsm_size);
        GOTO(out, rc);
out:
        up(&lli->lli_size_sem);
        OBDO_FREE(oa);
        return rc;
}
1728
/* Install striping information on @inode by replaying an intent open
 * that carries the user-supplied layout @lum; the open handle obtained
 * this way is closed again immediately.  Returns 0 on success,
 * -EEXIST if the file already has a layout, or a negative errno from
 * the intent open. */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                             int flags, struct lov_user_md *lum, int lum_size)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm;
        struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
        int rc = 0;
        ENTRY;

        /* lli_size_sem serializes racing setstripe attempts; a layout can
         * only be set once per file. */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm) {
                up(&lli->lli_size_sem);
                CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
                       inode->i_ino);
                RETURN(-EEXIST);
        }

        rc = ll_intent_file_open(file, lum, lum_size, &oit);
        if (rc)
                GOTO(out, rc);
        if (it_disposition(&oit, DISP_LOOKUP_NEG))
                GOTO(out_req_free, rc = -ENOENT);
        rc = oit.d.lustre.it_status;
        if (rc < 0)
                GOTO(out_req_free, rc);

        /* The open was only needed to install the layout; close it. */
        ll_release_openhandle(file->f_dentry, &oit);

 out:
        up(&lli->lli_size_sem);
        ll_intent_release(&oit);
        RETURN(rc);
out_req_free:
        /* Drop the enqueue reply before taking the common exit path. */
        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
        goto out;
}
1766
/* Fetch the striping EA of @filename (a child of directory @inode) from
 * the MDS.  On success *lmmp/*lmm_size describe the layout (possibly a
 * freshly allocated lov_user_md_join expansion for joined files) and
 * *request holds the reply buffer the lmm points into; the caller must
 * release the request.  Returns 0 or a negative errno (-ENODATA when
 * the file has no striping EA). */
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
                             struct lov_mds_md **lmmp, int *lmm_size, 
                             struct ptlrpc_request **request)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct mdt_body  *body;
        struct lov_mds_md *lmm = NULL;
        struct ptlrpc_request *req = NULL;
        struct obd_capa *oc;
        int rc, lmmsize;

        rc = ll_get_max_mdsize(sbi, &lmmsize);
        if (rc)
                RETURN(rc);

        oc = ll_mdscapa_get(inode);
        rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
                             oc, filename, strlen(filename) + 1,
                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
        capa_put(oc);
        if (rc < 0) {
                CDEBUG(D_INFO, "md_getattr_name failed "
                       "on %s: rc %d\n", filename, rc);
                GOTO(out, rc);
        }

        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
        LASSERT(body != NULL); /* checked by mdc_getattr_name */
        /* swabbed by mdc_getattr_name */
        LASSERT_REPSWABBED(req, REPLY_REC_OFF);

        lmmsize = body->eadatasize;

        if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
                        lmmsize == 0) {
                GOTO(out, rc = -ENODATA);
        }

        /* The layout EA itself lives in the next reply buffer. */
        lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
        LASSERT(lmm != NULL);
        LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);

        /*
         * This is coming from the MDS, so is probably in
         * little endian.  We convert it to host endian before
         * passing it to userspace.
         */
        if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
                lustre_swab_lov_user_md((struct lov_user_md *)lmm);
                lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
        } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
                lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
        }

        /* Joined files: expand the compact on-wire join MD into a full
         * per-stripe lov_user_md_join for userspace consumption. */
        if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
                struct lov_stripe_md *lsm;
                struct lov_user_md_join *lmj;
                int lmj_size, i, aindex = 0;

                rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
                if (rc < 0)
                        GOTO(out, rc = -ENOMEM);
                rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
                if (rc)
                        GOTO(out_free_memmd, rc);

                lmj_size = sizeof(struct lov_user_md_join) +
                           lsm->lsm_stripe_count *
                           sizeof(struct lov_user_ost_data_join);
                OBD_ALLOC(lmj, lmj_size);
                if (!lmj)
                        GOTO(out_free_memmd, rc = -ENOMEM);

                memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
                for (i = 0; i < lsm->lsm_stripe_count; i++) {
                        /* aindex walks the extent array in step with the
                         * stripe index i. */
                        struct lov_extent *lex =
                                &lsm->lsm_array->lai_ext_array[aindex];

                        if (lex->le_loi_idx + lex->le_stripe_count <= i)
                                aindex ++;
                        CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
                                        LPU64" len %d\n", aindex, i,
                                        lex->le_start, (int)lex->le_len);
                        lmj->lmm_objects[i].l_extent_start =
                                lex->le_start;

                        /* le_len == -1 marks an extent open to EOF. */
                        if ((int)lex->le_len == -1)
                                lmj->lmm_objects[i].l_extent_end = -1;
                        else
                                lmj->lmm_objects[i].l_extent_end =
                                        lex->le_start + lex->le_len;
                        lmj->lmm_objects[i].l_object_id =
                                lsm->lsm_oinfo[i]->loi_id;
                        lmj->lmm_objects[i].l_object_gr =
                                lsm->lsm_oinfo[i]->loi_gr;
                        lmj->lmm_objects[i].l_ost_gen =
                                lsm->lsm_oinfo[i]->loi_ost_gen;
                        lmj->lmm_objects[i].l_ost_idx =
                                lsm->lsm_oinfo[i]->loi_ost_idx;
                }
                /* NOTE(review): lmm now points to OBD_ALLOC'd memory rather
                 * than into the reply; presumably the caller frees it —
                 * verify against the ioctl path. */
                lmm = (struct lov_mds_md *)lmj;
                lmmsize = lmj_size;
out_free_memmd:
                obd_free_memmd(sbi->ll_dt_exp, &lsm);
        }
out:
        *lmmp = lmm;
        *lmm_size = lmmsize;
        *request = req;
        return rc;
}
1878
1879 static int ll_lov_setea(struct inode *inode, struct file *file,
1880                             unsigned long arg)
1881 {
1882         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1883         struct lov_user_md  *lump;
1884         int lum_size = sizeof(struct lov_user_md) +
1885                        sizeof(struct lov_user_ost_data);
1886         int rc;
1887         ENTRY;
1888
1889         if (!capable (CAP_SYS_ADMIN))
1890                 RETURN(-EPERM);
1891
1892         OBD_ALLOC(lump, lum_size);
1893         if (lump == NULL) {
1894                 RETURN(-ENOMEM);
1895         }
1896         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1897         if (rc) {
1898                 OBD_FREE(lump, lum_size);
1899                 RETURN(-EFAULT);
1900         }
1901
1902         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1903
1904         OBD_FREE(lump, lum_size);
1905         RETURN(rc);
1906 }
1907
1908 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1909                             unsigned long arg)
1910 {
1911         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1912         int rc;
1913         int flags = FMODE_WRITE;
1914         ENTRY;
1915
1916         /* Bug 1152: copy properly when this is no longer true */
1917         LASSERT(sizeof(lum) == sizeof(*lump));
1918         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1919         rc = copy_from_user(&lum, lump, sizeof(lum));
1920         if (rc)
1921                 RETURN(-EFAULT);
1922
1923         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1924         if (rc == 0) {
1925                  put_user(0, &lump->lmm_stripe_count);
1926                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1927                                     0, ll_i2info(inode)->lli_smd, lump);
1928         }
1929         RETURN(rc);
1930 }
1931
1932 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1933 {
1934         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1935
1936         if (!lsm)
1937                 RETURN(-ENODATA);
1938
1939         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1940                             (void *)arg);
1941 }
1942
1943 static int ll_get_grouplock(struct inode *inode, struct file *file,
1944                             unsigned long arg)
1945 {
1946         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1947         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1948                                                     .end = OBD_OBJECT_EOF}};
1949         struct lustre_handle lockh = { 0 };
1950         struct ll_inode_info *lli = ll_i2info(inode);
1951         struct lov_stripe_md *lsm = lli->lli_smd;
1952         int flags = 0, rc;
1953         ENTRY;
1954
1955         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1956                 RETURN(-EINVAL);
1957         }
1958
1959         policy.l_extent.gid = arg;
1960         if (file->f_flags & O_NONBLOCK)
1961                 flags = LDLM_FL_BLOCK_NOWAIT;
1962
1963         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1964         if (rc)
1965                 RETURN(rc);
1966
1967         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1968         fd->fd_gid = arg;
1969         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1970
1971         RETURN(0);
1972 }
1973
1974 static int ll_put_grouplock(struct inode *inode, struct file *file,
1975                             unsigned long arg)
1976 {
1977         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1978         struct ll_inode_info *lli = ll_i2info(inode);
1979         struct lov_stripe_md *lsm = lli->lli_smd;
1980         int rc;
1981         ENTRY;
1982
1983         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1984                 /* Ugh, it's already unlocked. */
1985                 RETURN(-EINVAL);
1986         }
1987
1988         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1989                 RETURN(-EINVAL);
1990
1991         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1992
1993         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1994         if (rc)
1995                 RETURN(rc);
1996
1997         fd->fd_gid = 0;
1998         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1999
2000         RETURN(0);
2001 }
2002
2003 static int join_sanity_check(struct inode *head, struct inode *tail)
2004 {
2005         ENTRY;
2006         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2007                 CERROR("server do not support join \n");
2008                 RETURN(-EINVAL);
2009         }
2010         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2011                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2012                        head->i_ino, tail->i_ino);
2013                 RETURN(-EINVAL);
2014         }
2015         if (head->i_ino == tail->i_ino) {
2016                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2017                 RETURN(-EINVAL);
2018         }
2019         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2020                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2021                 RETURN(-EINVAL);
2022         }
2023         RETURN(0);
2024 }
2025
/* Ask the MDS to join the tail file onto @head_inode by replaying an
 * intent open with O_JOIN_FILE; the head's current size is passed to
 * the MDS as the join point via op_data.  Any DLM lock granted by the
 * enqueue is dropped immediately and the open handle is released.
 * Returns 0 on success or a negative errno. */
static int join_file(struct inode *head_inode, struct file *head_filp,
                     struct file *tail_filp)
{
        struct dentry *tail_dentry = tail_filp->f_dentry;
        struct lookup_intent oit = {.it_op = IT_OPEN,
                                   .it_flags = head_filp->f_flags|O_JOIN_FILE};
        struct lustre_handle lockh;
        struct md_op_data *op_data;
        int    rc;
        loff_t data;
        ENTRY;

        tail_dentry = tail_filp->f_dentry;

        /* The join point is the head's current size. */
        data = i_size_read(head_inode);
        op_data = ll_prep_md_op_data(NULL, head_inode,
                                     tail_dentry->d_parent->d_inode,
                                     tail_dentry->d_name.name,
                                     tail_dentry->d_name.len, 0,
                                     LUSTRE_OPC_ANY, &data);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW,
                        op_data, &lockh, NULL, 0, ldlm_completion_ast,
                        ll_md_blocking_ast, NULL, 0);

        ll_finish_md_op_data(op_data);
        if (rc < 0)
                GOTO(out, rc);

        rc = oit.d.lustre.it_status;

        /* Either the enqueue's intent status or the open disposition may
         * carry the failure; drop the reply before bailing out. */
        if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
                ptlrpc_req_finished((struct ptlrpc_request *)
                                    oit.d.lustre.it_data);
                GOTO(out, rc);
        }

        if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
                                           * away */
                ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
                oit.d.lustre.it_lock_mode = 0;
        }
        ll_release_openhandle(head_filp->f_dentry, &oit);
out:
        ll_intent_release(&oit);
        RETURN(rc);
}
2076
/* LL_IOC_JOIN implementation: open the tail file named @filename_tail,
 * take EX whole-file locks on both inodes in inode-number order to
 * avoid deadlock, sanity-check the pair, and ask the MDS to join them.
 * Cleanup is phased: each acquired resource bumps cleanup_phase and the
 * switch below falls through to undo them in reverse order. */
static int ll_file_join(struct inode *head, struct file *filp,
                        char *filename_tail)
{
        struct inode *tail = NULL, *first = NULL, *second = NULL;
        struct dentry *tail_dentry;
        struct file *tail_filp, *first_filp, *second_filp;
        struct ll_lock_tree first_tree, second_tree;
        struct ll_lock_tree_node *first_node, *second_node;
        struct ll_inode_info *hlli = ll_i2info(head), *tlli;
        int rc = 0, cleanup_phase = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
               head->i_ino, head->i_generation, head, filename_tail);

        tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
        if (IS_ERR(tail_filp)) {
                CERROR("Can not open tail file %s", filename_tail);
                rc = PTR_ERR(tail_filp);
                GOTO(cleanup, rc);
        }
        /* NOTE(review): igrab() can return NULL for an inode being freed;
         * tail is dereferenced below without a check — verify. */
        tail = igrab(tail_filp->f_dentry->d_inode);

        tlli = ll_i2info(tail);
        tail_dentry = tail_filp->f_dentry;
        LASSERT(tail_dentry);
        cleanup_phase = 1;

        /*reorder the inode for lock sequence*/
        first = head->i_ino > tail->i_ino ? head : tail;
        second = head->i_ino > tail->i_ino ? tail : head;
        first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
        second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;

        CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
               head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
        /* EX lock over the whole first file. */
        first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
        if (IS_ERR(first_node)){
                rc = PTR_ERR(first_node);
                GOTO(cleanup, rc);
        }
        first_tree.lt_fd = first_filp->private_data;
        rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
        if (rc != 0)
                GOTO(cleanup, rc);
        cleanup_phase = 2;

        /* EX lock over the whole second file. */
        second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
        if (IS_ERR(second_node)){
                rc = PTR_ERR(second_node);
                GOTO(cleanup, rc);
        }
        second_tree.lt_fd = second_filp->private_data;
        rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
        if (rc != 0)
                GOTO(cleanup, rc);
        cleanup_phase = 3;

        rc = join_sanity_check(head, tail);
        if (rc)
                GOTO(cleanup, rc);

        rc = join_file(head, filp, tail_filp);
        if (rc)
                GOTO(cleanup, rc);
cleanup:
        /* Phased teardown: fall through from the highest phase reached. */
        switch (cleanup_phase) {
        case 3:
                ll_tree_unlock(&second_tree);
                obd_cancel_unused(ll_i2dtexp(second),
                                  ll_i2info(second)->lli_smd, 0, NULL);
                /* fallthrough */
        case 2:
                ll_tree_unlock(&first_tree);
                obd_cancel_unused(ll_i2dtexp(first),
                                  ll_i2info(first)->lli_smd, 0, NULL);
                /* fallthrough */
        case 1:
                filp_close(tail_filp, 0);
                if (tail)
                        iput(tail);
                /* On success the head's layout changed on the MDS; drop the
                 * stale in-core stripe metadata so it is re-fetched. */
                if (head && rc == 0) {
                        obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
                                       &hlli->lli_smd);
                        hlli->lli_smd = NULL;
                }
                /* fallthrough */
        case 0:
                break;
        default:
                CERROR("invalid cleanup_phase %d\n", cleanup_phase);
                LBUG();
        }
        RETURN(rc);
}
2169
/* Close the MDS open handle carried by intent @it for @dentry, and drop
 * the intent's enqueue reply reference.  No-op for the filesystem root
 * or when the intent holds no open disposition.  Returns 0 or a
 * negative errno. */
int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct obd_client_handle *och;
        int rc;
        ENTRY;

        LASSERT(inode);

        /* Root ? Do nothing. */
        if (dentry->d_inode->i_sb->s_root == dentry)
                RETURN(0);

        /* No open handle to close? Move away */
        if (!it_disposition(it, DISP_OPEN_OPEN))
                RETURN(0);

        LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);

        OBD_ALLOC(och, sizeof(*och));
        if (!och)
                GOTO(out, rc = -ENOMEM);

        /* Fill the client handle from the intent's open reply, then send
         * the close to the MDS. */
        ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
                    ll_i2info(inode), it, och);

        rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
                                       inode, och);
 out:
        /* this one is in place of ll_file_open */
        ptlrpc_req_finished(it->d.lustre.it_data);
        it_clear_disposition(it, DISP_ENQ_OPEN_REF);
        RETURN(rc);
}
2204
2205 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2206                   unsigned long arg)
2207 {
2208         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2209         int flags;
2210         ENTRY;
2211
2212         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2213                inode->i_generation, inode, cmd);
2214         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2215
2216         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2217         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2218                 RETURN(-ENOTTY);
2219
2220         switch(cmd) {
2221         case LL_IOC_GETFLAGS:
2222                 /* Get the current value of the file flags */
2223                 return put_user(fd->fd_flags, (int *)arg);
2224         case LL_IOC_SETFLAGS:
2225         case LL_IOC_CLRFLAGS:
2226                 /* Set or clear specific file flags */
2227                 /* XXX This probably needs checks to ensure the flags are
2228                  *     not abused, and to handle any flag side effects.
2229                  */
2230                 if (get_user(flags, (int *) arg))
2231                         RETURN(-EFAULT);
2232
2233                 if (cmd == LL_IOC_SETFLAGS) {
2234                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2235                             !(file->f_flags & O_DIRECT)) {
2236                                 CERROR("%s: unable to disable locking on "
2237                                        "non-O_DIRECT file\n", current->comm);
2238                                 RETURN(-EINVAL);
2239                         }
2240
2241                         fd->fd_flags |= flags;
2242                 } else {
2243                         fd->fd_flags &= ~flags;
2244                 }
2245                 RETURN(0);
2246         case LL_IOC_LOV_SETSTRIPE:
2247                 RETURN(ll_lov_setstripe(inode, file, arg));
2248         case LL_IOC_LOV_SETEA:
2249                 RETURN(ll_lov_setea(inode, file, arg));
2250         case LL_IOC_LOV_GETSTRIPE:
2251                 RETURN(ll_lov_getstripe(inode, arg));
2252         case LL_IOC_RECREATE_OBJ:
2253                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2254         case EXT3_IOC_GETFLAGS:
2255         case EXT3_IOC_SETFLAGS:
2256                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2257         case EXT3_IOC_GETVERSION_OLD:
2258         case EXT3_IOC_GETVERSION:
2259                 RETURN(put_user(inode->i_generation, (int *)arg));
2260         case LL_IOC_JOIN: {
2261                 char *ftail;
2262                 int rc;
2263
2264                 ftail = getname((const char *)arg);
2265                 if (IS_ERR(ftail))
2266                         RETURN(PTR_ERR(ftail));
2267                 rc = ll_file_join(inode, file, ftail);
2268                 putname(ftail);
2269                 RETURN(rc);
2270         }
2271         case LL_IOC_GROUP_LOCK:
2272                 RETURN(ll_get_grouplock(inode, file, arg));
2273         case LL_IOC_GROUP_UNLOCK:
2274                 RETURN(ll_put_grouplock(inode, file, arg));
2275         case IOC_OBD_STATFS:
2276                 RETURN(ll_obd_statfs(inode, (void *)arg));
2277
2278         /* We need to special case any other ioctls we want to handle,
2279          * to send them to the MDS/OST as appropriate and to properly
2280          * network encode the arg field.
2281         case EXT3_IOC_SETVERSION_OLD:
2282         case EXT3_IOC_SETVERSION:
2283         */
2284         case LL_IOC_FLUSHCTX:
2285                 RETURN(ll_flush_ctx(inode));
2286         case LL_IOC_GETFACL: {
2287                 struct rmtacl_ioctl_data ioc;
2288
2289                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2290                         RETURN(-EFAULT);
2291
2292                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2293         }
2294         case LL_IOC_SETFACL: {
2295                 struct rmtacl_ioctl_data ioc;
2296
2297                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2298                         RETURN(-EFAULT);
2299
2300                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2301         }
2302         default:
2303                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2304                                      (void *)arg));
2305         }
2306 }
2307
/*
 * llseek for Lustre files.  SEEK_END cannot trust the cached i_size: the
 * real object size lives on the OSTs, so a glimpse is issued first (unless
 * the file has no stripe objects yet).
 */
loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        loff_t retval;
        ENTRY;
        /* This value is computed purely for the trace message below; the
         * authoritative offset is recomputed after the glimpse, under the
         * inode size lock. */
        retval = offset + ((origin == 2) ? i_size_read(inode) :
                           (origin == 1) ? file->f_pos : 0);
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
               inode->i_ino, inode->i_generation, inode, retval, retval,
               origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);

        if (origin == 2) { /* SEEK_END */
                int nonblock = 0, rc;

                if (file->f_flags & O_NONBLOCK)
                        nonblock = LDLM_FL_BLOCK_NOWAIT;

                /* Stripe objects exist: refresh i_size from the OSTs before
                 * resolving an end-relative offset. */
                if (lsm != NULL) {
                        rc = ll_glimpse_size(inode, nonblock);
                        if (rc != 0)
                                RETURN(rc);
                }

                /* Read size under the size lock so it is consistent with
                 * any concurrent size update. */
                ll_inode_size_lock(inode, 0);
                offset += i_size_read(inode);
                ll_inode_size_unlock(inode, 0);
        } else if (origin == 1) { /* SEEK_CUR */
                offset += file->f_pos;
        }

        /* Reject positions outside [0, maxbytes]; otherwise commit f_pos. */
        retval = -EINVAL;
        if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
                if (offset != file->f_pos) {
                        file->f_pos = offset;
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                        /* 2.4: reset readahead state on an explicit seek. */
                        file->f_reada = 0;
                        file->f_version = ++event;
#endif
                }
                retval = offset;
        }

        RETURN(retval);
}
2355
/*
 * fsync(2)/fdatasync(2) handler.  Waits for in-flight page I/O, collects
 * any recorded async writeback errors, syncs metadata via the MDC and,
 * for fdatasync-style calls (data != 0) on striped files, syncs the data
 * objects via the OSC.  The first error seen wins; later steps still run.
 */
int ll_fsync(struct file *file, struct dentry *dentry, int data)
{
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ptlrpc_request *req;
        struct obd_capa *oc;
        int rc, err;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);

        /* fsync's caller has already called _fdata{sync,write}, we want
         * that IO to finish before calling the osc and mdc sync methods */
        rc = filemap_fdatawait(inode->i_mapping);

        /* catch async errors that were recorded back when async writeback
         * failed for pages in this mapping. */
        err = lli->lli_async_rc;
        lli->lli_async_rc = 0;
        if (rc == 0)
                rc = err;
        if (lsm) {
                /* per-stripe async error, cleared as it is consumed */
                err = lov_test_and_clear_async_rc(lsm);
                if (rc == 0)
                        rc = err;
        }

        /* Sync metadata on the MDS; a capability may be required. */
        oc = ll_mdscapa_get(inode);
        err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
                      &req);
        capa_put(oc);
        if (!rc)
                rc = err;
        /* NOTE(review): the reply is only released when md_sync() succeeded;
         * presumably req is not a valid request on failure -- confirm,
         * otherwise this would leak the request on error. */
        if (!err)
                ptlrpc_req_finished(req);

        if (data && lsm) {
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (!oa)
                        RETURN(rc ? rc : -ENOMEM);

                /* Identify the data objects and carry current timestamps
                 * so the OSTs stay consistent with the client. */
                oa->o_id = lsm->lsm_object_id;
                oa->o_gr = lsm->lsm_object_gr;
                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                           OBD_MD_FLGROUP);

                /* Sync the whole object range [0, EOF] on every stripe. */
                oc = ll_osscapa_get(inode, 0, CAPA_OPC_OSS_WRITE);
                err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
                               0, OBD_OBJECT_EOF, oc);
                capa_put(oc);
                if (!rc)
                        rc = err;
                OBDO_FREE(oa);
        }

        RETURN(rc);
}
2419
/*
 * flock(2)/fcntl(2) byte-range lock handler.  Translates the VFS lock
 * request into an LDLM_FLOCK enqueue against the MDS so locks are
 * cluster-coherent, then (on success) records the lock locally so the
 * kernel's lock bookkeeping stays in sync.
 */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        /* Flock resources are named by the file's FID plus LDLM_FLOCK. */
        struct ldlm_res_id res_id =
                { .name = { fid_seq(ll_inode2fid(inode)),
                            fid_oid(ll_inode2fid(inode)),
                            fid_ver(ll_inode2fid(inode)),
                            LDLM_FLOCK} };
        struct lustre_handle lockh = {0};
        ldlm_policy_data_t flock;
        ldlm_mode_t mode = 0;
        int flags = 0;
        int rc;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
               inode->i_ino, file_lock);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

        if (file_lock->fl_flags & FL_FLOCK) {
                LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                /* set missing params for flock() calls */
                file_lock->fl_end = OFFSET_MAX;
                file_lock->fl_pid = current->tgid;
        }
        flock.l_flock.pid = file_lock->fl_pid;
        flock.l_flock.start = file_lock->fl_start;
        flock.l_flock.end = file_lock->fl_end;

        /* Map POSIX lock types onto LDLM lock modes. */
        switch (file_lock->fl_type) {
        case F_RDLCK:
                mode = LCK_PR;
                break;
        case F_UNLCK:
                /* An unlock request may or may not have any relation to
                 * existing locks so we may not be able to pass a lock handle
                 * via a normal ldlm_lock_cancel() request. The request may even
                 * unlock a byte range in the middle of an existing lock. In
                 * order to process an unlock request we need all of the same
                 * information that is given with a normal read or write record
                 * lock request. To avoid creating another ldlm unlock (cancel)
                 * message we'll treat a LCK_NL flock request as an unlock. */
                mode = LCK_NL;
                break;
        case F_WRLCK:
                mode = LCK_PW;
                break;
        default:
                CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
                LBUG();
        }

        /* Map the fcntl command onto enqueue flags: SETLKW blocks, SETLK
         * fails immediately if the lock is contended, GETLK only tests. */
        switch (cmd) {
        case F_SETLKW:
#ifdef F_SETLKW64
        case F_SETLKW64:
#endif
                flags = 0;
                break;
        case F_SETLK:
#ifdef F_SETLK64
        case F_SETLK64:
#endif
                flags = LDLM_FL_BLOCK_NOWAIT;
                break;
        case F_GETLK:
#ifdef F_GETLK64
        case F_GETLK64:
#endif
                flags = LDLM_FL_TEST_LOCK;
                /* Save the old mode so that if the mode in the lock changes we
                 * can decrement the appropriate reader or writer refcount. */
                file_lock->fl_type = mode;
                break;
        default:
                CERROR("unknown fcntl lock command: %d\n", cmd);
                LBUG();
        }

        CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
               "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
               flags, mode, flock.l_flock.start, flock.l_flock.end);

        /* Enqueue on the MDS; ldlm_flock_completion_ast handles blocking
         * and F_GETLK conflict reporting via file_lock. */
        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &res_id,
                              LDLM_FLOCK, &flock, mode, &flags, NULL,
                              ldlm_flock_completion_ast, NULL, file_lock,
                              NULL, 0, NULL, &lockh, 0);
        /* Mirror the granted lock into local kernel state so unlock paths
         * and /proc reporting see it. */
        if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
                ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
#ifdef HAVE_F_OP_FLOCK
        if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
            !(flags & LDLM_FL_TEST_LOCK))
                posix_lock_file_wait(file, file_lock);
#endif

        RETURN(rc);
}
2519
/*
 * Lock handler installed for -o noflock mounts: file locking is
 * deliberately unsupported, so every request fails with ENOSYS.
 */
int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
        ENTRY;

        RETURN(-ENOSYS);
}
2526
2527 int ll_have_md_lock(struct inode *inode, __u64 bits)
2528 {
2529         struct lustre_handle lockh;
2530         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2531         struct lu_fid *fid;
2532         int flags;
2533         ENTRY;
2534
2535         if (!inode)
2536                RETURN(0);
2537
2538         fid = &ll_i2info(inode)->lli_fid;
2539         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2540
2541         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2542         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2543                           LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2544                 RETURN(1);
2545         }
2546
2547         RETURN(0);
2548 }
2549
2550 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2551         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2552                               * and return success */
2553                 inode->i_nlink = 0;
2554                 /* This path cannot be hit for regular files unless in
2555                  * case of obscure races, so no need to to validate
2556                  * size. */
2557                 if (!S_ISREG(inode->i_mode) &&
2558                     !S_ISDIR(inode->i_mode))
2559                         return 0;
2560         }
2561
2562         if (rc) {
2563                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2564                 return -abs(rc);
2565
2566         }
2567
2568         return 0;
2569 }
2570
2571 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2572 {
2573         struct inode *inode = dentry->d_inode;
2574         struct ptlrpc_request *req = NULL;
2575         struct ll_sb_info *sbi;
2576         struct obd_export *exp;
2577         int rc;
2578         ENTRY;
2579
2580         if (!inode) {
2581                 CERROR("REPORT THIS LINE TO PETER\n");
2582                 RETURN(0);
2583         }
2584         sbi = ll_i2sbi(inode);
2585
2586         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2587                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2588 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2589         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
2590 #endif
2591
2592         exp = ll_i2mdexp(inode);
2593
2594         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2595                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2596                 struct md_op_data *op_data;
2597
2598                 /* Call getattr by fid, so do not provide name at all. */
2599                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2600                                              dentry->d_inode, NULL, 0, 0,
2601                                              LUSTRE_OPC_ANY, NULL);
2602                 if (IS_ERR(op_data))
2603                         RETURN(PTR_ERR(op_data));
2604
2605                 oit.it_flags |= O_CHECK_STALE;
2606                 rc = md_intent_lock(exp, op_data, NULL, 0,
2607                                     /* we are not interested in name
2608                                        based lookup */
2609                                     &oit, 0, &req,
2610                                     ll_md_blocking_ast, 0);
2611                 ll_finish_md_op_data(op_data);
2612                 oit.it_flags &= ~O_CHECK_STALE;
2613                 if (rc < 0) {
2614                         rc = ll_inode_revalidate_fini(inode, rc);
2615                         GOTO (out, rc);
2616                 }
2617
2618                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2619                 if (rc != 0) {
2620                         ll_intent_release(&oit);
2621                         GOTO(out, rc);
2622                 }
2623
2624                 /* Unlinked? Unhash dentry, so it is not picked up later by
2625                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2626                    here to preserve get_cwd functionality on 2.6.
2627                    Bug 10503 */
2628                 if (!dentry->d_inode->i_nlink) {
2629                         spin_lock(&dcache_lock);
2630                         ll_drop_dentry(dentry);
2631                         spin_unlock(&dcache_lock);
2632                 }
2633
2634                 ll_lookup_finish_locks(&oit, dentry);
2635         } else if (!ll_have_md_lock(dentry->d_inode,
2636                                     MDS_INODELOCK_UPDATE)) {
2637                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2638                 obd_valid valid = OBD_MD_FLGETATTR;
2639                 struct obd_capa *oc;
2640                 int ealen = 0;
2641
2642                 if (S_ISREG(inode->i_mode)) {
2643                         rc = ll_get_max_mdsize(sbi, &ealen);
2644                         if (rc)
2645                                 RETURN(rc);
2646                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2647                 }
2648                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2649                  * capa for this inode. Because we only keep capas of dirs
2650                  * fresh. */
2651                 oc = ll_mdscapa_get(inode);
2652                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2653                                 ealen, &req);
2654                 capa_put(oc);
2655                 if (rc) {
2656                         rc = ll_inode_revalidate_fini(inode, rc);
2657                         RETURN(rc);
2658                 }
2659
2660                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2661                                    NULL);
2662                 if (rc)
2663                         GOTO(out, rc);
2664         }
2665
2666         /* if object not yet allocated, don't validate size */
2667         if (ll_i2info(inode)->lli_smd == NULL)
2668                 GOTO(out, rc = 0);
2669
2670         /* ll_glimpse_size will prefer locally cached writes if they extend
2671          * the file */
2672         rc = ll_glimpse_size(inode, 0);
2673         EXIT;
2674 out:
2675         ptlrpc_req_finished(req);
2676         return rc;
2677 }
2678
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
/*
 * Fill *stat for @de after revalidating its inode with the MDS using the
 * supplied lookup intent.  Returns 0 on success or the negative errno
 * from revalidation.
 */
int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
                  struct lookup_intent *it, struct kstat *stat)
{
        struct inode *inode = de->d_inode;
        int res = 0;

        /* Refresh attributes (and size, via glimpse) before reporting. */
        res = ll_inode_revalidate_it(de, it);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);

        if (res)
                return res;

        stat->dev = inode->i_sb->s_dev;
        stat->ino = inode->i_ino;
        stat->mode = inode->i_mode;
        stat->nlink = inode->i_nlink;
        stat->uid = inode->i_uid;
        stat->gid = inode->i_gid;
        stat->rdev = kdev_t_to_nr(inode->i_rdev);
        stat->atime = inode->i_atime;
        stat->mtime = inode->i_mtime;
        stat->ctime = inode->i_ctime;
#ifdef HAVE_INODE_BLKSIZE
        stat->blksize = inode->i_blksize;
#else
        /* i_blksize was removed from struct inode; derive from i_blkbits. */
        stat->blksize = 1 << inode->i_blkbits;
#endif

        /* Read size and blocks under the size lock so the pair is
         * mutually consistent. */
        ll_inode_size_lock(inode, 0);
        stat->size = i_size_read(inode);
        stat->blocks = inode->i_blocks;
        ll_inode_size_unlock(inode, 0);

        return 0;
}
2715 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2716 {
2717         struct lookup_intent it = { .it_op = IT_GETATTR };
2718
2719         return ll_getattr_it(mnt, de, &it, stat);
2720 }
2721 #endif
2722
2723 static
2724 int lustre_check_acl(struct inode *inode, int mask)
2725 {
2726 #ifdef CONFIG_FS_POSIX_ACL
2727         struct ll_inode_info *lli = ll_i2info(inode);
2728         struct posix_acl *acl;
2729         int rc;
2730         ENTRY;
2731
2732         spin_lock(&lli->lli_lock);
2733         acl = posix_acl_dup(lli->lli_posix_acl);
2734         spin_unlock(&lli->lli_lock);
2735
2736         if (!acl)
2737                 RETURN(-EAGAIN);
2738
2739         rc = posix_acl_permission(inode, acl, mask);
2740         posix_acl_release(acl);
2741
2742         RETURN(rc);
2743 #else
2744         return -EAGAIN;
2745 #endif
2746 }
2747
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
/*
 * permission() method, 2.6.10+: generic_permission() does the mode-bit
 * work and calls back into lustre_check_acl() for ACL entries.  Remote
 * clients delegate the whole decision to the MDS.
 */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);
        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
        return generic_permission(inode, mask, lustre_check_acl);
}
#else
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
#else
int ll_inode_permission(struct inode *inode, int mask)
#endif
{
        /* Open-coded permission check for kernels whose
         * generic_permission() takes no ACL callback: read-only/immutable
         * guards, then owner -> ACL -> group -> other bits, then
         * capability overrides. */
        int mode = inode->i_mode;
        int rc;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);

        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);

        if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
            (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
                return -EROFS;
        if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
                return -EACCES;
        if (current->fsuid == inode->i_uid) {
                /* owner: use the u bits */
                mode >>= 6;
        } else if (1) {
                /* NOTE(review): `else if (1)` makes the final else-block
                 * reachable only through the gotos below; this mirrors the
                 * shape of the kernel's generic_permission() -- confirm it
                 * is intentional before restructuring. */
                if (((mode >> 3) & mask & S_IRWXO) != mask)
                        goto check_groups;
                rc = lustre_check_acl(inode, mask);
                if (rc == -EAGAIN)
                        /* no cached ACL: fall back to group bits */
                        goto check_groups;
                if (rc == -EACCES)
                        goto check_capabilities;
                return rc;
        } else {
check_groups:
                if (in_group_p(inode->i_gid))
                        mode >>= 3;
        }
        if ((mode & mask & S_IRWXO) == mask)
                return 0;

check_capabilities:
        /* CAP_DAC_OVERRIDE bypasses everything except executing a file
         * with no exec bit set anywhere. */
        if (!(mask & MAY_EXEC) ||
            (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
                if (capable(CAP_DAC_OVERRIDE))
                        return 0;

        /* CAP_DAC_READ_SEARCH allows plain reads and directory lookups. */
        if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
            (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
                return 0;

        return -EACCES;
}
#endif
2814
/* -o localflock - only provides locally consistent flock locks: no
 * .flock/.lock methods are installed, so the VFS handles locking on this
 * node only. */
struct file_operations ll_file_operations = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
        .sendfile       = ll_file_sendfile,
#endif
        .fsync          = ll_fsync,
};
2829
/* -o flock: same as the default table, plus .flock/.lock wired to
 * ll_file_flock() for cluster-coherent locking via LDLM. */
struct file_operations ll_file_operations_flock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
        .sendfile       = ll_file_sendfile,
#endif
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_flock,
#endif
        .lock           = ll_file_flock
};
2847
/* These are for -o noflock - to return ENOSYS on flock calls:
 * .flock/.lock point at ll_file_noflock(), which always fails. */
struct file_operations ll_file_operations_noflock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
        .sendfile       = ll_file_sendfile,
#endif
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_noflock,
#endif
        .lock           = ll_file_noflock
};
2866
/* Inode operations for regular Lustre files: attribute get/set, truncate
 * and extended attributes all route through llite; the getattr vs.
 * revalidate_it split follows the kernel generation in use. */
struct inode_operations ll_file_inode_operations = {
#ifdef LUSTRE_KERNEL_VERSION
        .setattr_raw    = ll_setattr_raw,
#endif
        .setattr        = ll_setattr,
        .truncate       = ll_truncate,
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
        .getattr        = ll_getattr,
#else
        .revalidate_it  = ll_inode_revalidate_it,
#endif
        .permission     = ll_inode_permission,
        .setxattr       = ll_setxattr,
        .getxattr       = ll_getxattr,
        .listxattr      = ll_listxattr,
        .removexattr    = ll_removexattr,
};
2884