Whamcloud - gitweb
Land b1_8_dir_ra onto HEAD (20080521_1834)
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
/* Pack the MDS-relevant attributes of @inode, together with the open
 * file handle @fh, into @op_data in preparation for an MDS request
 * (close / done-writing).  Takes a reference on the MDS capability
 * (op_capa1) which the request path is expected to release. */
void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
                          struct lustre_handle *fh)
{
        op_data->op_fid1 = ll_i2info(inode)->lli_fid;
        op_data->op_attr.ia_mode = inode->i_mode;
        op_data->op_attr.ia_atime = inode->i_atime;
        op_data->op_attr.ia_mtime = inode->i_mtime;
        op_data->op_attr.ia_ctime = inode->i_ctime;
        op_data->op_attr.ia_size = i_size_read(inode);
        op_data->op_attr_blocks = inode->i_blocks;
        /* ia_attr_flags lives in the Lustre-extended iattr, hence the cast */
        ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
        op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
        memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
        op_data->op_capa1 = ll_mdscapa_get(inode);
}
63
/* Prepare @op_data for the close of open handle @och on @inode.
 * For write opens on a Size-on-MDS-capable connection the IO epoch is
 * closed via ll_epoch_close(); otherwise the client sends size/blocks
 * along with the other attributes in the close request itself. */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
                             struct obd_client_handle *och)
{
        ENTRY;

        op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
                                 ATTR_MTIME_SET | ATTR_CTIME_SET;

        /* Read/exec opens never carry size updates. */
        if (!(och->och_flags & FMODE_WRITE))
                goto out;

        /* Without SOM support (or for non-regular files) send size and
         * blocks directly; with SOM, close the IO epoch instead. */
        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
            !S_ISREG(inode->i_mode))
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
        else
                ll_epoch_close(inode, op_data, &och, 0);

out:
        ll_pack_inode2opdata(inode, op_data, &och->och_fh);
        EXIT;
}
85
/* Send the MDS close RPC for open handle @och on @inode through export
 * @md_exp.
 *
 * Handles the Size-on-MDS (SOM) protocol: when md_close() returns
 * -EAGAIN the epoch is closed and the client must gather size from the
 * OSTs and send it back as a setattr (ll_sizeonmds_update()).  On
 * forced umount no RPC is sent at all, since the import is already
 * deactivated.  Unless the inode is queued for a later DONE_WRITING,
 * @och is poisoned with DEAD_HANDLE_MAGIC and freed before returning.
 *
 * Returns 0 or a negative error code. */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                     struct inode *inode,
                                     struct obd_client_handle *och)
{
        struct obd_export *exp = ll_i2mdexp(inode);
        struct md_op_data *op_data;
        struct ptlrpc_request *req = NULL;
        struct obd_device *obd = class_exp2obd(exp);
        int epoch_close = 1;
        int seq_end = 0, rc;
        ENTRY;

        if (obd == NULL) {
                /*
                 * XXX: in case of LMV, is this correct to access
                 * ->exp_handle?
                 */
                CERROR("Invalid MDC connection handle "LPX64"\n",
                       ll_i2mdexp(inode)->exp_handle.h_cookie);
                GOTO(out, rc = 0);
        }

        /*
         * here we check if this is forced umount. If so this is called on
         * canceling "open lock" and we do not call md_close() in this case, as
         * it will not be successful, as import is already deactivated.
         */
        if (obd->obd_force)
                GOTO(out, rc = 0);

        OBD_ALLOC_PTR(op_data);
        if (op_data == NULL)
                GOTO(out, rc = -ENOMEM); /* XXX We leak openhandle and request here. */

        ll_prepare_close(inode, op_data, och);
        epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
        rc = md_close(md_exp, op_data, och->och_mod, &req);
        /* -EAGAIN means the close will be replayed via the SOM setattr,
         * so the replay sequence must stay open. */
        if (rc != -EAGAIN)
                seq_end = 1;

        if (rc == -EAGAIN) {
                /* This close must have the epoch closed. */
                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
                LASSERT(epoch_close);
                /* MDS has instructed us to obtain Size-on-MDS attribute from
                 * OSTs and send setattr to back to MDS. */
                rc = ll_sizeonmds_update(inode, och->och_mod,
                                         &och->och_fh, op_data->op_ioepoch);
                if (rc) {
                        CERROR("inode %lu mdc Size-on-MDS update failed: "
                               "rc = %d\n", inode->i_ino, rc);
                        /* best-effort: close itself succeeded */
                        rc = 0;
                }
        } else if (rc) {
                CERROR("inode %lu mdc close failed: rc = %d\n",
                       inode->i_ino, rc);
        }
        ll_finish_md_op_data(op_data);

        if (rc == 0) {
                rc = ll_objects_destroy(req, inode);
                if (rc)
                        CERROR("inode %lu ll_objects destroy: rc = %d\n",
                               inode->i_ino, rc);
        }

        EXIT;
out:

        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
            S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                /* Epoch still open: keep @och alive, the DONE_WRITING
                 * path will close it and free the handle later. */
                ll_queue_done_writing(inode, LLIF_DONE_WRITING);
        } else {
                if (seq_end)
                        ptlrpc_close_replay_seq(req);
                md_clear_open_replay_data(md_exp, och);
                /* Free @och if it is not waiting for DONE_WRITING. */
                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
                OBD_FREE_PTR(och);
        }
        if (req) /* This is close request */
                ptlrpc_req_finished(req);
        return rc;
}
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och=*och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody have freed this och
203                       already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have good enough OPEN lock on the file and if
228            we can skip talking to MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
/* While this returns an error code, the caller -- fput() -- does not
 * check it, so we need to make every effort to clean up all of our
 * state here.  Also, applications rarely check close errors and even
 * if an error is returned they will not re-try the close call.
 */
/* VFS ->release() handler: called when the last reference to @file is
 * dropped.  Tears down remote-ACL session state (root dentry only),
 * stops the statahead thread if this descriptor owns it, clears any
 * pending async write error recorded on the stripes, and closes the
 * MDS open handle via ll_md_close(). */
int ll_file_release(struct inode *inode, struct file *file)
{
        struct ll_file_data *fd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        int rc;

        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
        /* Remote client releasing the root: drop its remote-ACL
         * session entries keyed by the current pid. */
        if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
            inode == inode->i_sb->s_root->d_inode) {
                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

                LASSERT(fd != NULL);
                if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
                        fd->fd_flags &= ~LL_FILE_RMTACL;
                        rct_del(&sbi->ll_rct, cfs_curproc_pid());
                        et_search_free(&sbi->ll_et, cfs_curproc_pid());
                }
        }
#endif

        if (inode->i_sb->s_root != file->f_dentry)
                ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
        fd = LUSTRE_FPRIVATE(file);
        LASSERT(fd != NULL);

        /* The last ref on @file, maybe not the owner pid of statahead.
         * Different processes can open the same dir, "ll_opendir_key" means:
         * it is me that should stop the statahead thread. */
        if (lli->lli_opendir_key == fd)
                ll_stop_statahead(inode, fd);

        /* The root dentry has no MDS open handle to close; just
         * release the private data. */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = NULL;
                ll_file_data_put(fd);
                RETURN(0);
        }

        /* Pick up (and clear) any asynchronous write error recorded on
         * the stripes so it is not reported twice. */
        if (lsm)
                lov_test_and_clear_async_rc(lsm);
        lli->lli_async_rc = 0;

        rc = ll_md_close(sbi->ll_md_exp, inode, file);
        RETURN(rc);
}
327
/* Issue an MDS intent-open RPC for @file.  Used by NFSD and by
 * pre-2.6.15 patchless kernels, or when a cached open handle vanished
 * between revalidate and ll_file_open().  When @lmm/@lmmsize are
 * non-zero they carry striping parameters (setstripe path) and the
 * OPEN lock request is suppressed.  On success the MDS reply is
 * applied to the inode via ll_prep_inode().
 *
 * Returns 0 or a negative error code; -ESTALE takes a separate exit
 * path to avoid flooding the log. */
static int ll_intent_file_open(struct file *file, void *lmm,
                               int lmmsize, struct lookup_intent *itp)
{
        struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
        struct dentry *parent = file->f_dentry->d_parent;
        const char *name = file->f_dentry->d_name.name;
        const int len = file->f_dentry->d_name.len;
        struct md_op_data *op_data;
        struct ptlrpc_request *req;
        int rc;
        ENTRY;

        if (!parent)
                RETURN(-ENOENT);

        /* Usually we come here only for NFSD, and we want open lock.
           But we can also get here with pre 2.6.15 patchless kernels, and in
           that case that lock is also ok */
        /* We can also get here if there was cached open handle in revalidate_it
         * but it disappeared while we were getting from there to ll_file_open.
         * But this means this file was closed and immediatelly opened which
         * makes a good candidate for using OPEN lock */
        /* If lmmsize & lmm are not 0, we are just setting stripe info
         * parameters. No need for the open lock */
        if (!lmm && !lmmsize)
                itp->it_flags |= MDS_OPEN_LOCK;

        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
                                      file->f_dentry->d_inode, name, len,
                                      O_RDWR, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
                            0 /*unused */, &req, ll_md_blocking_ast, 0);
        ll_finish_md_op_data(op_data);
        if (rc == -ESTALE) {
                /* reason for keep own exit path - don`t flood log
                 * with messages with -ESTALE errors.
                 */
                if (!it_disposition(itp, DISP_OPEN_OPEN) ||
                     it_open_error(DISP_OPEN_OPEN, itp))
                        GOTO(out, rc);
                /* The open itself succeeded: release the now-stale
                 * handle before bailing out. */
                ll_release_openhandle(file->f_dentry, itp);
                GOTO(out_stale, rc);
        }

        if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
                GOTO(out, rc);
        }

        /* Attach the granted lock to the inode so later matches and
         * cancellations find it. */
        if (itp->d.lustre.it_lock_mode)
                md_set_lock_data(sbi->ll_md_exp,
                                 &itp->d.lustre.it_lock_handle,
                                 file->f_dentry->d_inode);

        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
out:
        ptlrpc_req_finished(itp->d.lustre.it_data);

out_stale:
        it_clear_disposition(itp, DISP_ENQ_COMPLETE);
        ll_intent_drop_lock(itp);

        RETURN(rc);
}
396
397 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
398                        struct lookup_intent *it, struct obd_client_handle *och)
399 {
400         struct ptlrpc_request *req = it->d.lustre.it_data;
401         struct mdt_body *body;
402
403         LASSERT(och);
404
405         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
406         LASSERT(body != NULL);                      /* reply already checked out */
407
408         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
409         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
410         och->och_fid = lli->lli_fid;
411         och->och_flags = it->it_flags;
412         lli->lli_ioepoch = body->ioepoch;
413
414         return md_set_open_replay_data(md_exp, och, req);
415 }
416
417 int ll_local_open(struct file *file, struct lookup_intent *it,
418                   struct ll_file_data *fd, struct obd_client_handle *och)
419 {
420         struct inode *inode = file->f_dentry->d_inode;
421         struct ll_inode_info *lli = ll_i2info(inode);
422         ENTRY;
423
424         LASSERT(!LUSTRE_FPRIVATE(file));
425
426         LASSERT(fd != NULL);
427
428         if (och) {
429                 struct ptlrpc_request *req = it->d.lustre.it_data;
430                 struct mdt_body *body;
431                 int rc;
432
433                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
434                 if (rc)
435                         RETURN(rc);
436
437                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
438                 if ((it->it_flags & FMODE_WRITE) &&
439                     (body->valid & OBD_MD_FLSIZE))
440                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
441                                lli->lli_ioepoch, PFID(&lli->lli_fid));
442         }
443
444         LUSTRE_FPRIVATE(file) = fd;
445         ll_readahead_init(inode, &fd->fd_ras);
446         fd->fd_omode = it->it_flags;
447         RETURN(0);
448 }
449
/* Open a file, and (for the very first open) create objects on the OSTs at
 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
 * creation or open until ll_lov_setstripe() ioctl is called.  We grab
 * lli_open_sem to ensure no other process will create objects, send the
 * stripe MD to the MDS, or try to destroy the objects if that fails.
 *
 * If we already have the stripe MD locally then we don't request it in
 * md_open(), by passing a lmm_size = 0.
 *
 * It is up to the application to ensure no other processes open this file
 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
 * used.  We might be able to avoid races of that sort by getting lli_open_sem
 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
 *
 * Returns 0 on success or a negative error code.
 */
int ll_file_open(struct inode *inode, struct file *file)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lookup_intent *it, oit = { .it_op = IT_OPEN,
                                          .it_flags = file->f_flags };
        struct lov_stripe_md *lsm;
        struct ptlrpc_request *req = NULL;
        struct obd_client_handle **och_p;
        __u64 *och_usecount;
        struct ll_file_data *fd;
        int rc = 0, opendir_set = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
               inode->i_generation, inode, file->f_flags);

