lustre/llite/file.c (fs/lustre-release.git, branch HEAD)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
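
/*
 * Illustrative sketch (not part of the driver): the intended life cycle of a
 * struct ll_file_data as used by the open/close paths below -- allocate it
 * with ll_file_data_get(), stash it in LUSTRE_FPRIVATE(file) (as done in
 * ll_local_open()), and release it with ll_file_data_put() once the file
 * handle is torn down (as done in ll_md_close()/ll_file_release()).
 */
#if 0
static int ll_file_data_example(struct file *file)
{
        struct ll_file_data *fd;

        fd = ll_file_data_get();
        if (fd == NULL)
                return -ENOMEM;

        LUSTRE_FPRIVATE(file) = fd;
        /* ... the file is used ... */
        LUSTRE_FPRIVATE(file) = NULL;
        ll_file_data_put(fd);
        return 0;
}
#endif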
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
85
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in the LMV case, is it correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * Here we check whether this is a forced umount. If so, we are being
110          * called on cancellation of the "open lock" and do not call md_close(),
111          * as it would not succeed because the import is already deactivated.
112          */
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* MDS has instructed us to obtain the Size-on-MDS attribute
131                  * from the OSTs and send a setattr back to the MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154       
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och = *och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody might have freed
203                       this och already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have a good enough OPEN lock on the file and
228            can skip talking to the MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this returns an error code, the caller (fput()) ignores it, so we
273  * need to make every effort to clean up all of our state here.  Also,
274  * applications rarely check close errors, and even if an error is returned
275  * they will not retry the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289 #ifdef CONFIG_FS_POSIX_ACL
290         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
291             inode == inode->i_sb->s_root->d_inode) {
292                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
293
294                 LASSERT(fd != NULL);
295                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
296                         fd->fd_flags &= ~LL_FILE_RMTACL;
297                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
298                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
299                 }
300         }
301 #endif
302
303         if (inode->i_sb->s_root != file->f_dentry)
304                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
305         fd = LUSTRE_FPRIVATE(file);
306         LASSERT(fd != NULL);
307
308         /* The last ref on @file, but maybe not from the owner pid of statahead.
309          * Different processes can open the same dir; "lli_opendir_key == fd"
310          * means it is this process that should stop the statahead thread. */
311         if (lli->lli_opendir_key == fd)
312                 ll_stop_statahead(inode, fd);
313
314         if (inode->i_sb->s_root == file->f_dentry) {
315                 LUSTRE_FPRIVATE(file) = NULL;
316                 ll_file_data_put(fd);
317                 RETURN(0);
318         }
319         
320         if (lsm)
321                 lov_test_and_clear_async_rc(lsm);
322         lli->lli_async_rc = 0;
323
324         rc = ll_md_close(sbi->ll_md_exp, inode, file);
325         RETURN(rc);
326 }
327
328 static int ll_intent_file_open(struct file *file, void *lmm,
329                                int lmmsize, struct lookup_intent *itp)
330 {
331         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
332         struct dentry *parent = file->f_dentry->d_parent;
333         const char *name = file->f_dentry->d_name.name;
334         const int len = file->f_dentry->d_name.len;
335         struct md_op_data *op_data;
336         struct ptlrpc_request *req;
337         int rc;
338         ENTRY;
339
340         if (!parent)
341                 RETURN(-ENOENT);
342
343         /* Usually we come here only for NFSD, and we want an open lock.
344            But we can also get here with pre-2.6.15 patchless kernels, and in
345            that case that lock is also OK */
346         /* We can also get here if there was a cached open handle in revalidate_it
347          * but it disappeared while we were getting from there to ll_file_open.
348          * But this means this file was closed and immediately opened again,
349          * which makes it a good candidate for using an OPEN lock */
350         /* If lmmsize & lmm are not 0, we are just setting stripe info
351          * parameters. No need for the open lock */
352         if (!lmm && !lmmsize)
353                 itp->it_flags |= MDS_OPEN_LOCK;
354
355         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
356                                       file->f_dentry->d_inode, name, len,
357                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
358         if (IS_ERR(op_data))
359                 RETURN(PTR_ERR(op_data));
360
361         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
362                             0 /*unused */, &req, ll_md_blocking_ast, 0);
363         ll_finish_md_op_data(op_data);
364         if (rc == -ESTALE) {
365                 /* reason to keep our own exit path - don't flood the log
366                  * with -ESTALE error messages.
367                  */
368                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
369                      it_open_error(DISP_OPEN_OPEN, itp))
370                         GOTO(out, rc);
371                 ll_release_openhandle(file->f_dentry, itp);
372                 GOTO(out_stale, rc);
373         }
374
375         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
376                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
377                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
378                 GOTO(out, rc);
379         }
380
381         if (itp->d.lustre.it_lock_mode)
382                 md_set_lock_data(sbi->ll_md_exp,
383                                  &itp->d.lustre.it_lock_handle, 
384                                  file->f_dentry->d_inode);
385
386         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
387 out:
388         ptlrpc_req_finished(itp->d.lustre.it_data);
389
390 out_stale:
391         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
392         ll_intent_drop_lock(itp);
393
394         RETURN(rc);
395 }
396
397 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
398                        struct lookup_intent *it, struct obd_client_handle *och)
399 {
400         struct ptlrpc_request *req = it->d.lustre.it_data;
401         struct mdt_body *body;
402
403         LASSERT(och);
404
405         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
406         LASSERT(body != NULL);                      /* reply already checked out */
407
408         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
409         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
410         och->och_fid = lli->lli_fid;
411         och->och_flags = it->it_flags;
412         lli->lli_ioepoch = body->ioepoch;
413
414         return md_set_open_replay_data(md_exp, och, req);
415 }
416
417 int ll_local_open(struct file *file, struct lookup_intent *it,
418                   struct ll_file_data *fd, struct obd_client_handle *och)
419 {
420         struct inode *inode = file->f_dentry->d_inode;
421         struct ll_inode_info *lli = ll_i2info(inode);
422         ENTRY;
423
424         LASSERT(!LUSTRE_FPRIVATE(file));
425
426         LASSERT(fd != NULL);
427
428         if (och) {
429                 struct ptlrpc_request *req = it->d.lustre.it_data;
430                 struct mdt_body *body;
431                 int rc;
432
433                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
434                 if (rc)
435                         RETURN(rc);
436
437                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
438                 if ((it->it_flags & FMODE_WRITE) &&
439                     (body->valid & OBD_MD_FLSIZE))
440                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
441                                lli->lli_ioepoch, PFID(&lli->lli_fid));
442         }
443
444         LUSTRE_FPRIVATE(file) = fd;
445         ll_readahead_init(inode, &fd->fd_ras);
446         fd->fd_omode = it->it_flags;
447         RETURN(0);
448 }
449
450 /* Open a file, and (for the very first open) create objects on the OSTs at
451  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
452  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
453  * lli_open_sem to ensure no other process will create objects, send the
454  * stripe MD to the MDS, or try to destroy the objects if that fails.
455  *
456  * If we already have the stripe MD locally then we don't request it in
457  * md_open(), by passing lmm_size = 0.
458  *
459  * It is up to the application to ensure no other processes open this file
460  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
461  * used.  We might be able to avoid races of that sort by getting lli_open_sem
462  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
463  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
464  */
465 int ll_file_open(struct inode *inode, struct file *file)
466 {
467         struct ll_inode_info *lli = ll_i2info(inode);
468         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
469                                           .it_flags = file->f_flags };
470         struct lov_stripe_md *lsm;
471         struct ptlrpc_request *req = NULL;
472         struct obd_client_handle **och_p;
473         __u64 *och_usecount;
474         struct ll_file_data *fd;
475         int rc = 0, opendir_set = 0;
476         ENTRY;
477
478         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
479                inode->i_generation, inode, file->f_flags);
480
481 #ifdef HAVE_VFS_INTENT_PATCHES
482         it = file->f_it;
483 #else
484         it = file->private_data; /* XXX: compat macro */
485         file->private_data = NULL; /* prevent ll_local_open assertion */
486 #endif
487
488         fd = ll_file_data_get();
489         if (fd == NULL)
490                 RETURN(-ENOMEM);
491
492         if (S_ISDIR(inode->i_mode)) {
493                 spin_lock(&lli->lli_lock);
494                 /* "lli->lli_opendir_pid != 0" means someone has set it.
495                  * "lli->lli_sai != NULL" means the previous statahead has not
496                  *                        been cleaned up yet. */
497                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
498                         opendir_set = 1;
499                         lli->lli_opendir_pid = cfs_curproc_pid();
500                         lli->lli_opendir_key = fd;
501                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
502                         /* Two cases for this:
503                          * (1) The same process opens such a directory many times.
504                          * (2) The old process opened the directory, and exited
505                          *     before its child processes. Then a new process
506                          *     with the same pid opens such a directory before the
507                          *     old process's child processes exit.
508                          * Change the owner to the latest one. */
509                         opendir_set = 2;
510                         lli->lli_opendir_key = fd;
511                 }
512                 spin_unlock(&lli->lli_lock);
513         }
514
515         if (inode->i_sb->s_root == file->f_dentry) {
516                 LUSTRE_FPRIVATE(file) = fd;
517                 RETURN(0);
518         }
519
520         if (!it || !it->d.lustre.it_disposition) {
521                 /* Convert f_flags into access mode. We cannot use file->f_mode,
522                  * because everything but the O_ACCMODE mask was stripped from
523                  * it */
524                 if ((oit.it_flags + 1) & O_ACCMODE)
525                         oit.it_flags++;
526                 if (file->f_flags & O_TRUNC)
527                         oit.it_flags |= FMODE_WRITE;
528
529                 /* The kernel only calls f_op->open from dentry_open.  filp_open
530                  * calls dentry_open after open_namei has checked permissions.
531                  * Only nfsd_open calls dentry_open directly without checking
532                  * permissions, and because of that the code below is safe. */
533                 if (oit.it_flags & FMODE_WRITE)
534                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
535
536                 /* We do not want O_EXCL here, presumably we opened the file
537                  * already? XXX - NFS implications? */
538                 oit.it_flags &= ~O_EXCL;
539
540                 it = &oit;
541         }
542
543 restart:
544         /* Let's see if we have file open on MDS already. */
545         if (it->it_flags & FMODE_WRITE) {
546                 och_p = &lli->lli_mds_write_och;
547                 och_usecount = &lli->lli_open_fd_write_count;
548         } else if (it->it_flags & FMODE_EXEC) {
549                 och_p = &lli->lli_mds_exec_och;
550                 och_usecount = &lli->lli_open_fd_exec_count;
551          } else {
552                 och_p = &lli->lli_mds_read_och;
553                 och_usecount = &lli->lli_open_fd_read_count;
554         }
555         
556         down(&lli->lli_och_sem);
557         if (*och_p) { /* Open handle is present */
558                 if (it_disposition(it, DISP_OPEN_OPEN)) {
559                         /* Well, there's an extra open request that we do not
560                            need; let's close it somehow. This will decref the request. */
561                         rc = it_open_error(DISP_OPEN_OPEN, it);
562                         if (rc) {
563                                 ll_file_data_put(fd);
564                                 GOTO(out_och_free, rc);
565                         }       
566                         ll_release_openhandle(file->f_dentry, it);
567                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
568                                              LPROC_LL_OPEN);
569                 }
570                 (*och_usecount)++;
571
572                 rc = ll_local_open(file, it, fd, NULL);
573                 if (rc) {
574                         up(&lli->lli_och_sem);
575                         ll_file_data_put(fd);
576                         RETURN(rc);
577                 }
578         } else {
579                 LASSERT(*och_usecount == 0);
580                 if (!it->d.lustre.it_disposition) {
581                         /* We cannot just request a lock handle now; the new ELC
582                            code means that one of the other OPEN locks for this
583                            file could be cancelled, and since the blocking AST
584                            handler would attempt to grab och_sem as well, that
585                            would result in a deadlock */
586                         up(&lli->lli_och_sem);
587                         it->it_flags |= O_CHECK_STALE;
588                         rc = ll_intent_file_open(file, NULL, 0, it);
589                         it->it_flags &= ~O_CHECK_STALE;
590                         if (rc) {
591                                 ll_file_data_put(fd);
592                                 GOTO(out_openerr, rc);
593                         }
594
595                         /* Got some error? Release the request */
596                         if (it->d.lustre.it_status < 0) {
597                                 req = it->d.lustre.it_data;
598                                 ptlrpc_req_finished(req);
599                         }
600                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
601                                          &it->d.lustre.it_lock_handle,
602                                          file->f_dentry->d_inode);
603                         goto restart;
604                 }
605                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
606                 if (!*och_p) {
607                         ll_file_data_put(fd);
608                         GOTO(out_och_free, rc = -ENOMEM);
609                 }
610                 (*och_usecount)++;
611                 req = it->d.lustre.it_data;
612
613                 /* md_intent_lock() didn't get a request ref if there was an
614                  * open error, so don't do cleanup on the request here
615                  * (bug 3430) */
616                 /* XXX (green): Shouldn't we bail out on any error here, not
617                  * just an open error? */
618                 rc = it_open_error(DISP_OPEN_OPEN, it);
619                 if (rc) {
620                         ll_file_data_put(fd);
621                         GOTO(out_och_free, rc);
622                 }
623
624                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
625                 rc = ll_local_open(file, it, fd, *och_p);
626                 if (rc) {
627                         up(&lli->lli_och_sem);
628                         ll_file_data_put(fd);
629                         GOTO(out_och_free, rc);
630                 }
631         }
632         up(&lli->lli_och_sem);
633
634         /* Must do this outside the lli_och_sem lock to prevent a deadlock where
635            a different kind of OPEN lock for this same inode gets cancelled
636            by ldlm_cancel_lru */
637         if (!S_ISREG(inode->i_mode))
638                 GOTO(out, rc);
639
640         ll_capa_open(inode);
641
642         lsm = lli->lli_smd;
643         if (lsm == NULL) {
644                 if (file->f_flags & O_LOV_DELAY_CREATE ||
645                     !(file->f_mode & FMODE_WRITE)) {
646                         CDEBUG(D_INODE, "object creation was delayed\n");
647                         GOTO(out, rc);
648                 }
649         }
650         file->f_flags &= ~O_LOV_DELAY_CREATE;
651         GOTO(out, rc);
652 out:
653         ptlrpc_req_finished(req);
654         if (req)
655                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
656 out_och_free:
657         if (rc) {
658                 if (*och_p) {
659                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
660                         *och_p = NULL; /* OBD_FREE writes some magic there */
661                         (*och_usecount)--;
662                 }
663                 up(&lli->lli_och_sem);
664 out_openerr:
665                 if (opendir_set == 1) {
666                         lli->lli_opendir_key = NULL;
667                         lli->lli_opendir_pid = 0;
668                 } else if (unlikely(opendir_set == 2)) {
669                         ll_stop_statahead(inode, fd);
670                 }
671         }
672
673         return rc;
674 }
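
/*
 * Illustrative sketch (assumed wiring; the real file_operations table lives
 * outside this excerpt): ll_file_open() and ll_file_release() are the VFS
 * entry points reached through llite's file_operations, roughly:
 */
#if 0
static struct file_operations ll_file_ops_example = {
        .open    = ll_file_open,
        .release = ll_file_release,
};
#endif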
675
676 /* Fills the obdo with the attributes for the inode defined by lsm */
677 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
678 {
679         struct ptlrpc_request_set *set;
680         struct ll_inode_info *lli = ll_i2info(inode);
681         struct lov_stripe_md *lsm = lli->lli_smd;
682
683         struct obd_info oinfo = { { { 0 } } };
684         int rc;
685         ENTRY;
686
687         LASSERT(lsm != NULL);
688
689         oinfo.oi_md = lsm;
690         oinfo.oi_oa = obdo;
691         oinfo.oi_oa->o_id = lsm->lsm_object_id;
692         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
693         oinfo.oi_oa->o_mode = S_IFREG;
694         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
695                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
696                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
697                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
698                                OBD_MD_FLGROUP;
699         oinfo.oi_capa = ll_mdscapa_get(inode);
700
701         set = ptlrpc_prep_set();
702         if (set == NULL) {
703                 CERROR("can't allocate ptlrpc set\n");
704                 rc = -ENOMEM;
705         } else {
706                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
707                 if (rc == 0)
708                         rc = ptlrpc_set_wait(set);
709                 ptlrpc_set_destroy(set);
710         }
711         capa_put(oinfo.oi_capa);
712         if (rc)
713                 RETURN(rc);
714
715         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
716                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
717                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
718
719         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
720         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
721                lli->lli_smd->lsm_object_id, i_size_read(inode),
722                (unsigned long long)inode->i_blocks,
723                (unsigned long)ll_inode_blksize(inode));
724         RETURN(0);
725 }
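
/*
 * Illustrative sketch (assumed caller): ll_inode_getattr() fills a
 * caller-supplied obdo from the OSTs, so a typical user allocates the obdo,
 * calls it and frees the obdo again, using the same OBD_ALLOC_PTR/OBD_FREE_PTR
 * helpers seen elsewhere in this file.
 */
#if 0
static int ll_inode_getattr_example(struct inode *inode)
{
        struct obdo *oa;
        int rc;

        OBD_ALLOC_PTR(oa);
        if (oa == NULL)
                return -ENOMEM;
        rc = ll_inode_getattr(inode, oa);
        /* on success the obdo now carries size/blocks/times for the objects */
        OBD_FREE_PTR(oa);
        return rc;
}
#endif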
726
727 static inline void ll_remove_suid(struct inode *inode)
728 {
729         unsigned int mode;
730
731         /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
732         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
733 
734         /* were any of the set-id bits set? */
735         mode &= inode->i_mode;
736         if (mode && !capable(CAP_FSETID)) {
737                 inode->i_mode &= ~mode;
738                 // XXX careful here - we cannot change the size
739         }
740 }
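
/*
 * Worked example of the bit trick above (octal): S_ISUID = 04000,
 * S_ISGID = 02000, S_IXGRP = 00010.  (i_mode & S_IXGRP) is either 00010 or 0,
 * and multiplying by S_ISGID/S_IXGRP (= 0200) turns 00010 into 02000, so the
 * expression yields S_ISGID exactly when group-execute is set and 0 otherwise;
 * OR-ing in S_ISUID gives the set-id bits to clear for an unprivileged writer.
 */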
741
742 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
743 {
744         struct ll_inode_info *lli = ll_i2info(inode);
745         struct lov_stripe_md *lsm = lli->lli_smd;
746         struct obd_export *exp = ll_i2dtexp(inode);
747         struct {
748                 char name[16];
749                 struct ldlm_lock *lock;
750                 struct lov_stripe_md *lsm;
751         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock, .lsm = lsm };
752         __u32 stripe, vallen = sizeof(stripe);
753         int rc;
754         ENTRY;
755
756         if (lsm->lsm_stripe_count == 1)
757                 GOTO(check, stripe = 0);
758
759         /* get our offset in the lov */
760         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
761         if (rc != 0) {
762                 CERROR("obd_get_info: rc = %d\n", rc);
763                 RETURN(rc);
764         }
765         LASSERT(stripe < lsm->lsm_stripe_count);
766
767 check:
768         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0] ||
769             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]) {
770                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
771                            lsm->lsm_oinfo[stripe]->loi_id,
772                            lsm->lsm_oinfo[stripe]->loi_gr);
773                 RETURN(-ELDLM_NO_LOCK_DATA);
774         }
775
776         RETURN(stripe);
777 }
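
/*
 * Usage note: callers treat a negative return from ll_lock_to_stripe_offset()
 * as "no usable stripe for this lock", e.g. in ll_glimpse_callback() below:
 *
 *      stripe = ll_lock_to_stripe_offset(inode, lock);
 *      if (stripe < 0)
 *              GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
 */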
778
779 /* Get extra page reference to ensure it is not going away */
780 void ll_pin_extent_cb(void *data)
781 {
782         struct page *page = data;
783         
784         page_cache_get(page);
785
786         return;
787 }
788
789 /* Flush the page from the page cache for an extent as it is cancelled.
790  * Page to remove is delivered as @data.
791  *
792  * No one can dirty the extent until we've finished our work and they cannot
793  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
794  * but other kernel actors could have pages locked.
795  *
796  * If @discard is set, there is no need to write the page if it is dirty.
797  *
798  * Called with the DLM lock held. */
799 int ll_page_removal_cb(void *data, int discard)
800 {
801         int rc;
802         struct page *page = data;
803         struct address_space *mapping;
804  
805         ENTRY;
806
807         /* We have page reference already from ll_pin_page */
808         lock_page(page);
809
810         /* Already truncated by somebody */
811         if (!page->mapping)
812                 GOTO(out, rc = 0);
813         mapping = page->mapping;
814
815         ll_teardown_mmaps(mapping,
816                           (__u64)page->index << PAGE_CACHE_SHIFT,
817                           ((__u64)page->index << PAGE_CACHE_SHIFT) |
818                           ~PAGE_CACHE_MASK);
819         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
820
821         if (!discard && clear_page_dirty_for_io(page)) {
822                 LASSERT(page->mapping);
823                 rc = ll_call_writepage(page->mapping->host, page);
824                 /* either waiting for io to complete or reacquiring
825                  * the lock that the failed writepage released */
826                 lock_page(page);
827                 wait_on_page_writeback(page);
828                 if (rc != 0) {
829                         CERROR("writepage inode %lu(%p) of page %p "
830                                "failed: %d\n", mapping->host->i_ino,
831                                mapping->host, page, rc);
832                         if (rc == -ENOSPC)
833                                 set_bit(AS_ENOSPC, &mapping->flags);
834                         else
835                                 set_bit(AS_EIO, &mapping->flags);
836                 }
838         }
839         if (page->mapping != NULL) {
840                 struct ll_async_page *llap = llap_cast_private(page);
841                 /* checking again to account for writeback's lock_page() */
842                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
843                 if (llap)
844                         ll_ra_accounting(llap, page->mapping);
845                 ll_truncate_complete_page(page);
846         }
847         EXIT;
848 out:
849         LASSERT(!PageWriteback(page));
850         unlock_page(page);
851         page_cache_release(page);
852
853         return 0;
854 }
855
856 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
857                              void *data, int flag)
858 {
859         struct inode *inode;
860         struct ll_inode_info *lli;
861         struct lov_stripe_md *lsm;
862         int stripe;
863         __u64 kms;
864
865         ENTRY;
866
867         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
868                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
869                 LBUG();
870         }
871
872         inode = ll_inode_from_lock(lock);
873         if (inode == NULL)
874                 RETURN(0);
875         lli = ll_i2info(inode);
876         if (lli == NULL)
877                 GOTO(iput, 0);
878         if (lli->lli_smd == NULL)
879                 GOTO(iput, 0);
880         lsm = lli->lli_smd;
881
882         stripe = ll_lock_to_stripe_offset(inode, lock);
883         if (stripe < 0)
884                 GOTO(iput, 0);
885
886         lov_stripe_lock(lsm);
887         lock_res_and_lock(lock);
888         kms = ldlm_extent_shift_kms(lock,
889                                     lsm->lsm_oinfo[stripe]->loi_kms);
890
891         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
892                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
893                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
894         lsm->lsm_oinfo[stripe]->loi_kms = kms;
895         unlock_res_and_lock(lock);
896         lov_stripe_unlock(lsm);
897         ll_queue_done_writing(inode, 0);
898         EXIT;
899 iput:
900         iput(inode);
901
902         return 0;
903 }
904
905 #if 0
906 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
907 {
908         /* XXX ALLOCATE - 160 bytes */
909         struct inode *inode = ll_inode_from_lock(lock);
910         struct ll_inode_info *lli = ll_i2info(inode);
911         struct lustre_handle lockh = { 0 };
912         struct ost_lvb *lvb;
913         int stripe;
914         ENTRY;
915
916         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
917                      LDLM_FL_BLOCK_CONV)) {
918                 LBUG(); /* not expecting any blocked async locks yet */
919                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
920                            "lock, returning");
921                 ldlm_lock_dump(D_OTHER, lock, 0);
922                 ldlm_reprocess_all(lock->l_resource);
923                 RETURN(0);
924         }
925
926         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
927
928         stripe = ll_lock_to_stripe_offset(inode, lock);
929         if (stripe < 0)
930                 goto iput;
931
932         if (lock->l_lvb_len) {
933                 struct lov_stripe_md *lsm = lli->lli_smd;
934                 __u64 kms;
935                 lvb = lock->l_lvb_data;
936                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
937
938                 lock_res_and_lock(lock);
939                 ll_inode_size_lock(inode, 1);
940                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
941                 kms = ldlm_extent_shift_kms(NULL, kms);
942                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
943                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
944                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
945                 lsm->lsm_oinfo[stripe].loi_kms = kms;
946                 ll_inode_size_unlock(inode, 1);
947                 unlock_res_and_lock(lock);
948         }
949
950 iput:
951         iput(inode);
952         wake_up(&lock->l_waitq);
953
954         ldlm_lock2handle(lock, &lockh);
955         ldlm_lock_decref(&lockh, LCK_PR);
956         RETURN(0);
957 }
958 #endif
959
960 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
961 {
962         struct ptlrpc_request *req = reqp;
963         struct inode *inode = ll_inode_from_lock(lock);
964         struct ll_inode_info *lli;
965         struct lov_stripe_md *lsm;
966         struct ost_lvb *lvb;
967         int rc, stripe;
968         ENTRY;
969
970         if (inode == NULL)
971                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
972         lli = ll_i2info(inode);
973         if (lli == NULL)
974                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
975         lsm = lli->lli_smd;
976         if (lsm == NULL)
977                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
978
979         /* First, find out which stripe index this lock corresponds to. */
980         stripe = ll_lock_to_stripe_offset(inode, lock);
981         if (stripe < 0)
982                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
983
984         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
985         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
986                              sizeof(*lvb));
987         rc = req_capsule_server_pack(&req->rq_pill);
988         if (rc) {
989                 CERROR("lustre_pack_reply: %d\n", rc);
990                 GOTO(iput, rc);
991         }
992
993         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
994         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
995         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
996         lvb->lvb_atime = LTIME_S(inode->i_atime);
997         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
998
999         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1000                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1001                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1002                    lvb->lvb_atime, lvb->lvb_ctime);
1003  iput:
1004         iput(inode);
1005
1006  out:
1007         /* These errors are normal races, so we don't want to fill the console
1008          * with messages by calling ptlrpc_error() */
1009         if (rc == -ELDLM_NO_LOCK_DATA)
1010                 lustre_pack_reply(req, 1, NULL, NULL);
1011
1012         req->rq_status = rc;
1013         return rc;
1014 }
1015
1016 static int ll_merge_lvb(struct inode *inode)
1017 {
1018         struct ll_inode_info *lli = ll_i2info(inode);
1019         struct ll_sb_info *sbi = ll_i2sbi(inode);
1020         struct ost_lvb lvb;
1021         int rc;
1022
1023         ENTRY;
1024
1025         ll_inode_size_lock(inode, 1);
1026         inode_init_lvb(inode, &lvb);
1027         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1028         i_size_write(inode, lvb.lvb_size);
1029         inode->i_blocks = lvb.lvb_blocks;
1030
1031         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1032         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1033         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1034         ll_inode_size_unlock(inode, 1);
1035
1036         RETURN(rc);
1037 }
1038
1039 int ll_local_size(struct inode *inode)
1040 {
1041         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1042         struct ll_inode_info *lli = ll_i2info(inode);
1043         struct ll_sb_info *sbi = ll_i2sbi(inode);
1044         struct lustre_handle lockh = { 0 };
1045         int flags = 0;
1046         int rc;
1047         ENTRY;
1048
1049         if (lli->lli_smd->lsm_stripe_count == 0)
1050                 RETURN(0);
1051
1052         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1053                        &policy, LCK_PR, &flags, inode, &lockh);
1054         if (rc < 0)
1055                 RETURN(rc);
1056         else if (rc == 0)
1057                 RETURN(-ENODATA);
1058
1059         rc = ll_merge_lvb(inode);
1060         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1061         RETURN(rc);
1062 }
1063
1064 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1065                      lstat_t *st)
1066 {
1067         struct lustre_handle lockh = { 0 };
1068         struct ldlm_enqueue_info einfo = { 0 };
1069         struct obd_info oinfo = { { { 0 } } };
1070         struct ost_lvb lvb;
1071         int rc;
1072
1073         ENTRY;
1074
1075         einfo.ei_type = LDLM_EXTENT;
1076         einfo.ei_mode = LCK_PR;
1077         einfo.ei_cb_bl = osc_extent_blocking_cb;
1078         einfo.ei_cb_cp = ldlm_completion_ast;
1079         einfo.ei_cb_gl = ll_glimpse_callback;
1080         einfo.ei_cbdata = NULL;
1081
1082         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1083         oinfo.oi_lockh = &lockh;
1084         oinfo.oi_md = lsm;
1085         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1086
1087         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1088         if (rc == -ENOENT)
1089                 RETURN(rc);
1090         if (rc != 0) {
1091                 CERROR("obd_enqueue returned rc %d, "
1092                        "returning -EIO\n", rc);
1093                 RETURN(rc > 0 ? -EIO : rc);
1094         }
1095
1096         lov_stripe_lock(lsm);
1097         memset(&lvb, 0, sizeof(lvb));
1098         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1099         st->st_size = lvb.lvb_size;
1100         st->st_blocks = lvb.lvb_blocks;
1101         st->st_mtime = lvb.lvb_mtime;
1102         st->st_atime = lvb.lvb_atime;
1103         st->st_ctime = lvb.lvb_ctime;
1104         lov_stripe_unlock(lsm);
1105
1106         RETURN(rc);
1107 }
1108
1109 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1110  * file (because it prefers KMS over RSS when larger) */
1111 int ll_glimpse_size(struct inode *inode, int ast_flags)
1112 {
1113         struct ll_inode_info *lli = ll_i2info(inode);
1114         struct ll_sb_info *sbi = ll_i2sbi(inode);
1115         struct lustre_handle lockh = { 0 };
1116         struct ldlm_enqueue_info einfo = { 0 };
1117         struct obd_info oinfo = { { { 0 } } };
1118         int rc;
1119         ENTRY;
1120
1121         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1122                 RETURN(0);
1123
1124         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1125
1126         if (!lli->lli_smd) {
1127                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1128                 RETURN(0);
1129         }
1130
1131         /* NOTE: this looks like a DLM lock request, but it may not be one. Due
1132          *       to the LDLM_FL_HAS_INTENT flag, this is a glimpse request that
1133          *       won't revoke any conflicting DLM locks held. Instead,
1134          *       ll_glimpse_callback() will be called on each client
1135          *       holding a DLM lock against this file, and the resulting size
1136          *       will be returned for each stripe. The DLM lock on [0, EOF] is
1137          *       acquired only if there were no conflicting locks. */
1138         einfo.ei_type = LDLM_EXTENT;
1139         einfo.ei_mode = LCK_PR;
1140         einfo.ei_cb_bl = osc_extent_blocking_cb;
1141         einfo.ei_cb_cp = ldlm_completion_ast;
1142         einfo.ei_cb_gl = ll_glimpse_callback;
1143         einfo.ei_cbdata = inode;
1144
1145         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1146         oinfo.oi_lockh = &lockh;
1147         oinfo.oi_md = lli->lli_smd;
1148         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1149
1150         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1151         if (rc == -ENOENT)
1152                 RETURN(rc);
1153         if (rc != 0) {
1154                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1155                 RETURN(rc > 0 ? -EIO : rc);
1156         }
1157
1158         rc = ll_merge_lvb(inode);
1159
1160         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1161                i_size_read(inode), (unsigned long long)inode->i_blocks);
1162
1163         RETURN(rc);
1164 }
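
/*
 * Illustrative sketch (assumed caller): attribute revalidation typically
 * refreshes the size with a glimpse before reporting it to user space:
 *
 *      rc = ll_glimpse_size(inode, 0);
 *      if (rc == 0)
 *              size = i_size_read(inode);
 *
 * ll_merge_lvb() has already folded the per-stripe KMS/LVB data into i_size,
 * i_blocks and the timestamps by the time ll_glimpse_size() returns.
 */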
1165
1166 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1167                    struct lov_stripe_md *lsm, int mode,
1168                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1169                    int ast_flags)
1170 {
1171         struct ll_sb_info *sbi = ll_i2sbi(inode);
1172         struct ost_lvb lvb;
1173         struct ldlm_enqueue_info einfo = { 0 };
1174         struct obd_info oinfo = { { { 0 } } };
1175         int rc;
1176         ENTRY;
1177
1178         LASSERT(!lustre_handle_is_used(lockh));
1179         LASSERT(lsm != NULL);
1180
1181         /* don't drop the mmapped file to LRU */
1182         if (mapping_mapped(inode->i_mapping))
1183                 ast_flags |= LDLM_FL_NO_LRU;
1184
1185         /* XXX phil: can we do this?  won't it screw the file size up? */
1186         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1187             (sbi->ll_flags & LL_SBI_NOLCK))
1188                 RETURN(0);
1189
1190         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1191                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1192
1193         einfo.ei_type = LDLM_EXTENT;
1194         einfo.ei_mode = mode;
1195         einfo.ei_cb_bl = osc_extent_blocking_cb;
1196         einfo.ei_cb_cp = ldlm_completion_ast;
1197         einfo.ei_cb_gl = ll_glimpse_callback;
1198         einfo.ei_cbdata = inode;
1199
1200         oinfo.oi_policy = *policy;
1201         oinfo.oi_lockh = lockh;
1202         oinfo.oi_md = lsm;
1203         oinfo.oi_flags = ast_flags;
1204
1205         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1206         *policy = oinfo.oi_policy;
1207         if (rc > 0)
1208                 rc = -EIO;
1209
1210         ll_inode_size_lock(inode, 1);
1211         inode_init_lvb(inode, &lvb);
1212         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1213
1214         if (policy->l_extent.start == 0 &&
1215             policy->l_extent.end == OBD_OBJECT_EOF) {
1216                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1217                  * the kms under both a DLM lock and the
1218                  * ll_inode_size_lock().  If we don't get the
1219                  * ll_inode_size_lock() here we can match the DLM lock and
1220                  * reset i_size from the kms before the truncating path has
1221                  * updated the kms.  generic_file_write can then trust the
1222                  * stale i_size when doing appending writes and effectively
1223                  * cancel the result of the truncate.  Getting the
1224                  * ll_inode_size_lock() after the enqueue maintains the DLM
1225                  * -> ll_inode_size_lock() acquiring order. */
1226                 i_size_write(inode, lvb.lvb_size);
1227                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1228                        inode->i_ino, i_size_read(inode));
1229         }
1230
1231         if (rc == 0) {
1232                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1233                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1234                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1235         }
1236         ll_inode_size_unlock(inode, 1);
1237
1238         RETURN(rc);
1239 }
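
/*
 * Illustrative sketch (assumed caller, not taken from this file): an extent
 * lock obtained with ll_extent_lock() is dropped again with ll_extent_unlock()
 * using the same lsm, mode and lock handle.
 */
#if 0
static int ll_extent_lock_example(struct ll_file_data *fd, struct inode *inode,
                                  struct lov_stripe_md *lsm)
{
        ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
        struct lustre_handle lockh = { 0 };
        int rc;

        rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
        if (rc != 0)
                return rc;

        /* ... read under the [0, EOF] PR extent lock ... */

        return ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
}
#endif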
1240
1241 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1242                      struct lov_stripe_md *lsm, int mode,
1243                      struct lustre_handle *lockh)
1244 {
1245         struct ll_sb_info *sbi = ll_i2sbi(inode);
1246         int rc;
1247         ENTRY;
1248
1249         /* XXX phil: can we do this?  won't it screw the file size up? */
1250         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1251             (sbi->ll_flags & LL_SBI_NOLCK))
1252                 RETURN(0);
1253
1254         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1255
1256         RETURN(rc);
1257 }
1258
1259 static void ll_set_file_contended(struct inode *inode)
1260 {
1261         struct ll_inode_info *lli = ll_i2info(inode);
1262         cfs_time_t now = cfs_time_current();
1263
1264         spin_lock(&lli->lli_lock);
1265         lli->lli_contention_time = now;
1266         lli->lli_flags |= LLIF_CONTENDED;
1267         spin_unlock(&lli->lli_lock);
1268 }
1269
1270 void ll_clear_file_contended(struct inode *inode)
1271 {
1272         struct ll_inode_info *lli = ll_i2info(inode);
1273
1274         spin_lock(&lli->lli_lock);
1275         lli->lli_flags &= ~LLIF_CONTENDED;
1276         spin_unlock(&lli->lli_lock);
1277 }
1278
1279 static int ll_is_file_contended(struct file *file)
1280 {
1281         struct inode *inode = file->f_dentry->d_inode;
1282         struct ll_inode_info *lli = ll_i2info(inode);
1283         struct ll_sb_info *sbi = ll_i2sbi(inode);
1284         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1285         ENTRY;
1286
1287         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1288                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1289                        " osc connect flags = 0x"LPX64"\n",
1290                        sbi->ll_lco.lco_flags);
1291                 RETURN(0);
1292         }
1293         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1294                 RETURN(1);
1295         if (lli->lli_flags & LLIF_CONTENDED) {
1296                 cfs_time_t cur_time = cfs_time_current();
1297                 cfs_time_t retry_time;
1298
1299                 retry_time = cfs_time_add(
1300                         lli->lli_contention_time,
1301                         cfs_time_seconds(sbi->ll_contention_time));
1302                 if (cfs_time_after(cur_time, retry_time)) {
1303                         ll_clear_file_contended(inode);
1304                         RETURN(0);
1305                 }
1306                 RETURN(1);
1307         }
1308         RETURN(0);
1309 }
1310
1311 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1312                                  const char *buf, size_t count,
1313                                  loff_t start, loff_t end, int rw)
1314 {
1315         int append;
1316         int tree_locked = 0;
1317         int rc;
1318         struct inode *inode = file->f_dentry->d_inode;
1319         ENTRY;
1320
1321         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1322
1323         if (append || !ll_is_file_contended(file)) {
1324                 struct ll_lock_tree_node *node;
1325                 int ast_flags;
1326
1327                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1328                 if (file->f_flags & O_NONBLOCK)
1329                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1330                 node = ll_node_from_inode(inode, start, end,
1331                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1332                 if (IS_ERR(node)) {
1333                         rc = PTR_ERR(node);
1334                         GOTO(out, rc);
1335                 }
1336                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1337                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1338                 if (rc == 0)
1339                         tree_locked = 1;
1340                 else if (rc == -EUSERS)
1341                         ll_set_file_contended(inode);
1342                 else
1343                         GOTO(out, rc);
1344         }
1345         RETURN(tree_locked);
1346 out:
1347         return rc;
1348 }
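
/*
 * Note on the contention fallback above: when ll_tree_lock() returns -EUSERS
 * the file is marked contended, and for the next ll_contention_time seconds
 * ll_is_file_contended() keeps returning 1, so non-append IO skips the tree
 * lock entirely (returning 0, i.e. LL_LOCK_STYLE_NOLOCK to the caller) and is
 * expected to rely on server-side locking instead.
 */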
1349
1350 static int ll_reget_short_lock(struct page *page, int rw,
1351                                obd_off start, obd_off end,
1352                                void **cookie)
1353 {
1354         struct ll_async_page *llap;
1355         struct obd_export *exp;
1356         struct inode *inode = page->mapping->host;
1357
1358         ENTRY;
1359
1360         exp = ll_i2dtexp(inode);
1361         if (exp == NULL)
1362                 RETURN(0);
1363
1364         llap = llap_cast_private(page);
1365         if (llap == NULL)
1366                 RETURN(0);
1367
1368         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1369                                     &llap->llap_cookie, rw, start, end,
1370                                     cookie));
1371 }
1372
1373 static void ll_release_short_lock(struct inode *inode, obd_off end,
1374                                   void *cookie, int rw)
1375 {
1376         struct obd_export *exp;
1377         int rc;
1378
1379         exp = ll_i2dtexp(inode);
1380         if (exp == NULL)
1381                 return;
1382
1383         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1384                                     cookie, rw);
1385         if (rc < 0)
1386                 CERROR("unlock failed (%d)\n", rc);
1387 }
1388
1389 static inline int ll_file_get_fast_lock(struct file *file,
1390                                         obd_off ppos, obd_off end,
1391                                         char *buf, void **cookie, int rw)
1392 {
1393         int rc = 0;
1394         struct page *page;
1395
1396         ENTRY;
1397
1398         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1399                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1400                                       ppos >> CFS_PAGE_SHIFT);
1401                 if (page) {
1402                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1403                                 rc = 1;
1404
1405                         unlock_page(page);
1406                         page_cache_release(page);
1407                 }
1408         }
1409
1410         RETURN(rc);
1411 }
1412
1413 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1414                                          void *cookie, int rw)
1415 {
1416         ll_release_short_lock(inode, end, cookie, rw);
1417 }
1418
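/* Lock styles used by the buffered read path: no client DLM lock (the
 * lockless I/O path for contended files), a re-used short "fast" lock on
 * an already cached page, or a full extent "tree" lock. */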
1419 enum ll_lock_style {
1420         LL_LOCK_STYLE_NOLOCK   = 0,
1421         LL_LOCK_STYLE_FASTLOCK = 1,
1422         LL_LOCK_STYLE_TREELOCK = 2
1423 };
1424
1425 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1426                                    obd_off end, char *buf, void **cookie,
1427                                    struct ll_lock_tree *tree, int rw)
1428 {
1429         int rc;
1430
1431         ENTRY;
1432
1433         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1434                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1435
1436         rc = ll_file_get_tree_lock(tree, file, buf, end - ppos + 1, ppos, end, rw);
1437         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1438         switch (rc) {
1439         case 1:
1440                 RETURN(LL_LOCK_STYLE_TREELOCK);
1441         case 0:
1442                 RETURN(LL_LOCK_STYLE_NOLOCK);
1443         }
1444
1445         /* an error happened if we reached this point, rc = -errno here */
1446         RETURN(rc);
1447 }
1448
1449 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1450                                     enum ll_lock_style lock_style,
1451                                     void *cookie, struct ll_lock_tree *tree,
1452                                     int rw)
1454 {
1455         switch (lock_style) {
1456         case LL_LOCK_STYLE_TREELOCK:
1457                 ll_tree_unlock(tree);
1458                 break;
1459         case LL_LOCK_STYLE_FASTLOCK:
1460                 ll_file_put_fast_lock(inode, end, cookie, rw);
1461                 break;
1462         default:
1463                 CERROR("invalid locking style (%d)\n", lock_style);
1464         }
1465 }
1466
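/* Read from a file through the page cache.  If ll_max_rw_chunk is set, the
 * read is split into chunks that stop at the current stripe boundary and
 * are at most ll_max_rw_chunk bytes long.  For each chunk a lock is taken
 * (fast, tree, or none for contended files), the known-minimum size (kms)
 * is merged from the stripes and refreshed with a glimpse when the chunk
 * may extend past it, and then generic_file_read() or the lockless I/O
 * path performs the copy. */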
1467 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1468                             loff_t *ppos)
1469 {
1470         struct inode *inode = file->f_dentry->d_inode;
1471         struct ll_inode_info *lli = ll_i2info(inode);
1472         struct lov_stripe_md *lsm = lli->lli_smd;
1473         struct ll_sb_info *sbi = ll_i2sbi(inode);
1474         struct ll_lock_tree tree;
1475         struct ost_lvb lvb;
1476         struct ll_ra_read bead;
1477         int ra = 0;
1478         obd_off end;
1479         ssize_t retval, chunk, sum = 0;
1480         int lock_style;
1481         void *cookie;
1482
1483         __u64 kms;
1484         ENTRY;
1485         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1486                inode->i_ino, inode->i_generation, inode, count, *ppos);
1487         /* "If nbyte is 0, read() will return 0 and have no other results."
1488          *                      -- Single Unix Spec */
1489         if (count == 0)
1490                 RETURN(0);
1491
1492         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1493
1494         if (!lsm) {
1495                 /* A read on a file with no objects should return zero-filled
1496                  * buffers up to the file size (we can get non-zero sizes with
1497                  * mknod + truncate, then opening the file for read; this is a
1498                  * common pattern in the NFS case, it seems).  Bug 6243 */
1499                 int notzeroed;
1500                 /* Since there are no objects on OSTs, we have nothing to get
1501                  * lock on and so we are forced to access inode->i_size
1502                  * unguarded */
1503
1504                 /* Read beyond end of file */
1505                 if (*ppos >= i_size_read(inode))
1506                         RETURN(0);
1507
1508                 if (count > i_size_read(inode) - *ppos)
1509                         count = i_size_read(inode) - *ppos;
1510                 /* Make sure to correctly adjust the file pos pointer for
1511                  * EFAULT case */
1512                 notzeroed = clear_user(buf, count);
1513                 count -= notzeroed;
1514                 *ppos += count;
1515                 if (!count)
1516                         RETURN(-EFAULT);
1517                 RETURN(count);
1518         }
1519 repeat:
1520         if (sbi->ll_max_rw_chunk != 0) {
1521                 /* first, determine the end of the current stripe */
1522                 end = *ppos;
1523                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1524
1525                 /* trim the end if it extends beyond the request */
1526                 if (end > *ppos + count - 1)
1527                         end = *ppos + count - 1;
1528
1529                 /* and chunk shouldn't be too large even if striping is wide */
1530                 if (end - *ppos > sbi->ll_max_rw_chunk)
1531                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1532         } else {
1533                 end = *ppos + count - 1;
1534         }
1535
1536         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1537                                       buf, &cookie, &tree, OBD_BRW_READ);
1538         if (lock_style < 0)
1539                 GOTO(out, retval = lock_style);
1540
1541         ll_inode_size_lock(inode, 1);
1542         /*
1543          * Consistency guarantees: following possibilities exist for the
1544          * relation between region being read and real file size at this
1545          * moment:
1546          *
1547          *  (A): the region is completely inside of the file;
1548          *
1549          *  (B-x): x bytes of region are inside of the file, the rest is
1550          *  outside;
1551          *
1552          *  (C): the region is completely outside of the file.
1553          *
1554          * This classification is stable under DLM lock acquired by
1555          * ll_tree_lock() above, because to change class another client would
1556          * have to take a DLM lock conflicting with ours. Also, any updates to
1557          * ->i_size by other threads on this client are serialized by
1558          * ll_inode_size_lock(). This guarantees that short reads are handled
1559          * correctly in the face of concurrent writes and truncates.
1560          */
1561         inode_init_lvb(inode, &lvb);
1562         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1563         kms = lvb.lvb_size;
1564         if (*ppos + count - 1 > kms) {
1565                 /* A glimpse is necessary to determine whether we return a
1566                  * short read (B) or some zeroes at the end of the buffer (C) */
1567                 ll_inode_size_unlock(inode, 1);
1568                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1569                 if (retval) {
1570                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1571                                 ll_file_put_lock(inode, end, lock_style,
1572                                                  cookie, &tree, OBD_BRW_READ);
1573                         goto out;
1574                 }
1575         } else {
1576                 /* region is within kms and, hence, within real file size (A).
1577                  * We need to increase i_size to cover the read region so that
1578                  * generic_file_read() will do its job, but that doesn't mean
1579                  * the kms size is _correct_, it is only the _minimum_ size.
1580                  * If someone does a stat they will get the correct size which
1581                  * will always be >= the kms value here.  b=11081 */
1582                 if (i_size_read(inode) < kms)
1583                         i_size_write(inode, kms);
1584                 ll_inode_size_unlock(inode, 1);
1585         }
1586
1587         chunk = end - *ppos + 1;
1588         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1589                inode->i_ino, chunk, *ppos, i_size_read(inode));
1590
1591         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1592                 /* turn off the kernel's read-ahead */
1593                 file->f_ra.ra_pages = 0;
1594
1595                 /* initialize read-ahead window once per syscall */
1596                 if (ra == 0) {
1597                         ra = 1;
1598                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1599                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1600                         ll_ra_read_in(file, &bead);
1601                 }
1602
1603                 /* BUG: 5972 */
1604                 file_accessed(file);
1605                 retval = generic_file_read(file, buf, chunk, ppos);
1606                 ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
1607                                  OBD_BRW_READ);
1608         } else {
1609                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1610         }
1611
1612         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1613
1614         if (retval > 0) {
1615                 buf += retval;
1616                 count -= retval;
1617                 sum += retval;
1618                 if (retval == chunk && count > 0)
1619                         goto repeat;
1620         }
1621
1622  out:
1623         if (ra != 0)
1624                 ll_ra_read_ex(file, &bead);
1625         retval = (sum > 0) ? sum : retval;
1626         RETURN(retval);
1627 }
1628
1629 /*
1630  * Write to a file (through the page cache).
1631  */
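/* As in ll_file_read(), the write is split into per-stripe chunks of at
 * most ll_max_rw_chunk bytes.  O_APPEND writes take a [0, EOF] lock so the
 * file size is stable while the write offset is set; contended files
 * without O_APPEND fall back to lockless I/O. */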
1632 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1633                              loff_t *ppos)
1634 {
1635         struct inode *inode = file->f_dentry->d_inode;
1636         struct ll_sb_info *sbi = ll_i2sbi(inode);
1637         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1638         struct ll_lock_tree tree;
1639         loff_t maxbytes = ll_file_maxbytes(inode);
1640         loff_t lock_start, lock_end, end;
1641         ssize_t retval, chunk, sum = 0;
1642         int tree_locked;
1643         ENTRY;
1644
1645         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1646                inode->i_ino, inode->i_generation, inode, count, *ppos);
1647
1648         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1649
1650         /* POSIX, but surprised the VFS doesn't check this already */
1651         if (count == 0)
1652                 RETURN(0);
1653
1654         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1655          * called on the file, don't fail the below assertion (bug 2388). */
1656         if (file->f_flags & O_LOV_DELAY_CREATE &&
1657             ll_i2info(inode)->lli_smd == NULL)
1658                 RETURN(-EBADF);
1659
1660         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1661
1662         down(&ll_i2info(inode)->lli_write_sem);
1663
1664 repeat:
1665         chunk = 0; /* just to fix gcc's warning */
1666         end = *ppos + count - 1;
1667
1668         if (file->f_flags & O_APPEND) {
1669                 lock_start = 0;
1670                 lock_end = OBD_OBJECT_EOF;
1671         } else if (sbi->ll_max_rw_chunk != 0) {
1672                 /* first, determine the end of the current stripe */
1673                 end = *ppos;
1674                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1675                                 (obd_off *)&end);
1676
1677                 /* trim the end if it extends beyond the request */
1678                 if (end > *ppos + count - 1)
1679                         end = *ppos + count - 1;
1680
1681                 /* and chunk shouldn't be too large even if striping is wide */
1682                 if (end - *ppos > sbi->ll_max_rw_chunk)
1683                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1684                 lock_start = *ppos;
1685                 lock_end = end;
1686         } else {
1687                 lock_start = *ppos;
1688                 lock_end = *ppos + count - 1;
1689         }
1690
1691         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1692                                             lock_start, lock_end, OBD_BRW_WRITE);
1693         if (tree_locked < 0)
1694                 GOTO(out, retval = tree_locked);
1695
1696         /* This is OK; generic_file_write() will overwrite this under i_sem
1697          * if it races with a local truncate; it just makes our maxbytes
1698          * checking easier.  The i_size value gets updated in ll_extent_lock()
1699          * as a consequence of the [0,EOF] extent lock we requested above. */
1700         if (file->f_flags & O_APPEND) {
1701                 *ppos = i_size_read(inode);
1702                 end = *ppos + count - 1;
1703         }
1704
1705         if (*ppos >= maxbytes) {
1706                 send_sig(SIGXFSZ, current, 0);
1707                 GOTO(out_unlock, retval = -EFBIG);
1708         }
1709         if (end > maxbytes - 1)
1710                 end = maxbytes - 1;
1711
1712         /* generic_file_write handles O_APPEND after getting i_mutex */
1713         chunk = end - *ppos + 1;
1714         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1715                inode->i_ino, chunk, *ppos);
1716         if (tree_locked)
1717                 retval = generic_file_write(file, buf, chunk, ppos);
1718         else
1719                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1720                                              ppos, WRITE);
1721         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1722
1723 out_unlock:
1724         if (tree_locked)
1725                 ll_tree_unlock(&tree);
1726
1727 out:
1728         if (retval > 0) {
1729                 buf += retval;
1730                 count -= retval;
1731                 sum += retval;
1732                 if (retval == chunk && count > 0)
1733                         goto repeat;
1734         }
1735
1736         up(&ll_i2info(inode)->lli_write_sem);
1737
1738         retval = (sum > 0) ? sum : retval;
1739         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1740                            retval > 0 ? retval : 0);
1741         RETURN(retval);
1742 }
1743
1744 /*
1745  * Send file content (through pagecache) somewhere with helper
1746  */
1747 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1748                                 read_actor_t actor, void *target)
1749 {
1750         struct inode *inode = in_file->f_dentry->d_inode;
1751         struct ll_inode_info *lli = ll_i2info(inode);
1752         struct lov_stripe_md *lsm = lli->lli_smd;
1753         struct ll_lock_tree tree;
1754         struct ll_lock_tree_node *node;
1755         struct ost_lvb lvb;
1756         struct ll_ra_read bead;
1757         int rc;
1758         ssize_t retval;
1759         __u64 kms;
1760         ENTRY;
1761         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1762                inode->i_ino, inode->i_generation, inode, count, *ppos);
1763
1764         /* "If nbyte is 0, read() will return 0 and have no other results."
1765          *                      -- Single Unix Spec */
1766         if (count == 0)
1767                 RETURN(0);
1768
1769         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1770         /* turn off the kernel's read-ahead */
1771         in_file->f_ra.ra_pages = 0;
1772
1773         /* File with no objects, nothing to lock */
1774         if (!lsm)
1775                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1776
1777         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1778         if (IS_ERR(node))
1779                 RETURN(PTR_ERR(node));
1780
1781         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1782         rc = ll_tree_lock(&tree, node, NULL, count,
1783                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1784         if (rc != 0)
1785                 RETURN(rc);
1786
1787         ll_clear_file_contended(inode);
1788         ll_inode_size_lock(inode, 1);
1789         /*
1790          * Consistency guarantees: following possibilities exist for the
1791          * relation between region being read and real file size at this
1792          * moment:
1793          *
1794          *  (A): the region is completely inside of the file;
1795          *
1796          *  (B-x): x bytes of region are inside of the file, the rest is
1797          *  outside;
1798          *
1799          *  (C): the region is completely outside of the file.
1800          *
1801          * This classification is stable under DLM lock acquired by
1802          * ll_tree_lock() above, because to change class another client would
1803          * have to take a DLM lock conflicting with ours. Also, any updates to
1804          * ->i_size by other threads on this client are serialized by
1805          * ll_inode_size_lock(). This guarantees that short reads are handled
1806          * correctly in the face of concurrent writes and truncates.
1807          */
1808         inode_init_lvb(inode, &lvb);
1809         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1810         kms = lvb.lvb_size;
1811         if (*ppos + count - 1 > kms) {
1812                 /* A glimpse is necessary to determine whether we return a
1813                  * short read (B) or some zeroes at the end of the buffer (C) */
1814                 ll_inode_size_unlock(inode, 1);
1815                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1816                 if (retval)
1817                         goto out;
1818         } else {
1819                 /* region is within kms and, hence, within real file size (A) */
1820                 i_size_write(inode, kms);
1821                 ll_inode_size_unlock(inode, 1);
1822         }
1823
1824         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1825                inode->i_ino, count, *ppos, i_size_read(inode));
1826
1827         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1828         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1829         ll_ra_read_in(in_file, &bead);
1830         /* BUG: 5972 */
1831         file_accessed(in_file);
1832         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1833         ll_ra_read_ex(in_file, &bead);
1834
1835  out:
1836         ll_tree_unlock(&tree);
1837         RETURN(retval);
1838 }
1839
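/* LL_IOC_RECREATE_OBJ: recreate an OST object for this file (e.g. after it
 * was lost).  Only CAP_SYS_ADMIN may do this; the object id, group and OST
 * index are copied from userspace and handed to obd_create() with
 * OBD_FL_RECREATE_OBJS set. */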
1840 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1841                                unsigned long arg)
1842 {
1843         struct ll_inode_info *lli = ll_i2info(inode);
1844         struct obd_export *exp = ll_i2dtexp(inode);
1845         struct ll_recreate_obj ucreatp;
1846         struct obd_trans_info oti = { 0 };
1847         struct obdo *oa = NULL;
1848         int lsm_size;
1849         int rc = 0;
1850         struct lov_stripe_md *lsm, *lsm2;
1851         ENTRY;
1852
1853         if (!capable (CAP_SYS_ADMIN))
1854                 RETURN(-EPERM);
1855
1856         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1857                             sizeof(struct ll_recreate_obj));
1858         if (rc) {
1859                 RETURN(-EFAULT);
1860         }
1861         OBDO_ALLOC(oa);
1862         if (oa == NULL)
1863                 RETURN(-ENOMEM);
1864
1865         down(&lli->lli_size_sem);
1866         lsm = lli->lli_smd;
1867         if (lsm == NULL)
1868                 GOTO(out, rc = -ENOENT);
1869         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1870                    (lsm->lsm_stripe_count));
1871
1872         OBD_ALLOC(lsm2, lsm_size);
1873         if (lsm2 == NULL)
1874                 GOTO(out, rc = -ENOMEM);
1875
1876         oa->o_id = ucreatp.lrc_id;
1877         oa->o_gr = ucreatp.lrc_group;
1878         oa->o_nlink = ucreatp.lrc_ost_idx;
1879         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1880         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1881         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1882                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1883
1884         memcpy(lsm2, lsm, lsm_size);
1885         rc = obd_create(exp, oa, &lsm2, &oti);
1886
1887         OBD_FREE(lsm2, lsm_size);
1888         GOTO(out, rc);
1889 out:
1890         up(&lli->lli_size_sem);
1891         OBDO_FREE(oa);
1892         return rc;
1893 }
1894
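/* Attach striping to a file that has none yet: re-open it with the given
 * lov_user_md carried in the open intent so the MDS creates the objects,
 * then drop the open handle.  Fails with -EEXIST if the inode already has
 * a stripe MD. */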
1895 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1896                              int flags, struct lov_user_md *lum, int lum_size)
1897 {
1898         struct ll_inode_info *lli = ll_i2info(inode);
1899         struct lov_stripe_md *lsm;
1900         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1901         int rc = 0;
1902         ENTRY;
1903
1904         down(&lli->lli_size_sem);
1905         lsm = lli->lli_smd;
1906         if (lsm) {
1907                 up(&lli->lli_size_sem);
1908                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1909                        inode->i_ino);
1910                 RETURN(-EEXIST);
1911         }
1912
1913         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1914         if (rc)
1915                 GOTO(out, rc);
1916         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1917                 GOTO(out_req_free, rc = -ENOENT);
1918         rc = oit.d.lustre.it_status;
1919         if (rc < 0)
1920                 GOTO(out_req_free, rc);
1921
1922         ll_release_openhandle(file->f_dentry, &oit);
1923
1924  out:
1925         up(&lli->lli_size_sem);
1926         ll_intent_release(&oit);
1927         RETURN(rc);
1928 out_req_free:
1929         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1930         goto out;
1931 }
1932
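/* Fetch the LOV EA of @filename from the MDS.  The EA is byte-swapped to
 * host order if needed, and a LOV_MAGIC_JOIN EA is expanded into a
 * lov_user_md_join that includes the per-object extents.  On return *lmmp,
 * *lmm_size and *request are set; the caller is expected to release the
 * request. */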
1933 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1934                              struct lov_mds_md **lmmp, int *lmm_size, 
1935                              struct ptlrpc_request **request)
1936 {
1937         struct ll_sb_info *sbi = ll_i2sbi(inode);
1938         struct mdt_body  *body;
1939         struct lov_mds_md *lmm = NULL;
1940         struct ptlrpc_request *req = NULL;
1941         struct obd_capa *oc;
1942         int rc, lmmsize;
1943
1944         rc = ll_get_max_mdsize(sbi, &lmmsize);
1945         if (rc)
1946                 RETURN(rc);
1947
1948         oc = ll_mdscapa_get(inode);
1949         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1950                              oc, filename, strlen(filename) + 1,
1951                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
1952                              ll_i2suppgid(inode), &req);
1953         capa_put(oc);
1954         if (rc < 0) {
1955                 CDEBUG(D_INFO, "md_getattr_name failed "
1956                        "on %s: rc %d\n", filename, rc);
1957                 GOTO(out, rc);
1958         }
1959
1960         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1961         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1962
1963         lmmsize = body->eadatasize;
1964
1965         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1966                         lmmsize == 0) {
1967                 GOTO(out, rc = -ENODATA);
1968         }
1969
1970         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1971         LASSERT(lmm != NULL);
1972
1973         /*
1974          * This is coming from the MDS, so is probably in
1975          * little endian.  We convert it to host endian before
1976          * passing it to userspace.
1977          */
1978         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1979                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1980                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1981         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1982                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1983         }
1984
1985         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1986                 struct lov_stripe_md *lsm;
1987                 struct lov_user_md_join *lmj;
1988                 int lmj_size, i, aindex = 0;
1989
1990                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1991                 if (rc < 0)
1992                         GOTO(out, rc = -ENOMEM);
1993                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1994                 if (rc)
1995                         GOTO(out_free_memmd, rc);
1996
1997                 lmj_size = sizeof(struct lov_user_md_join) +
1998                            lsm->lsm_stripe_count *
1999                            sizeof(struct lov_user_ost_data_join);
2000                 OBD_ALLOC(lmj, lmj_size);
2001                 if (!lmj)
2002                         GOTO(out_free_memmd, rc = -ENOMEM);
2003
2004                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2005                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2006                         struct lov_extent *lex =
2007                                 &lsm->lsm_array->lai_ext_array[aindex];
2008
2009                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2010                                 aindex++;
2011                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2012                                         LPU64" len %d\n", aindex, i,
2013                                         lex->le_start, (int)lex->le_len);
2014                         lmj->lmm_objects[i].l_extent_start =
2015                                 lex->le_start;
2016
2017                         if ((int)lex->le_len == -1)
2018                                 lmj->lmm_objects[i].l_extent_end = -1;
2019                         else
2020                                 lmj->lmm_objects[i].l_extent_end =
2021                                         lex->le_start + lex->le_len;
2022                         lmj->lmm_objects[i].l_object_id =
2023                                 lsm->lsm_oinfo[i]->loi_id;
2024                         lmj->lmm_objects[i].l_object_gr =
2025                                 lsm->lsm_oinfo[i]->loi_gr;
2026                         lmj->lmm_objects[i].l_ost_gen =
2027                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2028                         lmj->lmm_objects[i].l_ost_idx =
2029                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2030                 }
2031                 lmm = (struct lov_mds_md *)lmj;
2032                 lmmsize = lmj_size;
2033 out_free_memmd:
2034                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2035         }
2036 out:
2037         *lmmp = lmm;
2038         *lmm_size = lmmsize;
2039         *request = req;
2040         return rc;
2041 }
2042
2043 static int ll_lov_setea(struct inode *inode, struct file *file,
2044                             unsigned long arg)
2045 {
2046         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2047         struct lov_user_md  *lump;
2048         int lum_size = sizeof(struct lov_user_md) +
2049                        sizeof(struct lov_user_ost_data);
2050         int rc;
2051         ENTRY;
2052
2053         if (!capable (CAP_SYS_ADMIN))
2054                 RETURN(-EPERM);
2055
2056         OBD_ALLOC(lump, lum_size);
2057         if (lump == NULL) {
2058                 RETURN(-ENOMEM);
2059         }
2060         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2061         if (rc) {
2062                 OBD_FREE(lump, lum_size);
2063                 RETURN(-EFAULT);
2064         }
2065
2066         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2067
2068         OBD_FREE(lump, lum_size);
2069         RETURN(rc);
2070 }
2071
2072 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2073                             unsigned long arg)
2074 {
2075         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2076         int rc;
2077         int flags = FMODE_WRITE;
2078         ENTRY;
2079
2080         /* Bug 1152: copy properly when this is no longer true */
2081         LASSERT(sizeof(lum) == sizeof(*lump));
2082         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2083         rc = copy_from_user(&lum, lump, sizeof(lum));
2084         if (rc)
2085                 RETURN(-EFAULT);
2086
2087         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2088         if (rc == 0) {
2089                  put_user(0, &lump->lmm_stripe_count);
2090                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2091                                     0, ll_i2info(inode)->lli_smd, lump);
2092         }
2093         RETURN(rc);
2094 }
2095
2096 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2097 {
2098         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2099
2100         if (!lsm)
2101                 RETURN(-ENODATA);
2102
2103         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2104                             (void *)arg);
2105 }
2106
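/* LL_IOC_GROUP_LOCK: take a group (LCK_GROUP) extent lock over the whole
 * file for the group id given as the ioctl argument, and remember it in
 * the file descriptor (LL_FILE_GROUP_LOCKED | LL_FILE_IGNORE_LOCK plus the
 * lock handle) until ll_put_grouplock() drops it.
 *
 * Illustrative userspace usage (sketch, not taken from this tree):
 *
 *      ioctl(fd, LL_IOC_GROUP_LOCK, gid);
 *      ... cooperating I/O by processes sharing the same gid ...
 *      ioctl(fd, LL_IOC_GROUP_UNLOCK, gid);
 */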
2107 static int ll_get_grouplock(struct inode *inode, struct file *file,
2108                             unsigned long arg)
2109 {
2110         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2111         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2112                                                     .end = OBD_OBJECT_EOF}};
2113         struct lustre_handle lockh = { 0 };
2114         struct ll_inode_info *lli = ll_i2info(inode);
2115         struct lov_stripe_md *lsm = lli->lli_smd;
2116         int flags = 0, rc;
2117         ENTRY;
2118
2119         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2120                 RETURN(-EINVAL);
2121         }
2122
2123         policy.l_extent.gid = arg;
2124         if (file->f_flags & O_NONBLOCK)
2125                 flags = LDLM_FL_BLOCK_NOWAIT;
2126
2127         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2128         if (rc)
2129                 RETURN(rc);
2130
2131         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2132         fd->fd_gid = arg;
2133         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2134
2135         RETURN(0);
2136 }
2137
2138 static int ll_put_grouplock(struct inode *inode, struct file *file,
2139                             unsigned long arg)
2140 {
2141         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2142         struct ll_inode_info *lli = ll_i2info(inode);
2143         struct lov_stripe_md *lsm = lli->lli_smd;
2144         int rc;
2145         ENTRY;
2146
2147         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2148                 /* Ugh, it's already unlocked. */
2149                 RETURN(-EINVAL);
2150         }
2151
2152         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2153                 RETURN(-EINVAL);
2154
2155         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2156
2157         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2158         if (rc)
2159                 RETURN(rc);
2160
2161         fd->fd_gid = 0;
2162         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2163
2164         RETURN(0);
2165 }
2166
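/* Sanity checks for LL_IOC_JOIN: the server must support join
 * (LL_SBI_JOIN), both head and tail must be regular files, a file cannot
 * be joined to itself, and the head's size must be a multiple of
 * JOIN_FILE_ALIGN. */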
2167 static int join_sanity_check(struct inode *head, struct inode *tail)
2168 {
2169         ENTRY;
2170         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2171                 CERROR("server does not support join\n");
2172                 RETURN(-EINVAL);
2173         }
2174         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2175                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2176                        head->i_ino, tail->i_ino);
2177                 RETURN(-EINVAL);
2178         }
2179         if (head->i_ino == tail->i_ino) {
2180                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2181                 RETURN(-EINVAL);
2182         }
2183         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2184                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
2185                 RETURN(-EINVAL);
2186         }
2187         RETURN(0);
2188 }
2189
2190 static int join_file(struct inode *head_inode, struct file *head_filp,
2191                      struct file *tail_filp)
2192 {
2193         struct dentry *tail_dentry = tail_filp->f_dentry;
2194         struct lookup_intent oit = {.it_op = IT_OPEN,
2195                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2196         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2197                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2198
2199         struct lustre_handle lockh;
2200         struct md_op_data *op_data;
2201         int    rc;
2202         loff_t data;
2203         ENTRY;
2204
2205         tail_dentry = tail_filp->f_dentry;
2206
2207         data = i_size_read(head_inode);
2208         op_data = ll_prep_md_op_data(NULL, head_inode,
2209                                      tail_dentry->d_parent->d_inode,
2210                                      tail_dentry->d_name.name,
2211                                      tail_dentry->d_name.len, 0,
2212                                      LUSTRE_OPC_ANY, &data);
2213         if (IS_ERR(op_data))
2214                 RETURN(PTR_ERR(op_data));
2215
2216         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2217                          op_data, &lockh, NULL, 0, 0);
2218
2219         ll_finish_md_op_data(op_data);
2220         if (rc < 0)
2221                 GOTO(out, rc);
2222
2223         rc = oit.d.lustre.it_status;
2224
2225         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2226                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2227                 ptlrpc_req_finished((struct ptlrpc_request *)
2228                                     oit.d.lustre.it_data);
2229                 GOTO(out, rc);
2230         }
2231
2232         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2233                                            * away */
2234                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2235                 oit.d.lustre.it_lock_mode = 0;
2236         }
2237         ll_release_openhandle(head_filp->f_dentry, &oit);
2238 out:
2239         ll_intent_release(&oit);
2240         RETURN(rc);
2241 }
2242
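/* LL_IOC_JOIN: join the file named by filename_tail onto the end of @head.
 * The two inodes are locked in inode-number order to avoid deadlocks, the
 * request is sanity-checked and sent to the MDS via join_file(), and on
 * success the head's cached stripe MD is freed so the new layout will be
 * fetched again. */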
2243 static int ll_file_join(struct inode *head, struct file *filp,
2244                         char *filename_tail)
2245 {
2246         struct inode *tail = NULL, *first = NULL, *second = NULL;
2247         struct dentry *tail_dentry;
2248         struct file *tail_filp, *first_filp, *second_filp;
2249         struct ll_lock_tree first_tree, second_tree;
2250         struct ll_lock_tree_node *first_node, *second_node;
2251         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2252         int rc = 0, cleanup_phase = 0;
2253         ENTRY;
2254
2255         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2256                head->i_ino, head->i_generation, head, filename_tail);
2257
2258         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2259         if (IS_ERR(tail_filp)) {
2260                 CERROR("Cannot open tail file %s\n", filename_tail);
2261                 rc = PTR_ERR(tail_filp);
2262                 GOTO(cleanup, rc);
2263         }
2264         tail = igrab(tail_filp->f_dentry->d_inode);
2265
2266         tlli = ll_i2info(tail);
2267         tail_dentry = tail_filp->f_dentry;
2268         LASSERT(tail_dentry);
2269         cleanup_phase = 1;
2270
2271         /* reorder the inodes to get a consistent lock ordering */
2272         first = head->i_ino > tail->i_ino ? head : tail;
2273         second = head->i_ino > tail->i_ino ? tail : head;
2274         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2275         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2276
2277         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2278                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2279         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2280         if (IS_ERR(first_node)){
2281                 rc = PTR_ERR(first_node);
2282                 GOTO(cleanup, rc);
2283         }
2284         first_tree.lt_fd = first_filp->private_data;
2285         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2286         if (rc != 0)
2287                 GOTO(cleanup, rc);
2288         cleanup_phase = 2;
2289
2290         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2291         if (IS_ERR(second_node)){
2292                 rc = PTR_ERR(second_node);
2293                 GOTO(cleanup, rc);
2294         }
2295         second_tree.lt_fd = second_filp->private_data;
2296         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2297         if (rc != 0)
2298                 GOTO(cleanup, rc);
2299         cleanup_phase = 3;
2300
2301         rc = join_sanity_check(head, tail);
2302         if (rc)
2303                 GOTO(cleanup, rc);
2304
2305         rc = join_file(head, filp, tail_filp);
2306         if (rc)
2307                 GOTO(cleanup, rc);
2308 cleanup:
2309         switch (cleanup_phase) {
2310         case 3:
2311                 ll_tree_unlock(&second_tree);
2312                 obd_cancel_unused(ll_i2dtexp(second),
2313                                   ll_i2info(second)->lli_smd, 0, NULL);
2314         case 2:
2315                 ll_tree_unlock(&first_tree);
2316                 obd_cancel_unused(ll_i2dtexp(first),
2317                                   ll_i2info(first)->lli_smd, 0, NULL);
2318         case 1:
2319                 filp_close(tail_filp, 0);
2320                 if (tail)
2321                         iput(tail);
2322                 if (head && rc == 0) {
2323                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2324                                        &hlli->lli_smd);
2325                         hlli->lli_smd = NULL;
2326                 }
2327         case 0:
2328                 break;
2329         default:
2330                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2331                 LBUG();
2332         }
2333         RETURN(rc);
2334 }
2335
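/* Release the MDS open handle obtained through an open intent when no file
 * descriptor will keep it open (used, for example, after
 * ll_lov_setstripe_ea_info() and join_file() above). */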
2336 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2337 {
2338         struct inode *inode = dentry->d_inode;
2339         struct obd_client_handle *och;
2340         int rc;
2341         ENTRY;
2342
2343         LASSERT(inode);
2344
2345         /* Root ? Do nothing. */
2346         if (dentry->d_inode->i_sb->s_root == dentry)
2347                 RETURN(0);
2348
2349         /* No open handle to close? Move away */
2350         if (!it_disposition(it, DISP_OPEN_OPEN))
2351                 RETURN(0);
2352
2353         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2354
2355         OBD_ALLOC(och, sizeof(*och));
2356         if (!och)
2357                 GOTO(out, rc = -ENOMEM);
2358
2359         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2360                     ll_i2info(inode), it, och);
2361
2362         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2363                                        inode, och);
2364  out:
2365         /* this one is in place of ll_file_open */
2366         ptlrpc_req_finished(it->d.lustre.it_data);
2367         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2368         RETURN(rc);
2369 }
2370
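/* ioctl entry point for regular files: LL_IOC_* and a few EXT3_IOC_*
 * commands are handled here; anything unrecognized is offered to the
 * registered ioctl handlers (ll_iocontrol_call) and finally forwarded to
 * the data export via obd_iocontrol(). */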
2371 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2372                   unsigned long arg)
2373 {
2374         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2375         int flags;
2376         ENTRY;
2377
2378         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2379                inode->i_generation, inode, cmd);
2380         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2381
2382         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2383         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2384                 RETURN(-ENOTTY);
2385
2386         switch(cmd) {
2387         case LL_IOC_GETFLAGS:
2388                 /* Get the current value of the file flags */
2389                 return put_user(fd->fd_flags, (int *)arg);
2390         case LL_IOC_SETFLAGS:
2391         case LL_IOC_CLRFLAGS:
2392                 /* Set or clear specific file flags */
2393                 /* XXX This probably needs checks to ensure the flags are
2394                  *     not abused, and to handle any flag side effects.
2395                  */
2396                 if (get_user(flags, (int *) arg))
2397                         RETURN(-EFAULT);
2398
2399                 if (cmd == LL_IOC_SETFLAGS) {
2400                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2401                             !(file->f_flags & O_DIRECT)) {
2402                                 CERROR("%s: unable to disable locking on "
2403                                        "non-O_DIRECT file\n", current->comm);
2404                                 RETURN(-EINVAL);
2405                         }
2406
2407                         fd->fd_flags |= flags;
2408                 } else {
2409                         fd->fd_flags &= ~flags;
2410                 }
2411                 RETURN(0);
2412         case LL_IOC_LOV_SETSTRIPE:
2413                 RETURN(ll_lov_setstripe(inode, file, arg));
2414         case LL_IOC_LOV_SETEA:
2415                 RETURN(ll_lov_setea(inode, file, arg));
2416         case LL_IOC_LOV_GETSTRIPE:
2417                 RETURN(ll_lov_getstripe(inode, arg));
2418         case LL_IOC_RECREATE_OBJ:
2419                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2420         case EXT3_IOC_GETFLAGS:
2421         case EXT3_IOC_SETFLAGS:
2422                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2423         case EXT3_IOC_GETVERSION_OLD:
2424         case EXT3_IOC_GETVERSION:
2425                 RETURN(put_user(inode->i_generation, (int *)arg));
2426         case LL_IOC_JOIN: {
2427                 char *ftail;
2428                 int rc;
2429
2430                 ftail = getname((const char *)arg);
2431                 if (IS_ERR(ftail))
2432                         RETURN(PTR_ERR(ftail));
2433                 rc = ll_file_join(inode, file, ftail);
2434                 putname(ftail);
2435                 RETURN(rc);
2436         }
2437         case LL_IOC_GROUP_LOCK:
2438                 RETURN(ll_get_grouplock(inode, file, arg));
2439         case LL_IOC_GROUP_UNLOCK:
2440                 RETURN(ll_put_grouplock(inode, file, arg));
2441         case IOC_OBD_STATFS:
2442                 RETURN(ll_obd_statfs(inode, (void *)arg));
2443
2444         /* We need to special case any other ioctls we want to handle,
2445          * to send them to the MDS/OST as appropriate and to properly
2446          * network encode the arg field.
2447         case EXT3_IOC_SETVERSION_OLD:
2448         case EXT3_IOC_SETVERSION:
2449         */
2450         case LL_IOC_FLUSHCTX:
2451                 RETURN(ll_flush_ctx(inode));
2452         default: {
2453                 int err;
2454
2455                 if (LLIOC_STOP == 
2456                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2457                         RETURN(err);
2458
2459                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2460                                      (void *)arg));
2461         }
2462         }
2463 }
2464
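/* llseek: SEEK_END first glimpses the file size from the OSTs (for striped
 * files) so that i_size is current before the new offset is computed and
 * range-checked against ll_file_maxbytes(). */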
2465 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2466 {
2467         struct inode *inode = file->f_dentry->d_inode;
2468         struct ll_inode_info *lli = ll_i2info(inode);
2469         struct lov_stripe_md *lsm = lli->lli_smd;
2470         loff_t retval;
2471         ENTRY;
2472         retval = offset + ((origin == 2) ? i_size_read(inode) :
2473                            (origin == 1) ? file->f_pos : 0);
2474         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2475                inode->i_ino, inode->i_generation, inode, retval, retval,
2476                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2477         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2478
2479         if (origin == 2) { /* SEEK_END */
2480                 int nonblock = 0, rc;
2481
2482                 if (file->f_flags & O_NONBLOCK)
2483                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2484
2485                 if (lsm != NULL) {
2486                         rc = ll_glimpse_size(inode, nonblock);
2487                         if (rc != 0)
2488                                 RETURN(rc);
2489                 }
2490
2491                 ll_inode_size_lock(inode, 0);
2492                 offset += i_size_read(inode);
2493                 ll_inode_size_unlock(inode, 0);
2494         } else if (origin == 1) { /* SEEK_CUR */
2495                 offset += file->f_pos;
2496         }
2497
2498         retval = -EINVAL;
2499         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2500                 if (offset != file->f_pos) {
2501                         file->f_pos = offset;
2502 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2503                         file->f_reada = 0;
2504                         file->f_version = ++event;
2505 #endif
2506                 }
2507                 retval = offset;
2508         }
2509         
2510         RETURN(retval);
2511 }
2512
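/* fsync: wait for pages already submitted by the VFS, fold in any recorded
 * async write-back errors, sync the inode on the MDS and, when @data is
 * set and the file has objects, sync [0, OBD_OBJECT_EOF] on the OSTs. */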
2513 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2514 {
2515         struct inode *inode = dentry->d_inode;
2516         struct ll_inode_info *lli = ll_i2info(inode);
2517         struct lov_stripe_md *lsm = lli->lli_smd;
2518         struct ptlrpc_request *req;
2519         struct obd_capa *oc;
2520         int rc, err;
2521         ENTRY;
2522         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2523                inode->i_generation, inode);
2524         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2525
2526         /* fsync's caller has already called _fdata{sync,write}, we want
2527          * that IO to finish before calling the osc and mdc sync methods */
2528         rc = filemap_fdatawait(inode->i_mapping);
2529
2530         /* catch async errors that were recorded back when async writeback
2531          * failed for pages in this mapping. */
2532         err = lli->lli_async_rc;
2533         lli->lli_async_rc = 0;
2534         if (rc == 0)
2535                 rc = err;
2536         if (lsm) {
2537                 err = lov_test_and_clear_async_rc(lsm);
2538                 if (rc == 0)
2539                         rc = err;
2540         }
2541
2542         oc = ll_mdscapa_get(inode);
2543         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2544                       &req);
2545         capa_put(oc);
2546         if (!rc)
2547                 rc = err;
2548         if (!err)
2549                 ptlrpc_req_finished(req);
2550
2551         if (data && lsm) {
2552                 struct obdo *oa;
2553                 
2554                 OBDO_ALLOC(oa);
2555                 if (!oa)
2556                         RETURN(rc ? rc : -ENOMEM);
2557
2558                 oa->o_id = lsm->lsm_object_id;
2559                 oa->o_gr = lsm->lsm_object_gr;
2560                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2561                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2562                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2563                                            OBD_MD_FLGROUP);
2564
2565                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2566                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2567                                0, OBD_OBJECT_EOF, oc);
2568                 capa_put(oc);
2569                 if (!rc)
2570                         rc = err;
2571                 OBDO_FREE(oa);
2572         }
2573
2574         RETURN(rc);
2575 }
2576
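/* fcntl/flock locking: the request is translated into an LDLM_FLOCK
 * enqueue on the MDS (F_UNLCK becomes an LCK_NL request, F_GETLK uses
 * LDLM_FL_TEST_LOCK), and the result is mirrored into the kernel's local
 * flock/POSIX lock lists so the VFS bookkeeping stays consistent. */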
2577 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2578 {
2579         struct inode *inode = file->f_dentry->d_inode;
2580         struct ll_sb_info *sbi = ll_i2sbi(inode);
2581         struct ldlm_res_id res_id =
2582                 { .name = { fid_seq(ll_inode2fid(inode)),
2583                             fid_oid(ll_inode2fid(inode)),
2584                             fid_ver(ll_inode2fid(inode)),
2585                             LDLM_FLOCK} };
2586         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2587                 ldlm_flock_completion_ast, NULL, file_lock };
2588         struct lustre_handle lockh = {0};
2589         ldlm_policy_data_t flock;
2590         int flags = 0;
2591         int rc;
2592         ENTRY;
2593
2594         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2595                inode->i_ino, file_lock);
2596
2597         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2598  
2599         if (file_lock->fl_flags & FL_FLOCK) {
2600                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2601                 /* set missing params for flock() calls */
2602                 file_lock->fl_end = OFFSET_MAX;
2603                 file_lock->fl_pid = current->tgid;
2604         }
2605         flock.l_flock.pid = file_lock->fl_pid;
2606         flock.l_flock.start = file_lock->fl_start;
2607         flock.l_flock.end = file_lock->fl_end;
2608
2609         switch (file_lock->fl_type) {
2610         case F_RDLCK:
2611                 einfo.ei_mode = LCK_PR;
2612                 break;
2613         case F_UNLCK:
2614                 /* An unlock request may or may not have any relation to
2615                  * existing locks so we may not be able to pass a lock handle
2616                  * via a normal ldlm_lock_cancel() request. The request may even
2617                  * unlock a byte range in the middle of an existing lock. In
2618                  * order to process an unlock request we need all of the same
2619                  * information that is given with a normal read or write record
2620                  * lock request. To avoid creating another ldlm unlock (cancel)
2621                  * message we'll treat a LCK_NL flock request as an unlock. */
2622                 einfo.ei_mode = LCK_NL;
2623                 break;
2624         case F_WRLCK:
2625                 einfo.ei_mode = LCK_PW;
2626                 break;
2627         default:
2628                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2629                 LBUG();
2630         }
2631
2632         switch (cmd) {
2633         case F_SETLKW:
2634 #ifdef F_SETLKW64
2635         case F_SETLKW64:
2636 #endif
2637                 flags = 0;
2638                 break;
2639         case F_SETLK:
2640 #ifdef F_SETLK64
2641         case F_SETLK64:
2642 #endif
2643                 flags = LDLM_FL_BLOCK_NOWAIT;
2644                 break;
2645         case F_GETLK:
2646 #ifdef F_GETLK64
2647         case F_GETLK64:
2648 #endif
2649                 flags = LDLM_FL_TEST_LOCK;
2650                 /* Save the old mode so that if the mode in the lock changes we
2651                  * can decrement the appropriate reader or writer refcount. */
2652                 file_lock->fl_type = einfo.ei_mode;
2653                 break;
2654         default:
2655                 CERROR("unknown fcntl lock command: %d\n", cmd);
2656                 LBUG();
2657         }
2658
2659         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2660                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2661                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2662
2663         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2664                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2665         if ((file_lock->fl_flags & FL_FLOCK) &&
2666             (rc == 0 || file_lock->fl_type == F_UNLCK))
2667                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2668 #ifdef HAVE_F_OP_FLOCK
2669         if ((file_lock->fl_flags & FL_POSIX) &&
2670             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2671             !(flags & LDLM_FL_TEST_LOCK))
2672                 posix_lock_file_wait(file, file_lock);
2673 #endif
2674
2675         RETURN(rc);
2676 }
2677
2678 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2679 {
2680         ENTRY;
2681
2682         RETURN(-ENOSYS);
2683 }
2684
2685 int ll_have_md_lock(struct inode *inode, __u64 bits)
2686 {
2687         struct lustre_handle lockh;
2688         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2689         struct lu_fid *fid;
2690         int flags;
2691         ENTRY;
2692
2693         if (!inode)
2694                RETURN(0);
2695
2696         fid = &ll_i2info(inode)->lli_fid;
2697         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2698
2699         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2700         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2701                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2702                 RETURN(1);
2703         }
2704         RETURN(0);
2705 }
2706
2707 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2708                             struct lustre_handle *lockh)
2709 {
2710         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2711         struct lu_fid *fid;
2712         ldlm_mode_t rc;
2713         int flags;
2714         ENTRY;
2715
2716         fid = &ll_i2info(inode)->lli_fid;
2717         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2718
2719         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2720         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2721                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2722         RETURN(rc);
2723 }
2724
2725 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2726         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2727                               * and return success */
2728                 inode->i_nlink = 0;
2729                 /* This path cannot be hit for regular files unless in
2730                  * case of obscure races, so no need to validate
2731                  * size. */
2732                 if (!S_ISREG(inode->i_mode) &&
2733                     !S_ISDIR(inode->i_mode))
2734                         return 0;
2735         }
2736
2737         if (rc) {
2738                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2739                 return -abs(rc);
2741         }
2742
2743         return 0;
2744 }
2745
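/* Revalidate an inode's attributes against the MDS: with
 * OBD_CONNECT_ATTRFID this is done by fid through an IT_GETATTR intent;
 * otherwise a plain md_getattr() is issued when no UPDATE/LOOKUP ibits
 * lock is cached.  Striped files finish with a glimpse to refresh the
 * size. */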
2746 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2747 {
2748         struct inode *inode = dentry->d_inode;
2749         struct ptlrpc_request *req = NULL;
2750         struct ll_sb_info *sbi;
2751         struct obd_export *exp;
2752         int rc;
2753         ENTRY;
2754
2755         if (!inode) {
2756                 CERROR("REPORT THIS LINE TO PETER\n");
2757                 RETURN(0);
2758         }
2759         sbi = ll_i2sbi(inode);
2760
2761         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2762                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2763
2764         exp = ll_i2mdexp(inode);
2765
2766         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2767                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2768                 struct md_op_data *op_data;
2769
2770                 /* Call getattr by fid, so do not provide name at all. */
2771                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2772                                              dentry->d_inode, NULL, 0, 0,
2773                                              LUSTRE_OPC_ANY, NULL);
2774                 if (IS_ERR(op_data))
2775                         RETURN(PTR_ERR(op_data));
2776
2777                 oit.it_flags |= O_CHECK_STALE;
2778                 rc = md_intent_lock(exp, op_data, NULL, 0,
2779                                     /* we are not interested in name
2780                                        based lookup */
2781                                     &oit, 0, &req,
2782                                     ll_md_blocking_ast, 0);
2783                 ll_finish_md_op_data(op_data);
2784                 oit.it_flags &= ~O_CHECK_STALE;
2785                 if (rc < 0) {
2786                         rc = ll_inode_revalidate_fini(inode, rc);
2787                         GOTO(out, rc);
2788                 }
2789
2790                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2791                 if (rc != 0) {
2792                         ll_intent_release(&oit);
2793                         GOTO(out, rc);
2794                 }
2795
2796                 /* Unlinked? Unhash the dentry so it is not picked up later
2797                    by do_lookup() -> ll_revalidate_it().  We cannot use
2798                    d_drop here, in order to preserve get_cwd() functionality
2799                    on 2.6.  Bug 10503 */
2800                 if (!dentry->d_inode->i_nlink) {
2801                         spin_lock(&dcache_lock);
2802                         ll_drop_dentry(dentry);
2803                         spin_unlock(&dcache_lock);
2804                 }
2805
2806                 ll_lookup_finish_locks(&oit, dentry);
2807         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
2808                                                      MDS_INODELOCK_LOOKUP)) {
2809                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2810                 obd_valid valid = OBD_MD_FLGETATTR;
2811                 struct obd_capa *oc;
2812                 int ealen = 0;
2813
2814                 if (S_ISREG(inode->i_mode)) {
2815                         rc = ll_get_max_mdsize(sbi, &ealen);
2816                         if (rc)
2817                                 RETURN(rc);
2818                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2819                 }
2820                 /* When OBD_CONNECT_ATTRFID is not supported we cannot get
2821                  * a fresh capa for this inode, because only the capas of
2822                  * directories are kept fresh. */
2823                 oc = ll_mdscapa_get(inode);
2824                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2825                                 ealen, &req);
2826                 capa_put(oc);
2827                 if (rc) {
2828                         rc = ll_inode_revalidate_fini(inode, rc);
2829                         RETURN(rc);
2830                 }
2831
2832                 rc = ll_prep_inode(&inode, req, NULL);
2833                 if (rc)
2834                         GOTO(out, rc);
2835         }
2836
2837         /* if object not yet allocated, don't validate size */
2838         if (ll_i2info(inode)->lli_smd == NULL)
2839                 GOTO(out, rc = 0);
2840
2841         /* ll_glimpse_size will prefer locally cached writes if they extend
2842          * the file */
2843         rc = ll_glimpse_size(inode, 0);
2844         EXIT;
2845 out:
2846         ptlrpc_req_finished(req);
2847         return rc;
2848 }
2849
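/* ->getattr() helper: revalidate the inode through ll_inode_revalidate_it()
 * and then fill 'stat' from the cached inode attributes.  Size and block
 * count are sampled under the Lustre inode size lock. */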
2850 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2851                   struct lookup_intent *it, struct kstat *stat)
2852 {
2853         struct inode *inode = de->d_inode;
2854         int res = 0;
2855
2856         res = ll_inode_revalidate_it(de, it);
2857         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2858
2859         if (res)
2860                 return res;
2861
2862         stat->dev = inode->i_sb->s_dev;
2863         stat->ino = inode->i_ino;
2864         stat->mode = inode->i_mode;
2865         stat->nlink = inode->i_nlink;
2866         stat->uid = inode->i_uid;
2867         stat->gid = inode->i_gid;
2868         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2869         stat->atime = inode->i_atime;
2870         stat->mtime = inode->i_mtime;
2871         stat->ctime = inode->i_ctime;
2872 #ifdef HAVE_INODE_BLKSIZE
2873         stat->blksize = inode->i_blksize;
2874 #else
2875         stat->blksize = 1 << inode->i_blkbits;
2876 #endif
2877
2878         ll_inode_size_lock(inode, 0);
2879         stat->size = i_size_read(inode);
2880         stat->blocks = inode->i_blocks;
2881         ll_inode_size_unlock(inode, 0);
2882
2883         return 0;
2884 }
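
/* ->getattr() entry point: wraps ll_getattr_it() with an IT_GETATTR intent. */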
2885 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2886 {
2887         struct lookup_intent it = { .it_op = IT_GETATTR };
2888
2889         return ll_getattr_it(mnt, de, &it, stat);
2890 }
2891
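/* Check 'mask' against the POSIX ACL cached on the inode, if any.  Returns
 * -EAGAIN when no ACL is cached (or ACL support is not configured) so the
 * caller falls back to the ordinary mode-bit checks. */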
2892 static
2893 int lustre_check_acl(struct inode *inode, int mask)
2894 {
2895 #ifdef CONFIG_FS_POSIX_ACL
2896         struct ll_inode_info *lli = ll_i2info(inode);
2897         struct posix_acl *acl;
2898         int rc;
2899         ENTRY;
2900
2901         spin_lock(&lli->lli_lock);
2902         acl = posix_acl_dup(lli->lli_posix_acl);
2903         spin_unlock(&lli->lli_lock);
2904
2905         if (!acl)
2906                 RETURN(-EAGAIN);
2907
2908         rc = posix_acl_permission(inode, acl, mask);
2909         posix_acl_release(acl);
2910
2911         RETURN(rc);
2912 #else
2913         return -EAGAIN;
2914 #endif
2915 }
2916
2917 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2918 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2919 {
2920         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2921                inode->i_ino, inode->i_generation, inode, mask);
2922         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2923                 return lustre_check_remote_perm(inode, mask);
2924
2925         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2926         return generic_permission(inode, mask, lustre_check_acl);
2927 }
2928 #else
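/* Kernels older than 2.6.10 cannot use the generic_permission() call above
 * with an ACL callback, so the owner/group/other and capability checks are
 * open-coded here in the style of generic_permission(), with
 * lustre_check_acl() consulted for the group class.  The "else if (1)" makes
 * the check_groups branch reachable only through the gotos above it. */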
2929 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2930 {
2931         int mode = inode->i_mode;
2932         int rc;
2933
2934         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2935                inode->i_ino, inode->i_generation, inode, mask);
2936
2937         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2938                 return lustre_check_remote_perm(inode, mask);
2939
2940         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2941
2942         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2943             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2944                 return -EROFS;
2945         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2946                 return -EACCES;
2947         if (current->fsuid == inode->i_uid) {
2948                 mode >>= 6;
2949         } else if (1) {
2950                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2951                         goto check_groups;
2952                 rc = lustre_check_acl(inode, mask);
2953                 if (rc == -EAGAIN)
2954                         goto check_groups;
2955                 if (rc == -EACCES)
2956                         goto check_capabilities;
2957                 return rc;
2958         } else {
2959 check_groups:
2960                 if (in_group_p(inode->i_gid))
2961                         mode >>= 3;
2962         }
2963         if ((mode & mask & S_IRWXO) == mask)
2964                 return 0;
2965
2966 check_capabilities:
2967         if (!(mask & MAY_EXEC) ||
2968             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2969                 if (capable(CAP_DAC_OVERRIDE))
2970                         return 0;
2971
2972         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2973             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2974                 return 0;
2975
2976         return -EACCES;
2977 }
2978 #endif
2979
2980 /* -o localflock - only provides locally consistent flock locks */
2981 struct file_operations ll_file_operations = {
2982         .read           = ll_file_read,
2983         .write          = ll_file_write,
2984         .ioctl          = ll_file_ioctl,
2985         .open           = ll_file_open,
2986         .release        = ll_file_release,
2987         .mmap           = ll_file_mmap,
2988         .llseek         = ll_file_seek,
2989         .sendfile       = ll_file_sendfile,
2990         .fsync          = ll_fsync,
2991 };
2992
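/* -o flock - cluster-coherent flock and POSIX locks, handled by
 * ll_file_flock() */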
2993 struct file_operations ll_file_operations_flock = {
2994         .read           = ll_file_read,
2995         .write          = ll_file_write,
2996         .ioctl          = ll_file_ioctl,
2997         .open           = ll_file_open,
2998         .release        = ll_file_release,
2999         .mmap           = ll_file_mmap,
3000         .llseek         = ll_file_seek,
3001         .sendfile       = ll_file_sendfile,
3002         .fsync          = ll_fsync,
3003 #ifdef HAVE_F_OP_FLOCK
3004         .flock          = ll_file_flock,
3005 #endif
3006         .lock           = ll_file_flock
3007 };
3008
3009 /* These are for -o noflock - to return ENOSYS on flock calls */
3010 struct file_operations ll_file_operations_noflock = {
3011         .read           = ll_file_read,
3012         .write          = ll_file_write,
3013         .ioctl          = ll_file_ioctl,
3014         .open           = ll_file_open,
3015         .release        = ll_file_release,
3016         .mmap           = ll_file_mmap,
3017         .llseek         = ll_file_seek,
3018         .sendfile       = ll_file_sendfile,
3019         .fsync          = ll_fsync,
3020 #ifdef HAVE_F_OP_FLOCK
3021         .flock          = ll_file_noflock,
3022 #endif
3023         .lock           = ll_file_noflock
3024 };
3025
3026 struct inode_operations ll_file_inode_operations = {
3027 #ifdef HAVE_VFS_INTENT_PATCHES
3028         .setattr_raw    = ll_setattr_raw,
3029 #endif
3030         .setattr        = ll_setattr,
3031         .truncate       = ll_truncate,
3032         .getattr        = ll_getattr,
3033         .permission     = ll_inode_permission,
3034         .setxattr       = ll_setxattr,
3035         .getxattr       = ll_getxattr,
3036         .listxattr      = ll_listxattr,
3037         .removexattr    = ll_removexattr,
3038 };
3039
3040 /* dynamic ioctl number support routines */
3041 static struct llioc_ctl_data {
3042         struct rw_semaphore ioc_sem;
3043         struct list_head    ioc_head;
3044 } llioc = {
3045         __RWSEM_INITIALIZER(llioc.ioc_sem),
3046         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3047 };
3048
3049
3050 struct llioc_data {
3051         struct list_head        iocd_list;
3052         unsigned int            iocd_size;
3053         llioc_callback_t        iocd_cb;
3054         unsigned int            iocd_count;
3055         unsigned int            iocd_cmd[0];
3056 };
3057
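/* Register an additional ioctl handler.  'cmd' points to 'count' ioctl
 * numbers that 'cb' is willing to handle; they are copied into a new
 * llioc_data entry appended to the llioc registry.  Returns an opaque
 * cookie to pass to ll_iocontrol_unregister(), or NULL on invalid
 * arguments or allocation failure. */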
3058 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3059 {
3060         unsigned int size;
3061         struct llioc_data *in_data = NULL;
3062         ENTRY;
3063
3064         if (cb == NULL || cmd == NULL ||
3065             count > LLIOC_MAX_CMD || count < 0)
3066                 RETURN(NULL);
3067
3068         size = sizeof(*in_data) + count * sizeof(unsigned int);
3069         OBD_ALLOC(in_data, size);
3070         if (in_data == NULL)
3071                 RETURN(NULL);
3072
3073         memset(in_data, 0, sizeof(*in_data));
3074         in_data->iocd_size = size;
3075         in_data->iocd_cb = cb;
3076         in_data->iocd_count = count;
3077         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3078
3079         down_write(&llioc.ioc_sem);
3080         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3081         up_write(&llioc.ioc_sem);
3082
3083         RETURN(in_data);
3084 }
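
/*
 * Illustrative usage sketch (not taken from any caller in this tree).  The
 * handler name and ioctl number are hypothetical, and the callback signature
 * is assumed from the way iocd_cb is invoked in ll_iocontrol_call() below:
 *
 *      static enum llioc_iter my_ioctl_cb(struct inode *inode,
 *                                         struct file *file, unsigned int cmd,
 *                                         unsigned long arg, void *magic,
 *                                         int *rcp)
 *      {
 *              *rcp = 0;
 *              return LLIOC_STOP;
 *      }
 *
 *      static unsigned int my_cmds[] = { MY_PRIVATE_IOC };
 *
 *      void *cookie = ll_iocontrol_register(my_ioctl_cb, 1, my_cmds);
 *      ...
 *      ll_iocontrol_unregister(cookie);
 *
 * Returning LLIOC_STOP reports the command as handled and stops the registry
 * walk; *rcp carries the value the ioctl caller will see.
 */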
3085
3086 void ll_iocontrol_unregister(void *magic)
3087 {
3088         struct llioc_data *tmp;
3089
3090         if (magic == NULL)
3091                 return;
3092
3093         down_write(&llioc.ioc_sem);
3094         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3095                 if (tmp == magic) {
3096                         unsigned int size = tmp->iocd_size;
3097
3098                         list_del(&tmp->iocd_list);
3099                         up_write(&llioc.ioc_sem);
3100
3101                         OBD_FREE(tmp, size);
3102                         return;
3103                 }
3104         }
3105         up_write(&llioc.ioc_sem);
3106
3107         CWARN("didn't find iocontrol registration block with magic: %p\n", magic);
3108 }
3109
3110 EXPORT_SYMBOL(ll_iocontrol_register);
3111 EXPORT_SYMBOL(ll_iocontrol_unregister);
3112
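/* Dispatch 'cmd' to the dynamically registered handlers.  The registry is
 * walked under the read semaphore; each callback that lists 'cmd' is invoked
 * in registration order until one returns LLIOC_STOP.  '*rcp' receives the
 * handler's result, or -EINVAL when no handler claimed the command. */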
3113 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3114                         unsigned int cmd, unsigned long arg, int *rcp)
3115 {
3116         enum llioc_iter ret = LLIOC_CONT;
3117         struct llioc_data *data;
3118         int rc = -EINVAL, i;
3119
3120         down_read(&llioc.ioc_sem);
3121         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3122                 for (i = 0; i < data->iocd_count; i++) {
3123                         if (cmd != data->iocd_cmd[i])
3124                                 continue;
3125
3126                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3127                         break;
3128                 }
3129
3130                 if (ret == LLIOC_STOP)
3131                         break;
3132         }
3133         up_read(&llioc.ioc_sem);
3134
3135         if (rcp)
3136                 *rcp = rc;
3137         return ret;
3138 }