#ifdef HAVE_VFS_INTENT_PATCHES
        it = file->f_it;
#else
        it = file->private_data; /* XXX: compat macro */
        file->private_data = NULL; /* prevent ll_local_open assertion */
#endif

        fd = ll_file_data_get();
        if (fd == NULL)
                RETURN(-ENOMEM);

        /* Directory opens may take over statahead ownership for this
         * inode; opendir_set records what to undo on error. */
        if (S_ISDIR(inode->i_mode)) {
                spin_lock(&lli->lli_lock);
                /* "lli->lli_opendir_pid != 0" means someone has set it.
                 * "lli->lli_sai != NULL" means the previous statahead has not
                 *                        been cleanup. */
                if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
                        opendir_set = 1;
                        lli->lli_opendir_pid = cfs_curproc_pid();
                        lli->lli_opendir_key = fd;
                } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
                        /* Two cases for this:
                         * (1) The same process open such directory many times.
                         * (2) The old process opened the directory, and exited
                         *     before its children processes. Then new process
                         *     with the same pid opens such directory before the
                         *     old process's children processes exit.
                         * Change the owner to the latest one. */
                        opendir_set = 2;
                        lli->lli_opendir_key = fd;
                }
                spin_unlock(&lli->lli_lock);
        }

        /* The root dentry needs no MDS open handle. */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = fd;
                RETURN(0);
        }

        if (!it || !it->d.lustre.it_disposition) {
                /* Convert f_flags into access mode. We cannot use file->f_mode,
                 * because everything but O_ACCMODE mask was stripped from
                 * there */
                if ((oit.it_flags + 1) & O_ACCMODE)
                        oit.it_flags++;
                if (file->f_flags & O_TRUNC)
                        oit.it_flags |= FMODE_WRITE;

                /* kernel only call f_op->open in dentry_open.  filp_open calls
                 * dentry_open after call to open_namei that checks permissions.
                 * Only nfsd_open call dentry_open directly without checking
                 * permissions and because of that this code below is safe. */
                if (oit.it_flags & FMODE_WRITE)
                        oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

                /* We do not want O_EXCL here, presumably we opened the file
                 * already? XXX - NFS implications? */
                oit.it_flags &= ~O_EXCL;

                it = &oit;
        }

restart:
        /* Let's see if we have file open on MDS already. */
        if (it->it_flags & FMODE_WRITE) {
                och_p = &lli->lli_mds_write_och;
                och_usecount = &lli->lli_open_fd_write_count;
        } else if (it->it_flags & FMODE_EXEC) {
                och_p = &lli->lli_mds_exec_och;
                och_usecount = &lli->lli_open_fd_exec_count;
        } else {
                och_p = &lli->lli_mds_read_och;
                och_usecount = &lli->lli_open_fd_read_count;
        }

        down(&lli->lli_och_sem);
        if (*och_p) { /* Open handle is present */
                if (it_disposition(it, DISP_OPEN_OPEN)) {
                        /* Well, there's extra open request that we do not need,
                           let's close it somehow. This will decref request. */
                        rc = it_open_error(DISP_OPEN_OPEN, it);
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_och_free, rc);
                        }
                        ll_release_openhandle(file->f_dentry, it);
                        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                             LPROC_LL_OPEN);
                }
                (*och_usecount)++;

                rc = ll_local_open(file, it, fd, NULL);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        RETURN(rc);
                }
        } else {
                LASSERT(*och_usecount == 0);
                if (!it->d.lustre.it_disposition) {
                        /* We cannot just request lock handle now, new ELC code
                           means that one of other OPEN locks for this file
                           could be cancelled, and since blocking ast handler
                           would attempt to grab och_sem as well, that would
                           result in a deadlock */
                        up(&lli->lli_och_sem);
                        it->it_flags |= O_CHECK_STALE;
                        rc = ll_intent_file_open(file, NULL, 0, it);
                        it->it_flags &= ~O_CHECK_STALE;
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_openerr, rc);
                        }

                        /* Got some error? Release the request */
                        if (it->d.lustre.it_status < 0) {
                                req = it->d.lustre.it_data;
                                ptlrpc_req_finished(req);
                        }
                        md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
                                         &it->d.lustre.it_lock_handle,
                                         file->f_dentry->d_inode);
                        /* Retry now that the intent carries a disposition. */
                        goto restart;
                }
                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
                if (!*och_p) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc = -ENOMEM);
                }
                (*och_usecount)++;
                req = it->d.lustre.it_data;

                /* md_intent_lock() didn't get a request ref if there was an
                 * open error, so don't do cleanup on the request here
                 * (bug 3430) */
                /* XXX (green): Should not we bail out on any error here, not
                 * just open error? */
                rc = it_open_error(DISP_OPEN_OPEN, it);
                if (rc) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }

                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                rc = ll_local_open(file, it, fd, *och_p);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }
        }
        up(&lli->lli_och_sem);

        /* Must do this outside lli_och_sem lock to prevent deadlock where
           different kind of OPEN lock for this same inode gets cancelled
           by ldlm_cancel_lru */
        if (!S_ISREG(inode->i_mode))
                GOTO(out, rc);

        ll_capa_open(inode);

        lsm = lli->lli_smd;
        if (lsm == NULL) {
                if (file->f_flags & O_LOV_DELAY_CREATE ||
                    !(file->f_mode & FMODE_WRITE)) {
                        CDEBUG(D_INODE, "object creation was delayed\n");
                        GOTO(out, rc);
                }
        }
        file->f_flags &= ~O_LOV_DELAY_CREATE;
        GOTO(out, rc);
out:
        ptlrpc_req_finished(req);
        if (req)
                it_clear_disposition(it, DISP_ENQ_OPEN_REF);
out_och_free:
        /* NOTE: this label is reached while still holding lli_och_sem
         * on the error paths above; the up() below releases it. */
        if (rc) {
                if (*och_p) {
                        OBD_FREE(*och_p, sizeof (struct obd_client_handle));
                        *och_p = NULL; /* OBD_FREE writes some magic there */
                        (*och_usecount)--;
                }
                up(&lli->lli_och_sem);
out_openerr:
                /* Undo the statahead ownership taken at entry. */
                if (opendir_set == 1) {
                        lli->lli_opendir_key = NULL;
                        lli->lli_opendir_pid = 0;
                } else if (unlikely(opendir_set == 2)) {
                        ll_stop_statahead(inode, fd);
                }
        }

        return rc;
}
675
676 /* Fills the obdo with the attributes for the inode defined by lsm */
677 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
678 {
679         struct ptlrpc_request_set *set;
680         struct ll_inode_info *lli = ll_i2info(inode);
681         struct lov_stripe_md *lsm = lli->lli_smd;
682
683         struct obd_info oinfo = { { { 0 } } };
684         int rc;
685         ENTRY;
686
687         LASSERT(lsm != NULL);
688
689         oinfo.oi_md = lsm;
690         oinfo.oi_oa = obdo;
691         oinfo.oi_oa->o_id = lsm->lsm_object_id;
692         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
693         oinfo.oi_oa->o_mode = S_IFREG;
694         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
695                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
696                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
697                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
698                                OBD_MD_FLGROUP;
699         oinfo.oi_capa = ll_mdscapa_get(inode);
700
701         set = ptlrpc_prep_set();
702         if (set == NULL) {
703                 CERROR("can't allocate ptlrpc set\n");
704                 rc = -ENOMEM;
705         } else {
706                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
707                 if (rc == 0)
708                         rc = ptlrpc_set_wait(set);
709                 ptlrpc_set_destroy(set);
710         }
711         capa_put(oinfo.oi_capa);
712         if (rc)
713                 RETURN(rc);
714
715         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
716                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
717                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
718
719         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
720         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
721                lli->lli_smd->lsm_object_id, i_size_read(inode),
722                (unsigned long long)inode->i_blocks,
723                (unsigned long)ll_inode_blksize(inode));
724         RETURN(0);
725 }
726
/* Strip the setuid (and, when group-executable, setgid) bits from
 * @inode after a write by a process without CAP_FSETID, mirroring the
 * VFS remove_suid() behaviour. */
static inline void ll_remove_suid(struct inode *inode)
{
        unsigned int mode;

        /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
        mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;

        /* was any of the uid bits set? */
        mode &= inode->i_mode;
        if (mode && !capable(CAP_FSETID)) {
                inode->i_mode &= ~mode;
                /* XXX careful here - we cannot change the size */
        }
}
741
/* Map DLM extent lock @lock back to the index of the stripe it covers
 * within @inode's LOV layout, by querying the LOV via obd_get_info().
 * Returns the stripe index (>= 0) on success, a negative error code on
 * lookup failure, or -ELDLM_NO_LOCK_DATA when the lock's resource does
 * not match the expected object id/group. */
static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = ll_i2dtexp(inode);
        struct {
                char name[16];
                struct ldlm_lock *lock;
                struct lov_stripe_md *lsm;
        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
        __u32 stripe, vallen = sizeof(stripe);
        int rc;
        ENTRY;

        /* Single-striped file: trivially stripe 0, skip the lookup. */
        if (lsm->lsm_stripe_count == 1)
                GOTO(check, stripe = 0);

        /* get our offset in the lov */
        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
        if (rc != 0) {
                CERROR("obd_get_info: rc = %d\n", rc);
                RETURN(rc);
        }
        LASSERT(stripe < lsm->lsm_stripe_count);

check:
        /* Sanity: the lock resource must name the stripe's object. */
        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
                LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
                           lsm->lsm_oinfo[stripe]->loi_id,
                           lsm->lsm_oinfo[stripe]->loi_gr);
                RETURN(-ELDLM_NO_LOCK_DATA);
        }

        RETURN(stripe);
}
778
/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
 * we get a lock cancellation for each stripe, so we have to map the obd's
 * region back onto the stripes in the file that it held.
 *
 * No one can dirty the extent until we've finished our work and they can
 * enqueue another lock.  The DLM protects us from ll_file_read/write here,
 * but other kernel actors could have pages locked.
 *
 * Called with the DLM lock held. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                              struct ldlm_lock *lock, __u32 stripe)
{
        ldlm_policy_data_t tmpex;
        unsigned long start, end, count, skip, i, j;
        struct page *page;
        /* LDLM_FL_DISCARD_DATA: drop dirty pages instead of writing them */
        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
        struct lustre_handle lockh;
        struct address_space *mapping = inode->i_mapping;

        ENTRY;
        tmpex = lock->l_policy_data;
        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
               i_size_read(inode));

        /* our locks are page granular thanks to osc_enqueue, we invalidate the
         * whole page. */
        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
                           CFS_PAGE_SIZE);
        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);

        /* translate the stripe-local extent [start, end] (in pages) into file
         * page indices: each run of "count" pages of this stripe is separated
         * by "skip" pages belonging to the other stripes */
        count = ~0;
        skip = 0;
        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
        if (lsm->lsm_stripe_count > 1) {
                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
                skip = (lsm->lsm_stripe_count - 1) * count;
                start += start/count * skip + stripe * count;
                if (end != ~0)
                        end += end/count * skip + stripe * count;
        }
        /* overflow in the mapping above means "to the end" */
        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
                end = ~0;

        /* clamp the walk to the last page backed by i_size */
        i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
            CFS_PAGE_SHIFT : 0;
        if (i < end)
                end = i;

        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
               count, skip, end, discard ? " (DISCARDING)" : "");

        /* walk through the vmas on the inode and tear down mmaped pages that
         * intersect with the lock.  this stops immediately if there are no
         * mmap()ed regions of the file.  This is not efficient at all and
         * should be short lived. We'll associate mmap()ed pages with the lock
         * and will be able to find them directly */
        for (i = start; i <= end; i += (j + skip)) {
                j = min(count - (i % count), end - i + 1);
                LASSERT(j > 0);
                LASSERT(mapping);
                if (ll_teardown_mmaps(mapping,
                                      (__u64)i << CFS_PAGE_SHIFT,
                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                        break;
        }

        /* this is the simplistic implementation of page eviction at
         * cancelation.  It is careful to get races with other page
         * lockers handled correctly.  fixes from bug 20 will make it
         * more efficient by associating locks with pages and with
         * batching writeback under the lock explicitly. */
        for (i = start, j = start % count; i <= end;
             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
                /* crossed a stripe boundary: hop over the other stripes */
                if (j == count) {
                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
                        i += skip;
                        j = 0;
                        if (i > end)
                                break;
                }
                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                         start, i, end);

                if (!mapping_has_pages(mapping)) {
                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                        break;
                }

                cond_resched();

                page = find_lock_page(mapping, i);
                if (page == NULL)
                        continue;
                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
                               i, tmpex.l_extent.start);
                if (!discard && PageWriteback(page))
                        wait_on_page_writeback(page);

                /* page->mapping to check with racing against teardown */
                if (!discard && clear_page_dirty_for_io(page)) {
                        rc = ll_call_writepage(inode, page);
                        /* either waiting for io to complete or reacquiring
                         * the lock that the failed writepage released */
                        lock_page(page);
                        wait_on_page_writeback(page);
                        if (rc < 0) {
                                CERROR("writepage inode %lu(%p) of page %p "
                                       "failed: %d\n", inode->i_ino, inode,
                                       page, rc);
                                /* latch the failure on the mapping so a later
                                 * fsync()/close() can report it */
                                if (rc == -ENOSPC)
                                        set_bit(AS_ENOSPC, &mapping->flags);
                                else
                                        set_bit(AS_EIO, &mapping->flags);
                        }
                }

                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
                /* check to see if another DLM lock covers this page b=2765 */
                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
                                      LDLM_FL_TEST_LOCK,
                                      &lock->l_resource->lr_name, LDLM_EXTENT,
                                      &tmpex, LCK_PR | LCK_PW, &lockh);

                if (rc2 <= 0 && page->mapping != NULL) {
                        struct ll_async_page *llap = llap_cast_private(page);
                        /* checking again to account for writeback's
                         * lock_page() */
                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                        if (llap)
                                ll_ra_accounting(llap, mapping);
                        ll_truncate_complete_page(page);
                }
                unlock_page(page);
                page_cache_release(page);
        }
        LASSERTF(tmpex.l_extent.start <=
                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
                  lock->l_policy_data.l_extent.end + 1),
                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                 start, i, end);
        EXIT;
}
931
/* Blocking/canceling AST for the extent DLM locks taken on file data.
 *
 * LDLM_CB_BLOCKING: another lock conflicts with ours, so cancel ours.
 * LDLM_CB_CANCELING: the lock is going away; flush/discard the pages it
 * covered (ll_pgcache_remove_extent) and recompute the stripe's kms now
 * that this lock no longer covers its extent. */
static int ll_extent_lock_callback(struct ldlm_lock *lock,
                                   struct ldlm_lock_desc *new, void *data,
                                   int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* cbdata is expected to be an inode pointer (see ll_extent_lock);
         * a small integer value indicates corruption */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                struct inode *inode;
                struct ll_inode_info *lli;
                struct lov_stripe_md *lsm;
                int stripe;
                __u64 kms;

                /* This lock wasn't granted, don't try to evict pages */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                inode = ll_inode_from_lock(lock);
                if (inode == NULL)
                        RETURN(0);
                lli = ll_i2info(inode);
                if (lli == NULL)
                        goto iput;
                if (lli->lli_smd == NULL)
                        goto iput;
                lsm = lli->lli_smd;

                stripe = ll_lock_to_stripe_offset(inode, lock);
                if (stripe < 0)
                        goto iput;

                ll_pgcache_remove_extent(inode, lsm, lock, stripe);

                /* update the stripe's kms under both the stripe lock and the
                 * lock/resource lock */
                lov_stripe_lock(lsm);
                lock_res_and_lock(lock);
                kms = ldlm_extent_shift_kms(lock,
                                            lsm->lsm_oinfo[stripe]->loi_kms);

                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
                lsm->lsm_oinfo[stripe]->loi_kms = kms;
                unlock_res_and_lock(lock);
                lov_stripe_unlock(lsm);
        iput:
                iput(inode);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
1000
/* NOTE(review): dead code (disabled with #if 0), kept for reference.  It
 * still accesses lsm_oinfo[stripe] with '.' while live code in this file
 * uses the pointer form lsm_oinfo[stripe]->..., so it would need updating
 * before being re-enabled. */
#if 0
int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lustre_handle lockh = { 0 };
        struct ost_lvb *lvb;
        int stripe;
        ENTRY;

        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                     LDLM_FL_BLOCK_CONV)) {
                LBUG(); /* not expecting any blocked async locks yet */
                LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
                           "lock, returning");
                ldlm_lock_dump(D_OTHER, lock, 0);
                ldlm_reprocess_all(lock->l_resource);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");

        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                goto iput;

        if (lock->l_lvb_len) {
                struct lov_stripe_md *lsm = lli->lli_smd;
                __u64 kms;
                lvb = lock->l_lvb_data;
                lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;

                lock_res_and_lock(lock);
                ll_inode_size_lock(inode, 1);
                kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                kms = ldlm_extent_shift_kms(NULL, kms);
                if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe].loi_kms, kms);
                lsm->lsm_oinfo[stripe].loi_kms = kms;
                ll_inode_size_unlock(inode, 1);
                unlock_res_and_lock(lock);
        }

iput:
        iput(inode);
        wake_up(&lock->l_waitq);

        ldlm_lock2handle(lock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        RETURN(0);
}
#endif
1055
/* Glimpse AST: the server asks this client, as a lock holder, for its view
 * of the object's size and timestamps.  Packs an ost_lvb carrying the
 * stripe's kms and the inode times into the RPC reply.
 *
 * -ELDLM_NO_LOCK_DATA cases (no inode / no stripe info attached to the
 * lock) are normal races and are answered with a plain packed reply
 * rather than ptlrpc_error(). */
static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
        struct ptlrpc_request *req = reqp;
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli;
        struct lov_stripe_md *lsm;
        struct ost_lvb *lvb;
        int rc, stripe;
        ENTRY;

        if (inode == NULL)
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
        lli = ll_i2info(inode);
        if (lli == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* First, find out which stripe index this lock corresponds to. */
        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* reserve room for the lvb in the reply and pack it */
        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             sizeof(*lvb));
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc) {
                CERROR("lustre_pack_reply: %d\n", rc);
                GOTO(iput, rc);
        }

        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
        lvb->lvb_mtime = LTIME_S(inode->i_mtime);
        lvb->lvb_atime = LTIME_S(inode->i_atime);
        lvb->lvb_ctime = LTIME_S(inode->i_ctime);

        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                   " atime "LPU64", mtime "LPU64", ctime "LPU64,
                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                   lvb->lvb_atime, lvb->lvb_ctime);
 iput:
        iput(inode);

 out:
        /* These errors are normal races, so we don't want to fill the console
         * with messages by calling ptlrpc_error() */
        if (rc == -ELDLM_NO_LOCK_DATA)
                lustre_pack_reply(req, 1, NULL, NULL);

        req->rq_status = rc;
        return rc;
}
1111
1112 static int ll_merge_lvb(struct inode *inode)
1113 {
1114         struct ll_inode_info *lli = ll_i2info(inode);
1115         struct ll_sb_info *sbi = ll_i2sbi(inode);
1116         struct ost_lvb lvb;
1117         int rc;
1118
1119         ENTRY;
1120
1121         ll_inode_size_lock(inode, 1);
1122         inode_init_lvb(inode, &lvb);
1123         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1124         i_size_write(inode, lvb.lvb_size);
1125         inode->i_blocks = lvb.lvb_blocks;
1126
1127         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1128         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1129         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1130         ll_inode_size_unlock(inode, 1);
1131
1132         RETURN(rc);
1133 }
1134
/* Try to refresh the inode size from DLM extent locks already granted to
 * this client (obd_match over [0, EOF] in LCK_PR), avoiding a glimpse RPC.
 *
 * Returns 0 on success (also for zero-striped files), -ENODATA when no
 * matching lock is held, or a negative error from obd_match(). */
int ll_local_size(struct inode *inode)
{
        ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
        struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lustre_handle lockh = { 0 };
        int flags = 0;
        int rc;
        ENTRY;

        if (lli->lli_smd->lsm_stripe_count == 0)
                RETURN(0);

        rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
                       &policy, LCK_PR, &flags, inode, &lockh);
        if (rc < 0)
                RETURN(rc);
        else if (rc == 0)
                RETURN(-ENODATA);

        /* matched: merge under the reference, then drop it */
        rc = ll_merge_lvb(inode);
        obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
        RETURN(rc);
}
1159
1160 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1161                      lstat_t *st)
1162 {
1163         struct lustre_handle lockh = { 0 };
1164         struct ldlm_enqueue_info einfo = { 0 };
1165         struct obd_info oinfo = { { { 0 } } };
1166         struct ost_lvb lvb;
1167         int rc;
1168
1169         ENTRY;
1170
1171         einfo.ei_type = LDLM_EXTENT;
1172         einfo.ei_mode = LCK_PR;
1173         einfo.ei_cb_bl = ll_extent_lock_callback;
1174         einfo.ei_cb_cp = ldlm_completion_ast;
1175         einfo.ei_cb_gl = ll_glimpse_callback;
1176         einfo.ei_cbdata = NULL;
1177
1178         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1179         oinfo.oi_lockh = &lockh;
1180         oinfo.oi_md = lsm;
1181         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1182
1183         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1184         if (rc == -ENOENT)
1185                 RETURN(rc);
1186         if (rc != 0) {
1187                 CERROR("obd_enqueue returned rc %d, "
1188                        "returning -EIO\n", rc);
1189                 RETURN(rc > 0 ? -EIO : rc);
1190         }
1191
1192         lov_stripe_lock(lsm);
1193         memset(&lvb, 0, sizeof(lvb));
1194         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1195         st->st_size = lvb.lvb_size;
1196         st->st_blocks = lvb.lvb_blocks;
1197         st->st_mtime = lvb.lvb_mtime;
1198         st->st_atime = lvb.lvb_atime;
1199         st->st_ctime = lvb.lvb_ctime;
1200         lov_stripe_unlock(lsm);
1201
1202         RETURN(rc);
1203 }
1204
/* NB: obd_merge_lvb will prefer locally cached writes if they extend the
 * file (because it prefers KMS over RSS when larger) */
/* Ask the lock holders of this file for their view of its size (a
 * "glimpse") and merge the answers into the inode via ll_merge_lvb().
 *
 * Returns 0 on success or when no glimpse is needed (MDS size lock held,
 * or no objects); -ENOENT is passed through; other enqueue failures are
 * returned (positive rc mapped to -EIO). */
int ll_glimpse_size(struct inode *inode, int ast_flags)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lustre_handle lockh = { 0 };
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        /* the MDS size lock makes our cached attributes valid as-is */
        if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);

        if (!lli->lli_smd) {
                CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
                RETURN(0);
        }

        /* NOTE: this looks like DLM lock request, but it may not be one. Due
         *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
         *       won't revoke any conflicting DLM locks held. Instead,
         *       ll_glimpse_callback() will be called on each client
         *       holding a DLM lock against this file, and resulting size
         *       will be returned for each stripe. DLM lock on [0, EOF] is
         *       acquired only if there were no conflicting locks. */
        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = LCK_PR;
        einfo.ei_cb_bl = ll_extent_lock_callback;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
        oinfo.oi_lockh = &lockh;
        oinfo.oi_md = lli->lli_smd;
        oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;

        rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
        if (rc == -ENOENT)
                RETURN(rc);
        if (rc != 0) {
                CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
                RETURN(rc > 0 ? -EIO : rc);
        }

        rc = ll_merge_lvb(inode);

        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
               i_size_read(inode), (unsigned long long)inode->i_blocks);

        RETURN(rc);
}
1261
/* Acquire an extent DLM lock of @mode over *policy on the file's objects
 * and refresh the inode size/times from the merged lock value blocks.
 *
 * Returns 0 on success with *lockh referencing the granted lock — or 0
 * with no lock at all when locking is disabled (LL_FILE_IGNORE_LOCK /
 * LL_SBI_NOLCK); otherwise a negative error (positive enqueue rc is
 * mapped to -EIO).  *policy is updated to the extent actually granted. */
int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                   struct lov_stripe_md *lsm, int mode,
                   ldlm_policy_data_t *policy, struct lustre_handle *lockh,
                   int ast_flags)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ost_lvb lvb;
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(!lustre_handle_is_used(lockh));
        LASSERT(lsm != NULL);

        /* don't drop the mmapped file to LRU */
        if (mapping_mapped(inode->i_mapping))
                ast_flags |= LDLM_FL_NO_LRU;

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               inode->i_ino, policy->l_extent.start, policy->l_extent.end);

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = mode;
        einfo.ei_cb_bl = ll_extent_lock_callback;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy = *policy;
        oinfo.oi_lockh = lockh;
        oinfo.oi_md = lsm;
        oinfo.oi_flags = ast_flags;

        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
        /* report back the extent actually granted */
        *policy = oinfo.oi_policy;
        if (rc > 0)
                rc = -EIO;

        /* refresh size/times under the size lock, taken AFTER the enqueue
         * to preserve the DLM -> ll_inode_size_lock() ordering (see below) */
        ll_inode_size_lock(inode, 1);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);

        if (policy->l_extent.start == 0 &&
            policy->l_extent.end == OBD_OBJECT_EOF) {
                /* vmtruncate()->ll_truncate() first sets the i_size and then
                 * the kms under both a DLM lock and the
                 * ll_inode_size_lock().  If we don't get the
                 * ll_inode_size_lock() here we can match the DLM lock and
                 * reset i_size from the kms before the truncating path has
                 * updated the kms.  generic_file_write can then trust the
                 * stale i_size when doing appending writes and effectively
                 * cancel the result of the truncate.  Getting the
                 * ll_inode_size_lock() after the enqueue maintains the DLM
                 * -> ll_inode_size_lock() acquiring order. */
                i_size_write(inode, lvb.lvb_size);
                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
                       inode->i_ino, i_size_read(inode));
        }

        if (rc == 0) {
                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
                LTIME_S(inode->i_atime) = lvb.lvb_atime;
                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        }
        ll_inode_size_unlock(inode, 1);

        RETURN(rc);
}
1336
1337 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1338                      struct lov_stripe_md *lsm, int mode,
1339                      struct lustre_handle *lockh)
1340 {
1341         struct ll_sb_info *sbi = ll_i2sbi(inode);
1342         int rc;
1343         ENTRY;
1344
1345         /* XXX phil: can we do this?  won't it screw the file size up? */
1346         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1347             (sbi->ll_flags & LL_SBI_NOLCK))
1348                 RETURN(0);
1349
1350         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1351
1352         RETURN(rc);
1353 }
1354
1355 static void ll_set_file_contended(struct inode *inode)
1356 {
1357         struct ll_inode_info *lli = ll_i2info(inode);
1358         cfs_time_t now = cfs_time_current();
1359
1360         spin_lock(&lli->lli_lock);
1361         lli->lli_contention_time = now;
1362         lli->lli_flags |= LLIF_CONTENDED;
1363         spin_unlock(&lli->lli_lock);
1364 }
1365
1366 void ll_clear_file_contended(struct inode *inode)
1367 {
1368         struct ll_inode_info *lli = ll_i2info(inode);
1369
1370         spin_lock(&lli->lli_lock);
1371         lli->lli_flags &= ~LLIF_CONTENDED;
1372         spin_unlock(&lli->lli_lock);
1373 }
1374
1375 static int ll_is_file_contended(struct file *file)
1376 {
1377         struct inode *inode = file->f_dentry->d_inode;
1378         struct ll_inode_info *lli = ll_i2info(inode);
1379         struct ll_sb_info *sbi = ll_i2sbi(inode);
1380         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1381         ENTRY;
1382
1383         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1384                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1385                        " osc connect flags = 0x"LPX64"\n",
1386                        sbi->ll_lco.lco_flags);
1387                 RETURN(0);
1388         }
1389         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1390                 RETURN(1);
1391         if (lli->lli_flags & LLIF_CONTENDED) {
1392                 cfs_time_t cur_time = cfs_time_current();
1393                 cfs_time_t retry_time;
1394
1395                 retry_time = cfs_time_add(
1396                         lli->lli_contention_time,
1397                         cfs_time_seconds(sbi->ll_contention_time));
1398                 if (cfs_time_after(cur_time, retry_time)) {
1399                         ll_clear_file_contended(inode);
1400                         RETURN(0);
1401                 }
1402                 RETURN(1);
1403         }
1404         RETURN(0);
1405 }
1406
/* Acquire the lock-tree lock covering [start, end] for a read or write.
 *
 * Returns 1 when the tree lock was taken, 0 when locking was skipped
 * (file currently contended, or the lock was denied on contention with
 * -EUSERS, which also marks the file contended), or a negative error.
 * O_APPEND writes always take the lock. */
static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
                                 const char *buf, size_t count,
                                 loff_t start, loff_t end, int rw)
{
        int append;
        int tree_locked = 0;
        int rc;
        struct inode * inode = file->f_dentry->d_inode;
        ENTRY;

        append = (rw == WRITE) && (file->f_flags & O_APPEND);

        if (append || !ll_is_file_contended(file)) {
                struct ll_lock_tree_node *node;
                int ast_flags;

                /* appends must wait for the lock; everything else may be
                 * denied on contention */
                ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
                if (file->f_flags & O_NONBLOCK)
                        ast_flags |= LDLM_FL_BLOCK_NOWAIT;
                node = ll_node_from_inode(inode, start, end,
                                          (rw == WRITE) ? LCK_PW : LCK_PR);
                if (IS_ERR(node)) {
                        rc = PTR_ERR(node);
                        GOTO(out, rc);
                }
                tree->lt_fd = LUSTRE_FPRIVATE(file);
                rc = ll_tree_lock(tree, node, buf, count, ast_flags);
                if (rc == 0)
                        tree_locked = 1;
                else if (rc == -EUSERS)
                        ll_set_file_contended(inode);
                else
                        GOTO(out, rc);
        }
        RETURN(tree_locked);
out:
        return rc;
}
1445
1446 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1447                             loff_t *ppos)
1448 {
1449         struct inode *inode = file->f_dentry->d_inode;
1450         struct ll_inode_info *lli = ll_i2info(inode);
1451         struct lov_stripe_md *lsm = lli->lli_smd;
1452         struct ll_sb_info *sbi = ll_i2sbi(inode);
1453         struct ll_lock_tree tree;
1454         struct ost_lvb lvb;
1455         struct ll_ra_read bead;
1456         int ra = 0;
1457         loff_t end;
1458         ssize_t retval, chunk, sum = 0;
1459         int tree_locked;
1460
1461         __u64 kms;
1462         ENTRY;
1463         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1464                inode->i_ino, inode->i_generation, inode, count, *ppos);
1465         /* "If nbyte is 0, read() will return 0 and have no other results."
1466          *                      -- Single Unix Spec */
1467         if (count == 0)
1468                 RETURN(0);
1469
1470         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1471
1472         if (!lsm) {
1473                 /* Read on file with no objects should return zero-filled
1474                  * buffers up to file size (we can get non-zero sizes with
1475                  * mknod + truncate, then opening file for read. This is a
1476                  * common pattern in NFS case, it seems). Bug 6243 */
1477                 int notzeroed;
1478                 /* Since there are no objects on OSTs, we have nothing to get
1479                  * lock on and so we are forced to access inode->i_size
1480                  * unguarded */
1481
1482                 /* Read beyond end of file */
1483                 if (*ppos >= i_size_read(inode))
1484                         RETURN(0);
1485
1486                 if (count > i_size_read(inode) - *ppos)
1487                         count = i_size_read(inode) - *ppos;
1488                 /* Make sure to correctly adjust the file pos pointer for
1489                  * EFAULT case */
1490                 notzeroed = clear_user(buf, count);
1491                 count -= notzeroed;
1492                 *ppos += count;
1493                 if (!count)
1494                         RETURN(-EFAULT);
1495                 RETURN(count);
1496         }
1497 repeat:
1498         if (sbi->ll_max_rw_chunk != 0) {
1499                 /* first, let's know the end of the current stripe */
1500                 end = *ppos;
1501                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1502                                 (obd_off *)&end);
1503
1504                 /* correct, the end is beyond the request */
1505                 if (end > *ppos + count - 1)
1506                         end = *ppos + count - 1;
1507
1508                 /* and chunk shouldn't be too large even if striping is wide */
1509                 if (end - *ppos > sbi->ll_max_rw_chunk)
1510                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1511         } else {
1512                 end = *ppos + count - 1;
1513         }
1514
1515         tree_locked = ll_file_get_tree_lock(&tree, file, buf,
1516                                             count, *ppos, end, READ);
1517         if (tree_locked < 0)
1518                 GOTO(out, retval = tree_locked);
1519
1520         ll_inode_size_lock(inode, 1);
1521         /*
1522          * Consistency guarantees: following possibilities exist for the
1523          * relation between region being read and real file size at this
1524          * moment:
1525          *
1526          *  (A): the region is completely inside of the file;
1527          *
1528          *  (B-x): x bytes of region are inside of the file, the rest is
1529          *  outside;
1530          *
1531          *  (C): the region is completely outside of the file.
1532          *
1533          * This classification is stable under DLM lock acquired by
1534          * ll_tree_lock() above, because to change class, other client has to
1535          * take DLM lock conflicting with our lock. Also, any updates to
1536          * ->i_size by other threads on this client are serialized by
1537          * ll_inode_size_lock(). This guarantees that short reads are handled
1538          * correctly in the face of concurrent writes and truncates.
1539          */
1540         inode_init_lvb(inode, &lvb);
1541         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1542         kms = lvb.lvb_size;
1543         if (*ppos + count - 1 > kms) {
1544                 /* A glimpse is necessary to determine whether we return a
1545                  * short read (B) or some zeroes at the end of the buffer (C) */
1546                 ll_inode_size_unlock(inode, 1);
1547                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1548                 if (retval) {
1549                         if (tree_locked)
1550                                 ll_tree_unlock(&tree);
1551                         goto out;
1552                 }
1553         } else {
1554                 /* region is within kms and, hence, within real file size (A).
1555                  * We need to increase i_size to cover the read region so that
1556                  * generic_file_read() will do its job, but that doesn't mean
1557                  * the kms size is _correct_, it is only the _minimum_ size.
1558                  * If someone does a stat they will get the correct size which
1559                  * will always be >= the kms value here.  b=11081 */
1560                 if (i_size_read(inode) < kms)
1561                         i_size_write(inode, kms);
1562                 ll_inode_size_unlock(inode, 1);
1563         }
1564
1565         chunk = end - *ppos + 1;
1566         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1567                inode->i_ino, chunk, *ppos, i_size_read(inode));
1568
1569         if (tree_locked) {
1570                 /* turn off the kernel's read-ahead */
1571                 file->f_ra.ra_pages = 0;
1572
1573                 /* initialize read-ahead window once per syscall */
1574                 if (ra == 0) {
1575                         ra = 1;
1576                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1577                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1578                         ll_ra_read_in(file, &bead);
1579                 }
1580
1581                 /* BUG: 5972 */
1582                 file_accessed(file);
1583                 retval = generic_file_read(file, buf, chunk, ppos);
1584                 ll_tree_unlock(&tree);
1585         } else {
1586                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1587         }
1588
1589         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1590
1591         if (retval > 0) {
1592                 buf += retval;
1593                 count -= retval;
1594                 sum += retval;
1595                 if (retval == chunk && count > 0)
1596                         goto repeat;
1597         }
1598
1599  out:
1600         if (ra != 0)
1601                 ll_ra_read_ex(file, &bead);
1602         retval = (sum > 0) ? sum : retval;
1603         RETURN(retval);
1604 }
1605
1606 /*
1607  * Write to a file (through the page cache).
1608  */
static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
                             loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
        struct ll_lock_tree tree;
        loff_t maxbytes = ll_file_maxbytes(inode);
        loff_t lock_start, lock_end, end;
        ssize_t retval, chunk, sum = 0;   /* sum accumulates bytes over chunks */
        int tree_locked;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        /* POSIX, but surprised the VFS doesn't check this already */
        if (count == 0)
                RETURN(0);

        /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
         * called on the file, don't fail the below assertion (bug 2388). */
        if (file->f_flags & O_LOV_DELAY_CREATE &&
            ll_i2info(inode)->lli_smd == NULL)
                RETURN(-EBADF);

        LASSERT(ll_i2info(inode)->lli_smd != NULL);

        /* Serialize writes on this inode within this client; the DLM extent
         * lock taken below serializes against other clients. */
        down(&ll_i2info(inode)->lli_write_sem);

        /* The write proceeds in chunks: each pass locks and writes one
         * region, then loops back here while data remains. */
repeat:
        chunk = 0; /* just to fix gcc's warning */
        end = *ppos + count - 1;

        if (file->f_flags & O_APPEND) {
                /* Append: the start offset is only known once i_size is
                 * stable, so take a [0, EOF] lock. */
                lock_start = 0;
                lock_end = OBD_OBJECT_EOF;
        } else if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
                                (obd_off *)&end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
                lock_start = *ppos;
                lock_end = end;
        } else {
                /* no chunking configured: lock exactly the written region */
                lock_start = *ppos;
                lock_end = *ppos + count - 1;
        }

        /* positive => DLM tree lock held; 0 => lockless I/O path;
         * negative => error */
        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
                                            lock_start, lock_end, WRITE);
        if (tree_locked < 0)
                GOTO(out, retval = tree_locked);

        /* This is ok, g_f_w will overwrite this under i_sem if it races
         * with a local truncate, it just makes our maxbyte checking easier.
         * The i_size value gets updated in ll_extent_lock() as a consequence
         * of the [0,EOF] extent lock we requested above. */
        if (file->f_flags & O_APPEND) {
                *ppos = i_size_read(inode);
                end = *ppos + count - 1;
        }

        if (*ppos >= maxbytes) {
                /* write starting at/after the fs size limit: SIGXFSZ+EFBIG */
                send_sig(SIGXFSZ, current, 0);
                GOTO(out_unlock, retval = -EFBIG);
        }
        if (end > maxbytes - 1)
                end = maxbytes - 1;

        /* generic_file_write handles O_APPEND after getting i_mutex */
        chunk = end - *ppos + 1;
        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
               inode->i_ino, chunk, *ppos);
        if (tree_locked)
                retval = generic_file_write(file, buf, chunk, ppos);
        else
                retval = ll_file_lockless_io(file, (char*)buf, chunk,
                                             ppos, WRITE);
        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);

out_unlock:
        if (tree_locked)
                ll_tree_unlock(&tree);

out:
        /* Advance past the bytes written; keep looping only while complete
         * chunks succeed and data remains. */
        if (retval > 0) {
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

        up(&ll_i2info(inode)->lli_write_sem);

        /* report the total written if any chunk succeeded, else the error */
        retval = (sum > 0) ? sum : retval;
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
                           retval > 0 ? retval : 0);
        RETURN(retval);
}
1720
1721 /*
1722  * Send file content (through pagecache) somewhere with helper
1723  */
1724 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1725                                 read_actor_t actor, void *target)
1726 {
1727         struct inode *inode = in_file->f_dentry->d_inode;
1728         struct ll_inode_info *lli = ll_i2info(inode);
1729         struct lov_stripe_md *lsm = lli->lli_smd;
1730         struct ll_lock_tree tree;
1731         struct ll_lock_tree_node *node;
1732         struct ost_lvb lvb;
1733         struct ll_ra_read bead;
1734         int rc;
1735         ssize_t retval;
1736         __u64 kms;
1737         ENTRY;
1738         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1739                inode->i_ino, inode->i_generation, inode, count, *ppos);
1740
1741         /* "If nbyte is 0, read() will return 0 and have no other results."
1742          *                      -- Single Unix Spec */
1743         if (count == 0)
1744                 RETURN(0);
1745
1746         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1747         /* turn off the kernel's read-ahead */
1748         in_file->f_ra.ra_pages = 0;
1749
1750         /* File with no objects, nothing to lock */
1751         if (!lsm)
1752                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1753
1754         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1755         if (IS_ERR(node))
1756                 RETURN(PTR_ERR(node));
1757
1758         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1759         rc = ll_tree_lock(&tree, node, NULL, count,
1760                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1761         if (rc != 0)
1762                 RETURN(rc);
1763
1764         ll_clear_file_contended(inode);
1765         ll_inode_size_lock(inode, 1);
1766         /*
1767          * Consistency guarantees: following possibilities exist for the
1768          * relation between region being read and real file size at this
1769          * moment:
1770          *
1771          *  (A): the region is completely inside of the file;
1772          *
1773          *  (B-x): x bytes of region are inside of the file, the rest is
1774          *  outside;
1775          *
1776          *  (C): the region is completely outside of the file.
1777          *
1778          * This classification is stable under DLM lock acquired by
1779          * ll_tree_lock() above, because to change class, other client has to
1780          * take DLM lock conflicting with our lock. Also, any updates to
1781          * ->i_size by other threads on this client are serialized by
1782          * ll_inode_size_lock(). This guarantees that short reads are handled
1783          * correctly in the face of concurrent writes and truncates.
1784          */
1785         inode_init_lvb(inode, &lvb);
1786         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1787         kms = lvb.lvb_size;
1788         if (*ppos + count - 1 > kms) {
1789                 /* A glimpse is necessary to determine whether we return a
1790                  * short read (B) or some zeroes at the end of the buffer (C) */
1791                 ll_inode_size_unlock(inode, 1);
1792                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1793                 if (retval)
1794                         goto out;
1795         } else {
1796                 /* region is within kms and, hence, within real file size (A) */
1797                 i_size_write(inode, kms);
1798                 ll_inode_size_unlock(inode, 1);
1799         }
1800
1801         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1802                inode->i_ino, count, *ppos, i_size_read(inode));
1803
1804         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1805         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1806         ll_ra_read_in(in_file, &bead);
1807         /* BUG: 5972 */
1808         file_accessed(in_file);
1809         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1810         ll_ra_read_ex(in_file, &bead);
1811
1812  out:
1813         ll_tree_unlock(&tree);
1814         RETURN(retval);
1815 }
1816
/* LL_IOC_RECREATE_OBJ handler: ask the OSTs to re-create a lost object for
 * this file, using the id/group/ost-index supplied by userspace.  This is a
 * repair operation, so it is restricted to CAP_SYS_ADMIN. */
static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                               unsigned long arg)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_export *exp = ll_i2dtexp(inode);
        struct ll_recreate_obj ucreatp;
        struct obd_trans_info oti = { 0 };
        struct obdo *oa = NULL;
        int lsm_size;
        int rc = 0;
        struct lov_stripe_md *lsm, *lsm2;
        ENTRY;

        if (!capable (CAP_SYS_ADMIN))
                RETURN(-EPERM);

        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
                            sizeof(struct ll_recreate_obj));
        if (rc) {
                RETURN(-EFAULT);
        }
        OBDO_ALLOC(oa);
        if (oa == NULL)
                RETURN(-ENOMEM);

        /* lli_size_sem keeps lli_smd stable while we size and copy it.
         * NOTE(review): it stays held across the obd_create() call below. */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(out, rc = -ENOENT);
        lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
                   (lsm->lsm_stripe_count));

        OBD_ALLOC(lsm2, lsm_size);
        if (lsm2 == NULL)
                GOTO(out, rc = -ENOMEM);

        /* target object identity comes straight from the ioctl argument;
         * o_nlink carries the OST index for OBD_FL_RECREATE_OBJS requests */
        oa->o_id = ucreatp.lrc_id;
        oa->o_gr = ucreatp.lrc_group;
        oa->o_nlink = ucreatp.lrc_ost_idx;
        oa->o_flags |= OBD_FL_RECREATE_OBJS;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        /* run the create against a private copy of the current striping */
        memcpy(lsm2, lsm, lsm_size);
        rc = obd_create(exp, oa, &lsm2, &oti);

        OBD_FREE(lsm2, lsm_size);
        GOTO(out, rc);
out:
        up(&lli->lli_size_sem);
        OBDO_FREE(oa);
        return rc;
}
1871
/* Set the striping EA on an object-less file by replaying an open with the
 * user-supplied lov_user_md attached; the MDS creates the objects as part of
 * the open.  Fails with -EEXIST if striping is already set (it is write-once).
 * Returns 0 or a negative errno. */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                             int flags, struct lov_user_md *lum, int lum_size)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm;
        struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
        int rc = 0;
        ENTRY;

        /* lli_size_sem guards lli_smd: check-and-set must be atomic.
         * NOTE(review): the semaphore is held across the open RPC below. */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm) {
                up(&lli->lli_size_sem);
                CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
                       inode->i_ino);
                RETURN(-EEXIST);
        }

        /* send the striping request to the MDS as an open intent */
        rc = ll_intent_file_open(file, lum, lum_size, &oit);
        if (rc)
                GOTO(out, rc);
        if (it_disposition(&oit, DISP_LOOKUP_NEG))
                GOTO(out_req_free, rc = -ENOENT);
        rc = oit.d.lustre.it_status;
        if (rc < 0)
                GOTO(out_req_free, rc);

        /* only the EA was wanted; close the open handle again */
        ll_release_openhandle(file->f_dentry, &oit);

 out:
        up(&lli->lli_size_sem);
        ll_intent_release(&oit);
        RETURN(rc);
out_req_free:
        /* error after the RPC: drop the reply, then take the common exit */
        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
        goto out;
}
1909
1910 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1911                              struct lov_mds_md **lmmp, int *lmm_size, 
1912                              struct ptlrpc_request **request)
1913 {
1914         struct ll_sb_info *sbi = ll_i2sbi(inode);
1915         struct mdt_body  *body;
1916         struct lov_mds_md *lmm = NULL;
1917         struct ptlrpc_request *req = NULL;
1918         struct obd_capa *oc;
1919         int rc, lmmsize;
1920
1921         rc = ll_get_max_mdsize(sbi, &lmmsize);
1922         if (rc)
1923                 RETURN(rc);
1924
1925         oc = ll_mdscapa_get(inode);
1926         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1927                              oc, filename, strlen(filename) + 1,
1928                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
1929                              ll_i2suppgid(inode), &req);
1930         capa_put(oc);
1931         if (rc < 0) {
1932                 CDEBUG(D_INFO, "md_getattr_name failed "
1933                        "on %s: rc %d\n", filename, rc);
1934                 GOTO(out, rc);
1935         }
1936
1937         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1938         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1939
1940         lmmsize = body->eadatasize;
1941
1942         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1943                         lmmsize == 0) {
1944                 GOTO(out, rc = -ENODATA);
1945         }
1946
1947         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1948         LASSERT(lmm != NULL);
1949
1950         /*
1951          * This is coming from the MDS, so is probably in
1952          * little endian.  We convert it to host endian before
1953          * passing it to userspace.
1954          */
1955         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1956                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1957                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1958         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1959                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1960         }
1961
1962         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1963                 struct lov_stripe_md *lsm;
1964                 struct lov_user_md_join *lmj;
1965                 int lmj_size, i, aindex = 0;
1966
1967                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1968                 if (rc < 0)
1969                         GOTO(out, rc = -ENOMEM);
1970                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1971                 if (rc)
1972                         GOTO(out_free_memmd, rc);
1973
1974                 lmj_size = sizeof(struct lov_user_md_join) +
1975                            lsm->lsm_stripe_count *
1976                            sizeof(struct lov_user_ost_data_join);
1977                 OBD_ALLOC(lmj, lmj_size);
1978                 if (!lmj)
1979                         GOTO(out_free_memmd, rc = -ENOMEM);
1980
1981                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1982                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1983                         struct lov_extent *lex =
1984                                 &lsm->lsm_array->lai_ext_array[aindex];
1985
1986                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1987                                 aindex ++;
1988                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1989                                         LPU64" len %d\n", aindex, i,
1990                                         lex->le_start, (int)lex->le_len);
1991                         lmj->lmm_objects[i].l_extent_start =
1992                                 lex->le_start;
1993
1994                         if ((int)lex->le_len == -1)
1995                                 lmj->lmm_objects[i].l_extent_end = -1;
1996                         else
1997                                 lmj->lmm_objects[i].l_extent_end =
1998                                         lex->le_start + lex->le_len;
1999                         lmj->lmm_objects[i].l_object_id =
2000                                 lsm->lsm_oinfo[i]->loi_id;
2001                         lmj->lmm_objects[i].l_object_gr =
2002                                 lsm->lsm_oinfo[i]->loi_gr;
2003                         lmj->lmm_objects[i].l_ost_gen =
2004                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2005                         lmj->lmm_objects[i].l_ost_idx =
2006                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2007                 }
2008                 lmm = (struct lov_mds_md *)lmj;
2009                 lmmsize = lmj_size;
2010 out_free_memmd:
2011                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2012         }
2013 out:
2014         *lmmp = lmm;
2015         *lmm_size = lmmsize;
2016         *request = req;
2017         return rc;
2018 }
2019
2020 static int ll_lov_setea(struct inode *inode, struct file *file,
2021                             unsigned long arg)
2022 {
2023         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2024         struct lov_user_md  *lump;
2025         int lum_size = sizeof(struct lov_user_md) +
2026                        sizeof(struct lov_user_ost_data);
2027         int rc;
2028         ENTRY;
2029
2030         if (!capable (CAP_SYS_ADMIN))
2031                 RETURN(-EPERM);
2032
2033         OBD_ALLOC(lump, lum_size);
2034         if (lump == NULL) {
2035                 RETURN(-ENOMEM);
2036         }
2037         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2038         if (rc) {
2039                 OBD_FREE(lump, lum_size);
2040                 RETURN(-EFAULT);
2041         }
2042
2043         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2044
2045         OBD_FREE(lump, lum_size);
2046         RETURN(rc);
2047 }
2048
2049 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2050                             unsigned long arg)
2051 {
2052         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2053         int rc;
2054         int flags = FMODE_WRITE;
2055         ENTRY;
2056
2057         /* Bug 1152: copy properly when this is no longer true */
2058         LASSERT(sizeof(lum) == sizeof(*lump));
2059         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2060         rc = copy_from_user(&lum, lump, sizeof(lum));
2061         if (rc)
2062                 RETURN(-EFAULT);
2063
2064         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2065         if (rc == 0) {
2066                  put_user(0, &lump->lmm_stripe_count);
2067                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2068                                     0, ll_i2info(inode)->lli_smd, lump);
2069         }
2070         RETURN(rc);
2071 }
2072
2073 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2074 {
2075         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2076
2077         if (!lsm)
2078                 RETURN(-ENODATA);
2079
2080         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2081                             (void *)arg);
2082 }
2083
2084 static int ll_get_grouplock(struct inode *inode, struct file *file,
2085                             unsigned long arg)
2086 {
2087         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2088         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2089                                                     .end = OBD_OBJECT_EOF}};
2090         struct lustre_handle lockh = { 0 };
2091         struct ll_inode_info *lli = ll_i2info(inode);
2092         struct lov_stripe_md *lsm = lli->lli_smd;
2093         int flags = 0, rc;
2094         ENTRY;
2095
2096         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2097                 RETURN(-EINVAL);
2098         }
2099
2100         policy.l_extent.gid = arg;
2101         if (file->f_flags & O_NONBLOCK)
2102                 flags = LDLM_FL_BLOCK_NOWAIT;
2103
2104         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2105         if (rc)
2106                 RETURN(rc);
2107
2108         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2109         fd->fd_gid = arg;
2110         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2111
2112         RETURN(0);
2113 }
2114
2115 static int ll_put_grouplock(struct inode *inode, struct file *file,
2116                             unsigned long arg)
2117 {
2118         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2119         struct ll_inode_info *lli = ll_i2info(inode);
2120         struct lov_stripe_md *lsm = lli->lli_smd;
2121         int rc;
2122         ENTRY;
2123
2124         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2125                 /* Ugh, it's already unlocked. */
2126                 RETURN(-EINVAL);
2127         }
2128
2129         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2130                 RETURN(-EINVAL);
2131
2132         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2133
2134         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2135         if (rc)
2136                 RETURN(rc);
2137
2138         fd->fd_gid = 0;
2139         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2140
2141         RETURN(0);
2142 }
2143
/* Validate that 'tail' may be joined onto 'head': the server must support
 * join, both must be distinct regular files, and the head's size must be a
 * multiple of JOIN_FILE_ALIGN.  Returns 0 if joinable, -EINVAL otherwise. */
static int join_sanity_check(struct inode *head, struct inode *tail)
{
        ENTRY;
        /* join requires server-side support, flagged at mount time */
        if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
                CERROR("server do not support join \n");
                RETURN(-EINVAL);
        }
        if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
                CERROR("tail ino %lu and ino head %lu must be regular\n",
                       head->i_ino, tail->i_ino);
                RETURN(-EINVAL);
        }
        if (head->i_ino == tail->i_ino) {
                CERROR("file %lu can not be joined to itself \n", head->i_ino);
                RETURN(-EINVAL);
        }
        /* head must end on a JOIN_FILE_ALIGN (64K) boundary — presumably so
         * the tail attaches on an aligned offset; confirm against the MDS
         * join implementation */
        if (i_size_read(head) % JOIN_FILE_ALIGN) {
                CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
                RETURN(-EINVAL);
        }
        RETURN(0);
}
2166
/* Perform the join on the MDS: re-open the head with O_JOIN_FILE, naming the
 * tail file in the intent, so the server glues the two files together.
 * Returns 0 or a negative errno. */
static int join_file(struct inode *head_inode, struct file *head_filp,
                     struct file *tail_filp)
{
        struct dentry *tail_dentry = tail_filp->f_dentry;
        struct lookup_intent oit = {.it_op = IT_OPEN,
                                   .it_flags = head_filp->f_flags|O_JOIN_FILE};
        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
                ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };

        struct lustre_handle lockh;
        struct md_op_data *op_data;
        int    rc;
        loff_t data;
        ENTRY;

        tail_dentry = tail_filp->f_dentry;

        /* the head's current size is passed along in the op_data —
         * presumably the offset at which the tail attaches; confirm against
         * the MDS-side join handling */
        data = i_size_read(head_inode);
        op_data = ll_prep_md_op_data(NULL, head_inode,
                                     tail_dentry->d_parent->d_inode,
                                     tail_dentry->d_name.name,
                                     tail_dentry->d_name.len, 0,
                                     LUSTRE_OPC_ANY, &data);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        /* enqueue the O_JOIN_FILE open intent on the MDS */
        rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
                         op_data, &lockh, NULL, 0, 0);

        ll_finish_md_op_data(op_data);
        if (rc < 0)
                GOTO(out, rc);

        /* server-side status of the open itself */
        rc = oit.d.lustre.it_status;

        if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
                ptlrpc_req_finished((struct ptlrpc_request *)
                                    oit.d.lustre.it_data);
                GOTO(out, rc);
        }

        if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
                                           * away */
                ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
                oit.d.lustre.it_lock_mode = 0;
        }
        /* the open handle was only a vehicle for the join; close it */
        ll_release_openhandle(head_filp->f_dentry, &oit);
out:
        ll_intent_release(&oit);
        RETURN(rc);
}
2219
2220 static int ll_file_join(struct inode *head, struct file *filp,
2221                         char *filename_tail)
2222 {
2223         struct inode *tail = NULL, *first = NULL, *second = NULL;
2224         struct dentry *tail_dentry;
2225         struct file *tail_filp, *first_filp, *second_filp;
2226         struct ll_lock_tree first_tree, second_tree;
2227         struct ll_lock_tree_node *first_node, *second_node;
2228         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2229         int rc = 0, cleanup_phase = 0;
2230         ENTRY;
2231
2232         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2233                head->i_ino, head->i_generation, head, filename_tail);
2234
2235         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2236         if (IS_ERR(tail_filp)) {
2237                 CERROR("Can not open tail file %s", filename_tail);
2238                 rc = PTR_ERR(tail_filp);
2239                 GOTO(cleanup, rc);
2240         }
2241         tail = igrab(tail_filp->f_dentry->d_inode);
2242
2243         tlli = ll_i2info(tail);
2244         tail_dentry = tail_filp->f_dentry;
2245         LASSERT(tail_dentry);
2246         cleanup_phase = 1;
2247
2248         /*reorder the inode for lock sequence*/
2249         first = head->i_ino > tail->i_ino ? head : tail;
2250         second = head->i_ino > tail->i_ino ? tail : head;
2251         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2252         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2253
2254         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2255                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2256         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2257         if (IS_ERR(first_node)){
2258                 rc = PTR_ERR(first_node);
2259                 GOTO(cleanup, rc);
2260         }
2261         first_tree.lt_fd = first_filp->private_data;
2262         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2263         if (rc != 0)
2264                 GOTO(cleanup, rc);
2265         cleanup_phase = 2;
2266
2267         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2268         if (IS_ERR(second_node)){
2269                 rc = PTR_ERR(second_node);
2270                 GOTO(cleanup, rc);
2271         }
2272         second_tree.lt_fd = second_filp->private_data;
2273         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2274         if (rc != 0)
2275                 GOTO(cleanup, rc);
2276         cleanup_phase = 3;
2277
2278         rc = join_sanity_check(head, tail);
2279         if (rc)
2280                 GOTO(cleanup, rc);
2281
2282         rc = join_file(head, filp, tail_filp);
2283         if (rc)
2284                 GOTO(cleanup, rc);
2285 cleanup:
2286         switch (cleanup_phase) {
2287         case 3:
2288                 ll_tree_unlock(&second_tree);
2289                 obd_cancel_unused(ll_i2dtexp(second),
2290                                   ll_i2info(second)->lli_smd, 0, NULL);
2291         case 2:
2292                 ll_tree_unlock(&first_tree);
2293                 obd_cancel_unused(ll_i2dtexp(first),
2294                                   ll_i2info(first)->lli_smd, 0, NULL);
2295         case 1:
2296                 filp_close(tail_filp, 0);
2297                 if (tail)
2298                         iput(tail);
2299                 if (head && rc == 0) {
2300                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2301                                        &hlli->lli_smd);
2302                         hlli->lli_smd = NULL;
2303                 }
2304         case 0:
2305                 break;
2306         default:
2307                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2308                 LBUG();
2309         }
2310         RETURN(rc);
2311 }
2312
/*
 * Close the MDS open handle that was created as a side effect of an intent
 * lookup (DISP_OPEN_OPEN), when the caller does not intend to keep the file
 * open.  Also drops the intent's open request reference in all cases that
 * reach the 'out' label.
 *
 * \param dentry  dentry whose server-side open handle should be released
 * \param it      lookup intent carrying the open disposition and reply
 *
 * \retval 0 on success or when there is nothing to release
 * \retval negative errno (e.g. -ENOMEM) on failure
 */
2313 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2314 {
2315         struct inode *inode = dentry->d_inode;
2316         struct obd_client_handle *och;
2317         int rc;
2318         ENTRY;
2319
2320         LASSERT(inode);
2321
2322         /* Root ? Do nothing. */
2323         if (dentry->d_inode->i_sb->s_root == dentry)
2324                 RETURN(0);
2325
2326         /* No open handle to close? Move away */
2327         if (!it_disposition(it, DISP_OPEN_OPEN))
2328                 RETURN(0);
2329
2330         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2331
2332         OBD_ALLOC(och, sizeof(*och));
2333         if (!och)
2334                 GOTO(out, rc = -ENOMEM);
2335
             /* Populate the client handle from the intent's open reply. */
2336         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2337                     ll_i2info(inode), it, och);
2338
             /* NOTE(review): och ownership appears to pass to
              * ll_close_inode_openhandle() (no local free here) — confirm
              * against its definition. */
2339         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2340                                        inode, och);
2341  out:
2342         /* this one is in place of ll_file_open */
2343         ptlrpc_req_finished(it->d.lustre.it_data);
2344         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2345         RETURN(rc);
2346 }
2347
2348 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2349                   unsigned long arg)
2350 {
2351         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2352         int flags;
2353         ENTRY;
2354
2355         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2356                inode->i_generation, inode, cmd);
2357         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2358
2359         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2360         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2361                 RETURN(-ENOTTY);
2362
2363         switch(cmd) {
2364         case LL_IOC_GETFLAGS:
2365                 /* Get the current value of the file flags */
2366                 return put_user(fd->fd_flags, (int *)arg);
2367         case LL_IOC_SETFLAGS:
2368         case LL_IOC_CLRFLAGS:
2369                 /* Set or clear specific file flags */
2370                 /* XXX This probably needs checks to ensure the flags are
2371                  *     not abused, and to handle any flag side effects.
2372                  */
2373                 if (get_user(flags, (int *) arg))
2374                         RETURN(-EFAULT);
2375
2376                 if (cmd == LL_IOC_SETFLAGS) {
2377                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2378                             !(file->f_flags & O_DIRECT)) {
2379                                 CERROR("%s: unable to disable locking on "
2380                                        "non-O_DIRECT file\n", current->comm);
2381                                 RETURN(-EINVAL);
2382                         }
2383
2384                         fd->fd_flags |= flags;
2385                 } else {
2386                         fd->fd_flags &= ~flags;
2387                 }
2388                 RETURN(0);
2389         case LL_IOC_LOV_SETSTRIPE:
2390                 RETURN(ll_lov_setstripe(inode, file, arg));
2391         case LL_IOC_LOV_SETEA:
2392                 RETURN(ll_lov_setea(inode, file, arg));
2393         case LL_IOC_LOV_GETSTRIPE:
2394                 RETURN(ll_lov_getstripe(inode, arg));
2395         case LL_IOC_RECREATE_OBJ:
2396                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2397         case EXT3_IOC_GETFLAGS:
2398         case EXT3_IOC_SETFLAGS:
2399                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2400         case EXT3_IOC_GETVERSION_OLD:
2401         case EXT3_IOC_GETVERSION:
2402                 RETURN(put_user(inode->i_generation, (int *)arg));
2403         case LL_IOC_JOIN: {
2404                 char *ftail;
2405                 int rc;
2406
2407                 ftail = getname((const char *)arg);
2408                 if (IS_ERR(ftail))
2409                         RETURN(PTR_ERR(ftail));
2410                 rc = ll_file_join(inode, file, ftail);
2411                 putname(ftail);
2412                 RETURN(rc);
2413         }
2414         case LL_IOC_GROUP_LOCK:
2415                 RETURN(ll_get_grouplock(inode, file, arg));
2416         case LL_IOC_GROUP_UNLOCK:
2417                 RETURN(ll_put_grouplock(inode, file, arg));
2418         case IOC_OBD_STATFS:
2419                 RETURN(ll_obd_statfs(inode, (void *)arg));
2420
2421         /* We need to special case any other ioctls we want to handle,
2422          * to send them to the MDS/OST as appropriate and to properly
2423          * network encode the arg field.
2424         case EXT3_IOC_SETVERSION_OLD:
2425         case EXT3_IOC_SETVERSION:
2426         */
2427         case LL_IOC_FLUSHCTX:
2428                 RETURN(ll_flush_ctx(inode));
2429         default: {
2430                 int err;
2431
2432                 if (LLIOC_STOP == 
2433                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2434                         RETURN(err);
2435
2436                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2437                                      (void *)arg));
2438         }
2439         }
2440 }
2441
/*
 * llseek for Lustre files.  SEEK_END must first glimpse the object size
 * from the OSTs (honouring O_NONBLOCK via LDLM_FL_BLOCK_NOWAIT) so that
 * i_size is current before the new position is computed.
 *
 * \retval new file position on success, negative errno on failure
 */
2442 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2443 {
2444         struct inode *inode = file->f_dentry->d_inode;
2445         struct ll_inode_info *lli = ll_i2info(inode);
2446         struct lov_stripe_md *lsm = lli->lli_smd;
2447         loff_t retval;
2448         ENTRY;
             /* This first retval is only used for the trace message below;
              * the authoritative position is recomputed after the glimpse. */
2449         retval = offset + ((origin == 2) ? i_size_read(inode) :
2450                            (origin == 1) ? file->f_pos : 0);
2451         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2452                inode->i_ino, inode->i_generation, inode, retval, retval,
2453                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2454         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2455
2456         if (origin == 2) { /* SEEK_END */
2457                 int nonblock = 0, rc;
2458
2459                 if (file->f_flags & O_NONBLOCK)
2460                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2461
                     /* Refresh i_size from the OSTs before using it. */
2462                 if (lsm != NULL) {
2463                         rc = ll_glimpse_size(inode, nonblock);
2464                         if (rc != 0)
2465                                 RETURN(rc);
2466                 }
2467
2468                 ll_inode_size_lock(inode, 0);
2469                 offset += i_size_read(inode);
2470                 ll_inode_size_unlock(inode, 0);
2471         } else if (origin == 1) { /* SEEK_CUR */
2472                 offset += file->f_pos;
2473         }
2474
2475         retval = -EINVAL;
2476         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2477                 if (offset != file->f_pos) {
2478                         file->f_pos = offset;
2479 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2480                         file->f_reada = 0;
2481                         file->f_version = ++event;
2482 #endif
2483                 }
2484                 retval = offset;
2485         }
2486         
2487         RETURN(retval);
2488 }
2489
/*
 * fsync for Lustre files: wait for in-flight page I/O, pick up any stashed
 * async writeback errors, then sync metadata via the MDC and, if 'data' is
 * set, object data via the OSC.  Error aggregation is first-error-wins:
 * 'rc' keeps the earliest failure, later 'err' values only fill in when rc
 * is still zero.
 *
 * \param data  non-zero to also flush OST object data, not just metadata
 * \retval 0 on success, negative errno (the first error seen) on failure
 */
2490 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2491 {
2492         struct inode *inode = dentry->d_inode;
2493         struct ll_inode_info *lli = ll_i2info(inode);
2494         struct lov_stripe_md *lsm = lli->lli_smd;
2495         struct ptlrpc_request *req;
2496         struct obd_capa *oc;
2497         int rc, err;
2498         ENTRY;
2499         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2500                inode->i_generation, inode);
2501         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2502
2503         /* fsync's caller has already called _fdata{sync,write}, we want
2504          * that IO to finish before calling the osc and mdc sync methods */
2505         rc = filemap_fdatawait(inode->i_mapping);
2506
2507         /* catch async errors that were recorded back when async writeback
2508          * failed for pages in this mapping. */
2509         err = lli->lli_async_rc;
2510         lli->lli_async_rc = 0;
2511         if (rc == 0)
2512                 rc = err;
2513         if (lsm) {
2514                 err = lov_test_and_clear_async_rc(lsm);
2515                 if (rc == 0)
2516                         rc = err;
2517         }
2518
             /* Sync metadata on the MDS (capability-protected). */
2519         oc = ll_mdscapa_get(inode);
2520         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2521                       &req);
2522         capa_put(oc);
2523         if (!rc)
2524                 rc = err;
             /* req is only valid when md_sync succeeded. */
2525         if (!err)
2526                 ptlrpc_req_finished(req);
2527
2528         if (data && lsm) {
2529                 struct obdo *oa;
2530                 
2531                 OBDO_ALLOC(oa);
2532                 if (!oa)
2533                         RETURN(rc ? rc : -ENOMEM);
2534
2535                 oa->o_id = lsm->lsm_object_id;
2536                 oa->o_gr = lsm->lsm_object_gr;
2537                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2538                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2539                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2540                                            OBD_MD_FLGROUP);
2541
                     /* Flush object data on the OSTs over the whole file. */
2542                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2543                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2544                                0, OBD_OBJECT_EOF, oc);
2545                 capa_put(oc);
2546                 if (!rc)
2547                         rc = err;
2548                 OBDO_FREE(oa);
2549         }
2550
2551         RETURN(rc);
2552 }
2553
/*
 * Implement fcntl()/flock() locking by mapping the VFS file_lock onto an
 * LDLM_FLOCK lock enqueued on the MDS.  fl_type selects the LDLM mode
 * (F_RDLCK->PR, F_WRLCK->PW, F_UNLCK->NL as an unlock encoding) and the
 * fcntl command selects enqueue flags (blocking, non-blocking, or test).
 * On success the local kernel lock state is updated to match.
 *
 * \retval 0 on success, negative errno otherwise
 */
2554 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2555 {
2556         struct inode *inode = file->f_dentry->d_inode;
2557         struct ll_sb_info *sbi = ll_i2sbi(inode);
             /* Flock resource is named by the file's FID plus LDLM_FLOCK. */
2558         struct ldlm_res_id res_id =
2559                 { .name = { fid_seq(ll_inode2fid(inode)),
2560                             fid_oid(ll_inode2fid(inode)),
2561                             fid_ver(ll_inode2fid(inode)),
2562                             LDLM_FLOCK} };
2563         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2564                 ldlm_flock_completion_ast, NULL, file_lock };
2565         struct lustre_handle lockh = {0};
2566         ldlm_policy_data_t flock;
2567         int flags = 0;
2568         int rc;
2569         ENTRY;
2570
2571         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2572                inode->i_ino, file_lock);
2573
2574         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2575  
2576         if (file_lock->fl_flags & FL_FLOCK) {
2577                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2578                 /* set missing params for flock() calls */
2579                 file_lock->fl_end = OFFSET_MAX;
2580                 file_lock->fl_pid = current->tgid;
2581         }
2582         flock.l_flock.pid = file_lock->fl_pid;
2583         flock.l_flock.start = file_lock->fl_start;
2584         flock.l_flock.end = file_lock->fl_end;
2585
2586         switch (file_lock->fl_type) {
2587         case F_RDLCK:
2588                 einfo.ei_mode = LCK_PR;
2589                 break;
2590         case F_UNLCK:
2591                 /* An unlock request may or may not have any relation to
2592                  * existing locks so we may not be able to pass a lock handle
2593                  * via a normal ldlm_lock_cancel() request. The request may even
2594                  * unlock a byte range in the middle of an existing lock. In
2595                  * order to process an unlock request we need all of the same
2596                  * information that is given with a normal read or write record
2597                  * lock request. To avoid creating another ldlm unlock (cancel)
2598                  * message we'll treat a LCK_NL flock request as an unlock. */
2599                 einfo.ei_mode = LCK_NL;
2600                 break;
2601         case F_WRLCK:
2602                 einfo.ei_mode = LCK_PW;
2603                 break;
2604         default:
2605                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2606                 LBUG();
2607         }
2608
2609         switch (cmd) {
2610         case F_SETLKW:
2611 #ifdef F_SETLKW64
2612         case F_SETLKW64:
2613 #endif
                     /* Blocking set: wait for the lock. */
2614                 flags = 0;
2615                 break;
2616         case F_SETLK:
2617 #ifdef F_SETLK64
2618         case F_SETLK64:
2619 #endif
                     /* Non-blocking set: fail rather than wait. */
2620                 flags = LDLM_FL_BLOCK_NOWAIT;
2621                 break;
2622         case F_GETLK:
2623 #ifdef F_GETLK64
2624         case F_GETLK64:
2625 #endif
2626                 flags = LDLM_FL_TEST_LOCK;
2627                 /* Save the old mode so that if the mode in the lock changes we
2628                  * can decrement the appropriate reader or writer refcount. */
2629                 file_lock->fl_type = einfo.ei_mode;
2630                 break;
2631         default:
2632                 CERROR("unknown fcntl lock command: %d\n", cmd);
2633                 LBUG();
2634         }
2635
2636         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2637                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2638                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2639
2640         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2641                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
             /* Mirror the granted lock into the local VFS lock lists. */
2642         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2643                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2644 #ifdef HAVE_F_OP_FLOCK
2645         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2646             !(flags & LDLM_FL_TEST_LOCK))
2647                 posix_lock_file_wait(file, file_lock);
2648 #endif
2649
2650         RETURN(rc);
2651 }
2652
/*
 * Stub lock method installed by the -o noflock mount option: every
 * flock()/fcntl() locking request fails with -ENOSYS.
 */
2653 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2654 {
2655         ENTRY;
2656
2657         RETURN(-ENOSYS);
2658 }
2659
2660 int ll_have_md_lock(struct inode *inode, __u64 bits)
2661 {
2662         struct lustre_handle lockh;
2663         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2664         struct lu_fid *fid;
2665         int flags;
2666         ENTRY;
2667
2668         if (!inode)
2669                RETURN(0);
2670
2671         fid = &ll_i2info(inode)->lli_fid;
2672         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2673
2674         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2675         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2676                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2677                 RETURN(1);
2678         }
2679         RETURN(0);
2680 }
2681
2682 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2683                             struct lustre_handle *lockh)
2684 {
2685         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2686         struct lu_fid *fid;
2687         ldlm_mode_t rc;
2688         int flags;
2689         ENTRY;
2690
2691         fid = &ll_i2info(inode)->lli_fid;
2692         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2693
2694         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2695         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2696                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2697         RETURN(rc);
2698 }
2699
2700 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2701         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2702                               * and return success */
2703                 inode->i_nlink = 0;
2704                 /* This path cannot be hit for regular files unless in
2705                  * case of obscure races, so no need to to validate
2706                  * size. */
2707                 if (!S_ISREG(inode->i_mode) &&
2708                     !S_ISDIR(inode->i_mode))
2709                         return 0;
2710         }
2711
2712         if (rc) {
2713                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2714                 return -abs(rc);
2715
2716         }
2717
2718         return 0;
2719 }
2720
/*
 * Revalidate a dentry's inode attributes against the MDS, then (for striped
 * files) refresh the file size from the OSTs via a glimpse.  Two server
 * paths exist: an IT_GETATTR intent lock by FID when the server supports
 * OBD_CONNECT_ATTRFID, otherwise a plain md_getattr() -- but the latter is
 * skipped entirely when we already hold UPDATE|LOOKUP inodebits locks,
 * since cached attributes are then guaranteed valid.
 *
 * \retval 0 on success, negative errno on failure
 */
2721 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2722 {
2723         struct inode *inode = dentry->d_inode;
2724         struct ptlrpc_request *req = NULL;
2725         struct ll_sb_info *sbi;
2726         struct obd_export *exp;
2727         int rc;
2728         ENTRY;
2729
2730         if (!inode) {
2731                 CERROR("REPORT THIS LINE TO PETER\n");
2732                 RETURN(0);
2733         }
2734         sbi = ll_i2sbi(inode);
2735
2736         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2737                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2738
2739         exp = ll_i2mdexp(inode);
2740
2741         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2742                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2743                 struct md_op_data *op_data;
2744
2745                 /* Call getattr by fid, so do not provide name at all. */
2746                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2747                                              dentry->d_inode, NULL, 0, 0,
2748                                              LUSTRE_OPC_ANY, NULL);
2749                 if (IS_ERR(op_data))
2750                         RETURN(PTR_ERR(op_data));
2751
2752                 oit.it_flags |= O_CHECK_STALE;
2753                 rc = md_intent_lock(exp, op_data, NULL, 0,
2754                                     /* we are not interested in name
2755                                        based lookup */
2756                                     &oit, 0, &req,
2757                                     ll_md_blocking_ast, 0);
2758                 ll_finish_md_op_data(op_data);
2759                 oit.it_flags &= ~O_CHECK_STALE;
2760                 if (rc < 0) {
2761                         rc = ll_inode_revalidate_fini(inode, rc);
2762                         GOTO (out, rc);
2763                 }
2764
2765                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2766                 if (rc != 0) {
2767                         ll_intent_release(&oit);
2768                         GOTO(out, rc);
2769                 }
2770
2771                 /* Unlinked? Unhash dentry, so it is not picked up later by
2772                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2773                    here to preserve get_cwd functionality on 2.6.
2774                    Bug 10503 */
2775                 if (!dentry->d_inode->i_nlink) {
2776                         spin_lock(&dcache_lock);
2777                         ll_drop_dentry(dentry);
2778                         spin_unlock(&dcache_lock);
2779                 }
2780
2781                 ll_lookup_finish_locks(&oit, dentry);
2782         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
2783                                                      MDS_INODELOCK_LOOKUP)) {
2784                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2785                 obd_valid valid = OBD_MD_FLGETATTR;
2786                 struct obd_capa *oc;
2787                 int ealen = 0;
2788
                     /* Regular files also need the striping EA sized in. */
2789                 if (S_ISREG(inode->i_mode)) {
2790                         rc = ll_get_max_mdsize(sbi, &ealen);
2791                         if (rc)
2792                                 RETURN(rc);
2793                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2794                 }
2795                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2796                  * capa for this inode. Because we only keep capas of dirs
2797                  * fresh. */
2798                 oc = ll_mdscapa_get(inode);
2799                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2800                                 ealen, &req);
2801                 capa_put(oc);
2802                 if (rc) {
                             /* req was not filled on failure; safe to skip
                              * the ptlrpc_req_finished() at 'out'. */
2803                         rc = ll_inode_revalidate_fini(inode, rc);
2804                         RETURN(rc);
2805                 }
2806
2807                 rc = ll_prep_inode(&inode, req, NULL);
2808                 if (rc)
2809                         GOTO(out, rc);
2810         }
2811
2812         /* if object not yet allocated, don't validate size */
2813         if (ll_i2info(inode)->lli_smd == NULL)
2814                 GOTO(out, rc = 0);
2815
2816         /* ll_glimpse_size will prefer locally cached writes if they extend
2817          * the file */
2818         rc = ll_glimpse_size(inode, 0);
2819         EXIT;
2820 out:
2821         ptlrpc_req_finished(req);
2822         return rc;
2823 }
2824
/*
 * getattr with an explicit intent: revalidate the inode against the MDS,
 * then fill the kstat from the (now fresh) inode attributes.  Size and
 * block counts are read under the inode size lock for consistency.
 *
 * \retval 0 on success, negative errno from revalidation on failure
 */
2825 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2826                   struct lookup_intent *it, struct kstat *stat)
2827 {
2828         struct inode *inode = de->d_inode;
2829         int res = 0;
2830
2831         res = ll_inode_revalidate_it(de, it);
             /* Tallied even when revalidation failed. */
2832         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2833
2834         if (res)
2835                 return res;
2836
2837         stat->dev = inode->i_sb->s_dev;
2838         stat->ino = inode->i_ino;
2839         stat->mode = inode->i_mode;
2840         stat->nlink = inode->i_nlink;
2841         stat->uid = inode->i_uid;
2842         stat->gid = inode->i_gid;
2843         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2844         stat->atime = inode->i_atime;
2845         stat->mtime = inode->i_mtime;
2846         stat->ctime = inode->i_ctime;
2847 #ifdef HAVE_INODE_BLKSIZE
2848         stat->blksize = inode->i_blksize;
2849 #else
2850         stat->blksize = 1 << inode->i_blkbits;
2851 #endif
2852
2853         ll_inode_size_lock(inode, 0);
2854         stat->size = i_size_read(inode);
2855         stat->blocks = inode->i_blocks;
2856         ll_inode_size_unlock(inode, 0);
2857
2858         return 0;
2859 }
/* VFS ->getattr entry point: wrap ll_getattr_it() with a plain IT_GETATTR
 * intent. */
2860 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2861 {
2862         struct lookup_intent it = { .it_op = IT_GETATTR };
2863
2864         return ll_getattr_it(mnt, de, &it, stat);
2865 }
2866
/*
 * ACL callback for generic_permission(): evaluate the cached POSIX ACL
 * held on the Lustre inode.  Returns -EAGAIN when no ACL is cached (or
 * ACL support is compiled out) so the caller falls back to mode bits.
 */
2867 static
2868 int lustre_check_acl(struct inode *inode, int mask)
2869 {
2870 #ifdef CONFIG_FS_POSIX_ACL
2871         struct ll_inode_info *lli = ll_i2info(inode);
2872         struct posix_acl *acl;
2873         int rc;
2874         ENTRY;
2875
             /* Take a private reference under lli_lock so the ACL cannot be
              * replaced while we evaluate it. */
2876         spin_lock(&lli->lli_lock);
2877         acl = posix_acl_dup(lli->lli_posix_acl);
2878         spin_unlock(&lli->lli_lock);
2879
2880         if (!acl)
2881                 RETURN(-EAGAIN);
2882
2883         rc = posix_acl_permission(inode, acl, mask);
2884         posix_acl_release(acl);
2885
2886         RETURN(rc);
2887 #else
2888         return -EAGAIN;
2889 #endif
2890 }
2891
/*
 * VFS ->permission for Lustre inodes, in two kernel-version variants.
 * Remote-client mounts (LL_SBI_RMT_CLIENT) always defer to the server via
 * lustre_check_remote_perm().  On >= 2.6.10 the kernel's
 * generic_permission() does the mode/ACL/capability dance for us; older
 * kernels get an open-coded equivalent below.
 */
2892 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2893 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2894 {
2895         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2896                inode->i_ino, inode->i_generation, inode, mask);
2897         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2898                 return lustre_check_remote_perm(inode, mask);
2899         
2900         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2901         return generic_permission(inode, mask, lustre_check_acl);
2902 }
2903 #else
/*
 * Pre-2.6.10 variant: an open-coded copy of the kernel's permission logic
 * with lustre_check_acl() spliced in.  The odd "else if (1)" block exists
 * only so the check_groups label can be entered by goto from the ACL path;
 * the trailing "else" is never reached by fallthrough.
 */
2904 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2905 {
2906         int mode = inode->i_mode;
2907         int rc;
2908
2909         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2910                inode->i_ino, inode->i_generation, inode, mask);
2911
2912         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2913                 return lustre_check_remote_perm(inode, mask);
2914
2915         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2916
             /* Writes are refused on read-only or immutable inodes. */
2917         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2918             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2919                 return -EROFS;
2920         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2921                 return -EACCES;
2922         if (current->fsuid == inode->i_uid) {
2923                 mode >>= 6;
2924         } else if (1) {
                     /* If "other" bits already satisfy the request, the ACL
                      * cannot deny it; skip straight to the group check. */
2925                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2926                         goto check_groups;
2927                 rc = lustre_check_acl(inode, mask);
2928                 if (rc == -EAGAIN)
2929                         goto check_groups;
2930                 if (rc == -EACCES)
2931                         goto check_capabilities;
2932                 return rc;
2933         } else {
2934 check_groups:
2935                 if (in_group_p(inode->i_gid))
2936                         mode >>= 3;
2937         }
2938         if ((mode & mask & S_IRWXO) == mask)
2939                 return 0;
2940
2941 check_capabilities:
             /* DAC override: everything except executing a file with no
              * exec bits set anywhere. */
2942         if (!(mask & MAY_EXEC) ||
2943             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2944                 if (capable(CAP_DAC_OVERRIDE))
2945                         return 0;
2946
2947         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2948             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2949                 return 0;
2950         
2951         return -EACCES;
2952 }
2953 #endif
2954
2955 /* -o localflock - only provides locally consistent flock locks */
/* Default file operations: no ->flock/->lock methods, so the kernel's own
 * locally-consistent locking applies (-o localflock behaviour). */
2956 struct file_operations ll_file_operations = {
2957         .read           = ll_file_read,
2958         .write          = ll_file_write,
2959         .ioctl          = ll_file_ioctl,
2960         .open           = ll_file_open,
2961         .release        = ll_file_release,
2962         .mmap           = ll_file_mmap,
2963         .llseek         = ll_file_seek,
2964         .sendfile       = ll_file_sendfile,
2965         .fsync          = ll_fsync,
2966 };
2967
/* File operations with cluster-wide coherent locking: flock()/fcntl()
 * requests go through ll_file_flock() to the MDS (default -o flock). */
2968 struct file_operations ll_file_operations_flock = {
2969         .read           = ll_file_read,
2970         .write          = ll_file_write,
2971         .ioctl          = ll_file_ioctl,
2972         .open           = ll_file_open,
2973         .release        = ll_file_release,
2974         .mmap           = ll_file_mmap,
2975         .llseek         = ll_file_seek,
2976         .sendfile       = ll_file_sendfile,
2977         .fsync          = ll_fsync,
2978 #ifdef HAVE_F_OP_FLOCK
2979         .flock          = ll_file_flock,
2980 #endif
2981         .lock           = ll_file_flock
2982 };
2983
2984 /* These are for -o noflock - to return ENOSYS on flock calls */
2985 struct file_operations ll_file_operations_noflock = {
2986         .read           = ll_file_read,
2987         .write          = ll_file_write,
2988         .ioctl          = ll_file_ioctl,
2989         .open           = ll_file_open,
2990         .release        = ll_file_release,
2991         .mmap           = ll_file_mmap,
2992         .llseek         = ll_file_seek,
2993         .sendfile       = ll_file_sendfile,
2994         .fsync          = ll_fsync,
2995 #ifdef HAVE_F_OP_FLOCK
2996         .flock          = ll_file_noflock,
2997 #endif
             /* ll_file_noflock unconditionally returns -ENOSYS. */
2998         .lock           = ll_file_noflock
2999 };
3000
/* Inode operations shared by all regular Lustre files. */
3001 struct inode_operations ll_file_inode_operations = {
3002 #ifdef HAVE_VFS_INTENT_PATCHES
3003         .setattr_raw    = ll_setattr_raw,
3004 #endif
3005         .setattr        = ll_setattr,
3006         .truncate       = ll_truncate,
3007         .getattr        = ll_getattr,
3008         .permission     = ll_inode_permission,
3009         .setxattr       = ll_setxattr,
3010         .getxattr       = ll_getxattr,
3011         .listxattr      = ll_listxattr,
3012         .removexattr    = ll_removexattr,
3013 };
3014
3015 /* dynamic ioctl number support routines */
/* Registry of dynamically registered ioctl handlers: a list of
 * llioc_data entries protected by a reader/writer semaphore (readers
 * dispatch ioctls, writers register/unregister handlers). */
3016 static struct llioc_ctl_data {
3017         struct rw_semaphore ioc_sem;
3018         struct list_head    ioc_head;
3019 } llioc = { 
3020         __RWSEM_INITIALIZER(llioc.ioc_sem), 
3021         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3022 };
3023
3024
/* One dynamic ioctl registration: a callback plus the command numbers it
 * claims, stored inline as a trailing variable-length array. */
3025 struct llioc_data {
3026         struct list_head        iocd_list;      /* link in llioc.ioc_head */
3027         unsigned int            iocd_size;      /* total allocation size */
3028         llioc_callback_t        iocd_cb;        /* handler callback */
3029         unsigned int            iocd_count;     /* entries in iocd_cmd[] */
3030         unsigned int            iocd_cmd[0];    /* claimed ioctl numbers */
3031 };
3032
/*
 * Register a dynamic ioctl handler for up to LLIOC_MAX_CMD command numbers.
 *
 * \param cb     callback invoked when one of 'cmd' is seen by ll_file_ioctl
 * \param count  number of entries in 'cmd'
 * \param cmd    array of ioctl command numbers the callback claims
 *
 * \retval opaque registration cookie (pass to ll_iocontrol_unregister),
 *         or NULL on bad arguments or allocation failure
 */
3033 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3034 {
3035         unsigned int size;
3036         struct llioc_data *in_data = NULL;
3037         ENTRY;
3038
3039         if (cb == NULL || cmd == NULL ||
3040             count > LLIOC_MAX_CMD || count < 0)
3041                 RETURN(NULL);
3042
3043         size = sizeof(*in_data) + count * sizeof(unsigned int);
3044         OBD_ALLOC(in_data, size);
3045         if (in_data == NULL)
3046                 RETURN(NULL);
3047
             /* NOTE(review): this memset only clears the fixed header, not
              * the trailing iocd_cmd[] area; that area is fully overwritten
              * by the memcpy below, so no bytes are left uninitialized. */
3048         memset(in_data, 0, sizeof(*in_data));
3049         in_data->iocd_size = size;
3050         in_data->iocd_cb = cb;
3051         in_data->iocd_count = count;
3052         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3053
3054         down_write(&llioc.ioc_sem);
3055         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3056         up_write(&llioc.ioc_sem);
3057
3058         RETURN(in_data);
3059 }
3060
3061 void ll_iocontrol_unregister(void *magic)
3062 {
3063         struct llioc_data *tmp;
3064
3065         if (magic == NULL)
3066                 return;
3067
3068         down_write(&llioc.ioc_sem);
3069         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3070                 if (tmp == magic) {
3071                         unsigned int size = tmp->iocd_size;
3072
3073                         list_del(&tmp->iocd_list);
3074                         up_write(&llioc.ioc_sem);
3075
3076                         OBD_FREE(tmp, size);
3077                         return;
3078                 }
3079         }
3080         up_write(&llioc.ioc_sem);
3081
3082         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3083 }
3084
3085 EXPORT_SYMBOL(ll_iocontrol_register);
3086 EXPORT_SYMBOL(ll_iocontrol_unregister);
3087
3088 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
3089                         unsigned int cmd, unsigned long arg, int *rcp)
3090 {
3091         enum llioc_iter ret = LLIOC_CONT;
3092         struct llioc_data *data;
3093         int rc = -EINVAL, i;
3094
3095         down_read(&llioc.ioc_sem);
3096         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3097                 for (i = 0; i < data->iocd_count; i++) {
3098                         if (cmd != data->iocd_cmd[i]) 
3099                                 continue;
3100
3101                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3102                         break;
3103                 }
3104
3105                 if (ret == LLIOC_STOP)
3106                         break;
3107         }
3108         up_read(&llioc.ioc_sem);
3109
3110         if (rcp)
3111                 *rcp = rc;
3112         return ret;
3113 }