Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
85
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in case of LMV, is this correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * here we check if this is forced umount. If so this is called on
110          * canceling "open lock" and we do not call md_close() in this case, as
111          * it will not be successful, as import is already deactivated.
112          */
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* MDS has instructed us to obtain Size-on-MDS attribute from
131                  * OSTs and send setattr to back to MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154       
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och=*och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody have freed this och
203                       already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have good enough OPEN lock on the file and if
228            we can skip talking to MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this returns an error code, fput() the caller does not, so we need
273  * to make every effort to clean up all of our state here.  Also, applications
274  * rarely check close errors and even if an error is returned they will not
275  * re-try the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289 #ifdef CONFIG_FS_POSIX_ACL
290         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
291             inode == inode->i_sb->s_root->d_inode) {
292                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
293
294                 LASSERT(fd != NULL);
295                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
296                         fd->fd_flags &= ~LL_FILE_RMTACL;
297                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
298                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
299                 }
300         }
301 #endif
302
303         if (inode->i_sb->s_root != file->f_dentry)
304                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
305         fd = LUSTRE_FPRIVATE(file);
306         LASSERT(fd != NULL);
307
308         /* The last ref on @file, maybe not the the owner pid of statahead.
309          * Different processes can open the same dir, "ll_opendir_key" means:
310          * it is me that should stop the statahead thread. */
311         if (lli->lli_opendir_key == fd)
312                 ll_stop_statahead(inode, fd);
313
314         if (inode->i_sb->s_root == file->f_dentry) {
315                 LUSTRE_FPRIVATE(file) = NULL;
316                 ll_file_data_put(fd);
317                 RETURN(0);
318         }
319         
320         if (lsm)
321                 lov_test_and_clear_async_rc(lsm);
322         lli->lli_async_rc = 0;
323
324         rc = ll_md_close(sbi->ll_md_exp, inode, file);
325         RETURN(rc);
326 }
327
328 static int ll_intent_file_open(struct file *file, void *lmm,
329                                int lmmsize, struct lookup_intent *itp)
330 {
331         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
332         struct dentry *parent = file->f_dentry->d_parent;
333         const char *name = file->f_dentry->d_name.name;
334         const int len = file->f_dentry->d_name.len;
335         struct md_op_data *op_data;
336         struct ptlrpc_request *req;
337         int rc;
338         ENTRY;
339
340         if (!parent)
341                 RETURN(-ENOENT);
342
343         /* Usually we come here only for NFSD, and we want open lock.
344            But we can also get here with pre 2.6.15 patchless kernels, and in
345            that case that lock is also ok */
346         /* We can also get here if there was cached open handle in revalidate_it
347          * but it disappeared while we were getting from there to ll_file_open.
348          * But this means this file was closed and immediatelly opened which
349          * makes a good candidate for using OPEN lock */
350         /* If lmmsize & lmm are not 0, we are just setting stripe info
351          * parameters. No need for the open lock */
352         if (!lmm && !lmmsize)
353                 itp->it_flags |= MDS_OPEN_LOCK;
354
355         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
356                                       file->f_dentry->d_inode, name, len,
357                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
358         if (IS_ERR(op_data))
359                 RETURN(PTR_ERR(op_data));
360
361         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
362                             0 /*unused */, &req, ll_md_blocking_ast, 0);
363         ll_finish_md_op_data(op_data);
364         if (rc == -ESTALE) {
365                 /* reason for keep own exit path - don`t flood log
366                 * with messages with -ESTALE errors.
367                 */
368                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
369                      it_open_error(DISP_OPEN_OPEN, itp))
370                         GOTO(out, rc);
371                 ll_release_openhandle(file->f_dentry, itp);
372                 GOTO(out_stale, rc);
373         }
374
375         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
376                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
377                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
378                 GOTO(out, rc);
379         }
380
381         if (itp->d.lustre.it_lock_mode)
382                 md_set_lock_data(sbi->ll_md_exp,
383                                  &itp->d.lustre.it_lock_handle, 
384                                  file->f_dentry->d_inode);
385
386         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
387 out:
388         ptlrpc_req_finished(itp->d.lustre.it_data);
389
390 out_stale:
391         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
392         ll_intent_drop_lock(itp);
393
394         RETURN(rc);
395 }
396
397 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
398                        struct lookup_intent *it, struct obd_client_handle *och)
399 {
400         struct ptlrpc_request *req = it->d.lustre.it_data;
401         struct mdt_body *body;
402
403         LASSERT(och);
404
405         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
406         LASSERT(body != NULL);                      /* reply already checked out */
407
408         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
409         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
410         och->och_fid = lli->lli_fid;
411         och->och_flags = it->it_flags;
412         lli->lli_ioepoch = body->ioepoch;
413
414         return md_set_open_replay_data(md_exp, och, req);
415 }
416
417 int ll_local_open(struct file *file, struct lookup_intent *it,
418                   struct ll_file_data *fd, struct obd_client_handle *och)
419 {
420         struct inode *inode = file->f_dentry->d_inode;
421         struct ll_inode_info *lli = ll_i2info(inode);
422         ENTRY;
423
424         LASSERT(!LUSTRE_FPRIVATE(file));
425
426         LASSERT(fd != NULL);
427
428         if (och) {
429                 struct ptlrpc_request *req = it->d.lustre.it_data;
430                 struct mdt_body *body;
431                 int rc;
432
433                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
434                 if (rc)
435                         RETURN(rc);
436
437                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
438                 if ((it->it_flags & FMODE_WRITE) &&
439                     (body->valid & OBD_MD_FLSIZE))
440                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
441                                lli->lli_ioepoch, PFID(&lli->lli_fid));
442         }
443
444         LUSTRE_FPRIVATE(file) = fd;
445         ll_readahead_init(inode, &fd->fd_ras);
446         fd->fd_omode = it->it_flags;
447         RETURN(0);
448 }
449
450 /* Open a file, and (for the very first open) create objects on the OSTs at
451  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
452  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
453  * lli_open_sem to ensure no other process will create objects, send the
454  * stripe MD to the MDS, or try to destroy the objects if that fails.
455  *
456  * If we already have the stripe MD locally then we don't request it in
457  * md_open(), by passing a lmm_size = 0.
458  *
459  * It is up to the application to ensure no other processes open this file
460  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
461  * used.  We might be able to avoid races of that sort by getting lli_open_sem
462  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
463  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
464  */
465 int ll_file_open(struct inode *inode, struct file *file)
466 {
467         struct ll_inode_info *lli = ll_i2info(inode);
468         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
469                                           .it_flags = file->f_flags };
470         struct lov_stripe_md *lsm;
471         struct ptlrpc_request *req = NULL;
472         struct obd_client_handle **och_p;
473         __u64 *och_usecount;
474         struct ll_file_data *fd;
475         int rc = 0, opendir_set = 0;
476         ENTRY;
477
478         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
479                inode->i_generation, inode, file->f_flags);
480
481 #ifdef HAVE_VFS_INTENT_PATCHES
482         it = file->f_it;
483 #else
484         it = file->private_data; /* XXX: compat macro */
485         file->private_data = NULL; /* prevent ll_local_open assertion */
486 #endif
487
488         fd = ll_file_data_get();
489         if (fd == NULL)
490                 RETURN(-ENOMEM);
491
492         if (S_ISDIR(inode->i_mode)) {
493                 spin_lock(&lli->lli_lock);
494                 /* "lli->lli_opendir_pid != 0" means someone has set it.
495                  * "lli->lli_sai != NULL" means the previous statahead has not
496                  *                        been cleanup. */ 
497                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
498                         opendir_set = 1;
499                         lli->lli_opendir_pid = cfs_curproc_pid();
500                         lli->lli_opendir_key = fd;
501                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
502                         /* Two cases for this:
503                          * (1) The same process open such directory many times.
504                          * (2) The old process opened the directory, and exited
505                          *     before its children processes. Then new process
506                          *     with the same pid opens such directory before the
507                          *     old process's children processes exit.
508                          * Change the owner to the latest one. */
509                         opendir_set = 2;
510                         lli->lli_opendir_key = fd;
511                 }
512                 spin_unlock(&lli->lli_lock);
513         }
514
515         if (inode->i_sb->s_root == file->f_dentry) {
516                 LUSTRE_FPRIVATE(file) = fd;
517                 RETURN(0);
518         }
519
520         if (!it || !it->d.lustre.it_disposition) {
521                 /* Convert f_flags into access mode. We cannot use file->f_mode,
522                  * because everything but O_ACCMODE mask was stripped from
523                  * there */
524                 if ((oit.it_flags + 1) & O_ACCMODE)
525                         oit.it_flags++;
526                 if (file->f_flags & O_TRUNC)
527                         oit.it_flags |= FMODE_WRITE;
528
529                 /* kernel only call f_op->open in dentry_open.  filp_open calls
530                  * dentry_open after call to open_namei that checks permissions.
531                  * Only nfsd_open call dentry_open directly without checking
532                  * permissions and because of that this code below is safe. */
533                 if (oit.it_flags & FMODE_WRITE)
534                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
535
536                 /* We do not want O_EXCL here, presumably we opened the file
537                  * already? XXX - NFS implications? */
538                 oit.it_flags &= ~O_EXCL;
539
540                 it = &oit;
541         }
542
543 restart:
544         /* Let's see if we have file open on MDS already. */
545         if (it->it_flags & FMODE_WRITE) {
546                 och_p = &lli->lli_mds_write_och;
547                 och_usecount = &lli->lli_open_fd_write_count;
548         } else if (it->it_flags & FMODE_EXEC) {
549                 och_p = &lli->lli_mds_exec_och;
550                 och_usecount = &lli->lli_open_fd_exec_count;
551          } else {
552                 och_p = &lli->lli_mds_read_och;
553                 och_usecount = &lli->lli_open_fd_read_count;
554         }
555         
556         down(&lli->lli_och_sem);
557         if (*och_p) { /* Open handle is present */
558                 if (it_disposition(it, DISP_OPEN_OPEN)) {
559                         /* Well, there's extra open request that we do not need,
560                            let's close it somehow. This will decref request. */
561                         rc = it_open_error(DISP_OPEN_OPEN, it);
562                         if (rc) {
563                                 ll_file_data_put(fd);
564                                 GOTO(out_och_free, rc);
565                         }       
566                         ll_release_openhandle(file->f_dentry, it);
567                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
568                                              LPROC_LL_OPEN);
569                 }
570                 (*och_usecount)++;
571
572                 rc = ll_local_open(file, it, fd, NULL);
573                 if (rc) {
574                         up(&lli->lli_och_sem);
575                         ll_file_data_put(fd);
576                         RETURN(rc);
577                 }
578         } else {
579                 LASSERT(*och_usecount == 0);
580                 if (!it->d.lustre.it_disposition) {
581                         /* We cannot just request lock handle now, new ELC code
582                            means that one of other OPEN locks for this file
583                            could be cancelled, and since blocking ast handler
584                            would attempt to grab och_sem as well, that would
585                            result in a deadlock */
586                         up(&lli->lli_och_sem);
587                         it->it_flags |= O_CHECK_STALE;
588                         rc = ll_intent_file_open(file, NULL, 0, it);
589                         it->it_flags &= ~O_CHECK_STALE;
590                         if (rc) {
591                                 ll_file_data_put(fd);
592                                 GOTO(out_openerr, rc);
593                         }
594
595                         /* Got some error? Release the request */
596                         if (it->d.lustre.it_status < 0) {
597                                 req = it->d.lustre.it_data;
598                                 ptlrpc_req_finished(req);
599                         }
600                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
601                                          &it->d.lustre.it_lock_handle,
602                                          file->f_dentry->d_inode);
603                         goto restart;
604                 }
605                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
606                 if (!*och_p) {
607                         ll_file_data_put(fd);
608                         GOTO(out_och_free, rc = -ENOMEM);
609                 }
610                 (*och_usecount)++;
611                 req = it->d.lustre.it_data;
612
613                 /* md_intent_lock() didn't get a request ref if there was an
614                  * open error, so don't do cleanup on the request here
615                  * (bug 3430) */
616                 /* XXX (green): Should not we bail out on any error here, not
617                  * just open error? */
618                 rc = it_open_error(DISP_OPEN_OPEN, it);
619                 if (rc) {
620                         ll_file_data_put(fd);
621                         GOTO(out_och_free, rc);
622                 }
623
624                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
625                 rc = ll_local_open(file, it, fd, *och_p);
626                 if (rc) {
627                         up(&lli->lli_och_sem);
628                         ll_file_data_put(fd);
629                         GOTO(out_och_free, rc);
630                 }
631         }
632         up(&lli->lli_och_sem);
633
634         /* Must do this outside lli_och_sem lock to prevent deadlock where
635            different kind of OPEN lock for this same inode gets cancelled
636            by ldlm_cancel_lru */
637         if (!S_ISREG(inode->i_mode))
638                 GOTO(out, rc);
639
640         ll_capa_open(inode);
641
642         lsm = lli->lli_smd;
643         if (lsm == NULL) {
644                 if (file->f_flags & O_LOV_DELAY_CREATE ||
645                     !(file->f_mode & FMODE_WRITE)) {
646                         CDEBUG(D_INODE, "object creation was delayed\n");
647                         GOTO(out, rc);
648                 }
649         }
650         file->f_flags &= ~O_LOV_DELAY_CREATE;
651         GOTO(out, rc);
652 out:
653         ptlrpc_req_finished(req);
654         if (req)
655                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
656 out_och_free:
657         if (rc) {
658                 if (*och_p) {
659                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
660                         *och_p = NULL; /* OBD_FREE writes some magic there */
661                         (*och_usecount)--;
662                 }
663                 up(&lli->lli_och_sem);
664 out_openerr:
665                 if (opendir_set == 1) {
666                         lli->lli_opendir_key = NULL;
667                         lli->lli_opendir_pid = 0;
668                 } else if (unlikely(opendir_set == 2)) {
669                         ll_stop_statahead(inode, fd);
670                 }
671         }
672
673         return rc;
674 }
675
676 /* Fills the obdo with the attributes for the inode defined by lsm */
677 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
678 {
679         struct ptlrpc_request_set *set;
680         struct ll_inode_info *lli = ll_i2info(inode);
681         struct lov_stripe_md *lsm = lli->lli_smd;
682
683         struct obd_info oinfo = { { { 0 } } };
684         int rc;
685         ENTRY;
686
687         LASSERT(lsm != NULL);
688
689         oinfo.oi_md = lsm;
690         oinfo.oi_oa = obdo;
691         oinfo.oi_oa->o_id = lsm->lsm_object_id;
692         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
693         oinfo.oi_oa->o_mode = S_IFREG;
694         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
695                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
696                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
697                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
698                                OBD_MD_FLGROUP;
699         oinfo.oi_capa = ll_mdscapa_get(inode);
700
701         set = ptlrpc_prep_set();
702         if (set == NULL) {
703                 CERROR("can't allocate ptlrpc set\n");
704                 rc = -ENOMEM;
705         } else {
706                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
707                 if (rc == 0)
708                         rc = ptlrpc_set_wait(set);
709                 ptlrpc_set_destroy(set);
710         }
711         capa_put(oinfo.oi_capa);
712         if (rc)
713                 RETURN(rc);
714
715         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
716                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
717                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
718
719         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
720         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
721                lli->lli_smd->lsm_object_id, i_size_read(inode),
722                (unsigned long long)inode->i_blocks,
723                (unsigned long)ll_inode_blksize(inode));
724         RETURN(0);
725 }
726
727 static inline void ll_remove_suid(struct inode *inode)
728 {
729         unsigned int mode;
730
731         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
732         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
733
734         /* was any of the uid bits set? */
735         mode &= inode->i_mode;
736         if (mode && !capable(CAP_FSETID)) {
737                 inode->i_mode &= ~mode;
738                 // XXX careful here - we cannot change the size
739         }
740 }
741
742 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
743 {
744         struct ll_inode_info *lli = ll_i2info(inode);
745         struct lov_stripe_md *lsm = lli->lli_smd;
746         struct obd_export *exp = ll_i2dtexp(inode);
747         struct {
748                 char name[16];
749                 struct ldlm_lock *lock;
750                 struct lov_stripe_md *lsm;
751         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock, .lsm = lsm };
752         __u32 stripe, vallen = sizeof(stripe);
753         int rc;
754         ENTRY;
755
756         if (lsm->lsm_stripe_count == 1)
757                 GOTO(check, stripe = 0);
758
759         /* get our offset in the lov */
760         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
761         if (rc != 0) {
762                 CERROR("obd_get_info: rc = %d\n", rc);
763                 RETURN(rc);
764         }
765         LASSERT(stripe < lsm->lsm_stripe_count);
766
767 check:
768         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
769             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
770                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
771                            lsm->lsm_oinfo[stripe]->loi_id,
772                            lsm->lsm_oinfo[stripe]->loi_gr);
773                 RETURN(-ELDLM_NO_LOCK_DATA);
774         }
775
776         RETURN(stripe);
777 }
778
779 /* Get extra page reference to ensure it is not going away */
780 void ll_pin_extent_cb(void *data)
781 {
782         struct page *page = data;
783         
784         page_cache_get(page);
785
786         return;
787 }
788
789 /* Flush the page from page cache for an extent as its canceled.
790  * Page to remove is delivered as @data.
791  *
792  * No one can dirty the extent until we've finished our work and they cannot
793  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
794  * but other kernel actors could have pages locked.
795  *
796  * If @discard is set, there is no need to write the page if it is dirty.
797  *
798  * Called with the DLM lock held. */
799 int ll_page_removal_cb(void *data, int discard)
800 {
801         int rc;
802         struct page *page = data;
803         struct address_space *mapping;
804  
805         ENTRY;
806
807         /* We have page reference already from ll_pin_page */
808         lock_page(page);
809
810         /* Already truncated by somebody */
811         if (!page->mapping)
812                 GOTO(out, rc = 0);
813         mapping = page->mapping;
814
815         ll_teardown_mmaps(mapping,
816                           (__u64)page->index << PAGE_CACHE_SHIFT,
817                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
818                                                               ~PAGE_CACHE_MASK);        
819         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
820
821         if (!discard && clear_page_dirty_for_io(page)) {
822                 LASSERT(page->mapping);
823                 rc = ll_call_writepage(page->mapping->host, page);
824                 /* either waiting for io to complete or reacquiring
825                  * the lock that the failed writepage released */
826                 lock_page(page);
827                 wait_on_page_writeback(page);
828                 if (rc != 0) {
829                         CERROR("writepage inode %lu(%p) of page %p "
830                                "failed: %d\n", mapping->host->i_ino,
831                                mapping->host, page, rc);
832                         if (rc == -ENOSPC)
833                                 set_bit(AS_ENOSPC, &mapping->flags);
834                         else
835                                 set_bit(AS_EIO, &mapping->flags);
836                 }
837                 set_bit(AS_EIO, &mapping->flags);
838         }
839         if (page->mapping != NULL) {
840                 struct ll_async_page *llap = llap_cast_private(page);
841                 /* checking again to account for writeback's lock_page() */
842                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
843                 if (llap)
844                         ll_ra_accounting(llap, page->mapping);
845                 ll_truncate_complete_page(page);
846         }
847         EXIT;
848 out:
849         LASSERT(!PageWriteback(page));
850         unlock_page(page);
851         page_cache_release(page);
852
853         return 0;
854 }
855
856 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
857                              void *data, int flag)
858 {
859         struct inode *inode;
860         struct ll_inode_info *lli;
861         struct lov_stripe_md *lsm;
862         int stripe;
863         __u64 kms;
864
865         ENTRY;
866
867         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
868                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
869                 LBUG();
870         }
871
872         inode = ll_inode_from_lock(lock);
873         if (inode == NULL)
874                 RETURN(0);
875         lli = ll_i2info(inode);
876         if (lli == NULL)
877                 GOTO(iput, 0);
878         if (lli->lli_smd == NULL)
879                 GOTO(iput, 0);
880         lsm = lli->lli_smd;
881
882         stripe = ll_lock_to_stripe_offset(inode, lock);
883         if (stripe < 0)
884                 GOTO(iput, 0);
885
886         lov_stripe_lock(lsm);
887         lock_res_and_lock(lock);
888         kms = ldlm_extent_shift_kms(lock,
889                                     lsm->lsm_oinfo[stripe]->loi_kms);
890
891         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
892                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
893                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
894         lsm->lsm_oinfo[stripe]->loi_kms = kms;
895         unlock_res_and_lock(lock);
896         lov_stripe_unlock(lsm);
897         ll_queue_done_writing(inode, 0);
898         EXIT;
899 iput:
900         iput(inode);
901
902         return 0;
903 }
904
905 #if 0
906 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
907 {
908         /* XXX ALLOCATE - 160 bytes */
909         struct inode *inode = ll_inode_from_lock(lock);
910         struct ll_inode_info *lli = ll_i2info(inode);
911         struct lustre_handle lockh = { 0 };
912         struct ost_lvb *lvb;
913         int stripe;
914         ENTRY;
915
916         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
917                      LDLM_FL_BLOCK_CONV)) {
918                 LBUG(); /* not expecting any blocked async locks yet */
919                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
920                            "lock, returning");
921                 ldlm_lock_dump(D_OTHER, lock, 0);
922                 ldlm_reprocess_all(lock->l_resource);
923                 RETURN(0);
924         }
925
926         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
927
928         stripe = ll_lock_to_stripe_offset(inode, lock);
929         if (stripe < 0)
930                 goto iput;
931
932         if (lock->l_lvb_len) {
933                 struct lov_stripe_md *lsm = lli->lli_smd;
934                 __u64 kms;
935                 lvb = lock->l_lvb_data;
936                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
937
938                 lock_res_and_lock(lock);
939                 ll_inode_size_lock(inode, 1);
940                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
941                 kms = ldlm_extent_shift_kms(NULL, kms);
942                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
943                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
944                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
945                 lsm->lsm_oinfo[stripe].loi_kms = kms;
946                 ll_inode_size_unlock(inode, 1);
947                 unlock_res_and_lock(lock);
948         }
949
950 iput:
951         iput(inode);
952         wake_up(&lock->l_waitq);
953
954         ldlm_lock2handle(lock, &lockh);
955         ldlm_lock_decref(&lockh, LCK_PR);
956         RETURN(0);
957 }
958 #endif
959
960 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
961 {
962         struct ptlrpc_request *req = reqp;
963         struct inode *inode = ll_inode_from_lock(lock);
964         struct ll_inode_info *lli;
965         struct lov_stripe_md *lsm;
966         struct ost_lvb *lvb;
967         int rc, stripe;
968         ENTRY;
969
970         if (inode == NULL)
971                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
972         lli = ll_i2info(inode);
973         if (lli == NULL)
974                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
975         lsm = lli->lli_smd;
976         if (lsm == NULL)
977                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
978
979         /* First, find out which stripe index this lock corresponds to. */
980         stripe = ll_lock_to_stripe_offset(inode, lock);
981         if (stripe < 0)
982                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
983
984         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
985         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
986                              sizeof(*lvb));
987         rc = req_capsule_server_pack(&req->rq_pill);
988         if (rc) {
989                 CERROR("lustre_pack_reply: %d\n", rc);
990                 GOTO(iput, rc);
991         }
992
993         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
994         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
995         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
996         lvb->lvb_atime = LTIME_S(inode->i_atime);
997         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
998
999         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1000                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1001                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1002                    lvb->lvb_atime, lvb->lvb_ctime);
1003  iput:
1004         iput(inode);
1005
1006  out:
1007         /* These errors are normal races, so we don't want to fill the console
1008          * with messages by calling ptlrpc_error() */
1009         if (rc == -ELDLM_NO_LOCK_DATA)
1010                 lustre_pack_reply(req, 1, NULL, NULL);
1011
1012         req->rq_status = rc;
1013         return rc;
1014 }
1015
1016 static int ll_merge_lvb(struct inode *inode)
1017 {
1018         struct ll_inode_info *lli = ll_i2info(inode);
1019         struct ll_sb_info *sbi = ll_i2sbi(inode);
1020         struct ost_lvb lvb;
1021         int rc;
1022
1023         ENTRY;
1024
1025         ll_inode_size_lock(inode, 1);
1026         inode_init_lvb(inode, &lvb);
1027         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1028         i_size_write(inode, lvb.lvb_size);
1029         inode->i_blocks = lvb.lvb_blocks;
1030
1031         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1032         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1033         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1034         ll_inode_size_unlock(inode, 1);
1035
1036         RETURN(rc);
1037 }
1038
1039 int ll_local_size(struct inode *inode)
1040 {
1041         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1042         struct ll_inode_info *lli = ll_i2info(inode);
1043         struct ll_sb_info *sbi = ll_i2sbi(inode);
1044         struct lustre_handle lockh = { 0 };
1045         int flags = 0;
1046         int rc;
1047         ENTRY;
1048
1049         if (lli->lli_smd->lsm_stripe_count == 0)
1050                 RETURN(0);
1051
1052         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1053                        &policy, LCK_PR, &flags, inode, &lockh);
1054         if (rc < 0)
1055                 RETURN(rc);
1056         else if (rc == 0)
1057                 RETURN(-ENODATA);
1058
1059         rc = ll_merge_lvb(inode);
1060         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1061         RETURN(rc);
1062 }
1063
1064 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1065                      lstat_t *st)
1066 {
1067         struct lustre_handle lockh = { 0 };
1068         struct ldlm_enqueue_info einfo = { 0 };
1069         struct obd_info oinfo = { { { 0 } } };
1070         struct ost_lvb lvb;
1071         int rc;
1072
1073         ENTRY;
1074
1075         einfo.ei_type = LDLM_EXTENT;
1076         einfo.ei_mode = LCK_PR;
1077         einfo.ei_cb_bl = osc_extent_blocking_cb;
1078         einfo.ei_cb_cp = ldlm_completion_ast;
1079         einfo.ei_cb_gl = ll_glimpse_callback;
1080         einfo.ei_cbdata = NULL;
1081
1082         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1083         oinfo.oi_lockh = &lockh;
1084         oinfo.oi_md = lsm;
1085         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1086
1087         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1088         if (rc == -ENOENT)
1089                 RETURN(rc);
1090         if (rc != 0) {
1091                 CERROR("obd_enqueue returned rc %d, "
1092                        "returning -EIO\n", rc);
1093                 RETURN(rc > 0 ? -EIO : rc);
1094         }
1095
1096         lov_stripe_lock(lsm);
1097         memset(&lvb, 0, sizeof(lvb));
1098         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1099         st->st_size = lvb.lvb_size;
1100         st->st_blocks = lvb.lvb_blocks;
1101         st->st_mtime = lvb.lvb_mtime;
1102         st->st_atime = lvb.lvb_atime;
1103         st->st_ctime = lvb.lvb_ctime;
1104         lov_stripe_unlock(lsm);
1105
1106         RETURN(rc);
1107 }
1108
1109 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1110  * file (because it prefers KMS over RSS when larger) */
1111 int ll_glimpse_size(struct inode *inode, int ast_flags)
1112 {
1113         struct ll_inode_info *lli = ll_i2info(inode);
1114         struct ll_sb_info *sbi = ll_i2sbi(inode);
1115         struct lustre_handle lockh = { 0 };
1116         struct ldlm_enqueue_info einfo = { 0 };
1117         struct obd_info oinfo = { { { 0 } } };
1118         int rc;
1119         ENTRY;
1120
1121         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1122                 RETURN(0);
1123
1124         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1125
1126         if (!lli->lli_smd) {
1127                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1128                 RETURN(0);
1129         }
1130
1131         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1132          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1133          *       won't revoke any conflicting DLM locks held. Instead,
1134          *       ll_glimpse_callback() will be called on each client
1135          *       holding a DLM lock against this file, and resulting size
1136          *       will be returned for each stripe. DLM lock on [0, EOF] is
1137          *       acquired only if there were no conflicting locks. */
1138         einfo.ei_type = LDLM_EXTENT;
1139         einfo.ei_mode = LCK_PR;
1140         einfo.ei_cb_bl = osc_extent_blocking_cb;
1141         einfo.ei_cb_cp = ldlm_completion_ast;
1142         einfo.ei_cb_gl = ll_glimpse_callback;
1143         einfo.ei_cbdata = inode;
1144
1145         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1146         oinfo.oi_lockh = &lockh;
1147         oinfo.oi_md = lli->lli_smd;
1148         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1149
1150         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1151         if (rc == -ENOENT)
1152                 RETURN(rc);
1153         if (rc != 0) {
1154                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1155                 RETURN(rc > 0 ? -EIO : rc);
1156         }
1157
1158         rc = ll_merge_lvb(inode);
1159
1160         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1161                i_size_read(inode), (unsigned long long)inode->i_blocks);
1162
1163         RETURN(rc);
1164 }
1165
1166 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1167                    struct lov_stripe_md *lsm, int mode,
1168                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1169                    int ast_flags)
1170 {
1171         struct ll_sb_info *sbi = ll_i2sbi(inode);
1172         struct ost_lvb lvb;
1173         struct ldlm_enqueue_info einfo = { 0 };
1174         struct obd_info oinfo = { { { 0 } } };
1175         int rc;
1176         ENTRY;
1177
1178         LASSERT(!lustre_handle_is_used(lockh));
1179         LASSERT(lsm != NULL);
1180
1181         /* don't drop the mmapped file to LRU */
1182         if (mapping_mapped(inode->i_mapping))
1183                 ast_flags |= LDLM_FL_NO_LRU;
1184
1185         /* XXX phil: can we do this?  won't it screw the file size up? */
1186         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1187             (sbi->ll_flags & LL_SBI_NOLCK))
1188                 RETURN(0);
1189
1190         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1191                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1192
1193         einfo.ei_type = LDLM_EXTENT;
1194         einfo.ei_mode = mode;
1195         einfo.ei_cb_bl = osc_extent_blocking_cb;
1196         einfo.ei_cb_cp = ldlm_completion_ast;
1197         einfo.ei_cb_gl = ll_glimpse_callback;
1198         einfo.ei_cbdata = inode;
1199
1200         oinfo.oi_policy = *policy;
1201         oinfo.oi_lockh = lockh;
1202         oinfo.oi_md = lsm;
1203         oinfo.oi_flags = ast_flags;
1204
1205         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1206         *policy = oinfo.oi_policy;
1207         if (rc > 0)
1208                 rc = -EIO;
1209
1210         ll_inode_size_lock(inode, 1);
1211         inode_init_lvb(inode, &lvb);
1212         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1213
1214         if (policy->l_extent.start == 0 &&
1215             policy->l_extent.end == OBD_OBJECT_EOF) {
1216                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1217                  * the kms under both a DLM lock and the
1218                  * ll_inode_size_lock().  If we don't get the
1219                  * ll_inode_size_lock() here we can match the DLM lock and
1220                  * reset i_size from the kms before the truncating path has
1221                  * updated the kms.  generic_file_write can then trust the
1222                  * stale i_size when doing appending writes and effectively
1223                  * cancel the result of the truncate.  Getting the
1224                  * ll_inode_size_lock() after the enqueue maintains the DLM
1225                  * -> ll_inode_size_lock() acquiring order. */
1226                 i_size_write(inode, lvb.lvb_size);
1227                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1228                        inode->i_ino, i_size_read(inode));
1229         }
1230
1231         if (rc == 0) {
1232                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1233                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1234                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1235         }
1236         ll_inode_size_unlock(inode, 1);
1237
1238         RETURN(rc);
1239 }
1240
1241 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1242                      struct lov_stripe_md *lsm, int mode,
1243                      struct lustre_handle *lockh)
1244 {
1245         struct ll_sb_info *sbi = ll_i2sbi(inode);
1246         int rc;
1247         ENTRY;
1248
1249         /* XXX phil: can we do this?  won't it screw the file size up? */
1250         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1251             (sbi->ll_flags & LL_SBI_NOLCK))
1252                 RETURN(0);
1253
1254         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1255
1256         RETURN(rc);
1257 }
1258
1259 static void ll_set_file_contended(struct inode *inode)
1260 {
1261         struct ll_inode_info *lli = ll_i2info(inode);
1262         cfs_time_t now = cfs_time_current();
1263
1264         spin_lock(&lli->lli_lock);
1265         lli->lli_contention_time = now;
1266         lli->lli_flags |= LLIF_CONTENDED;
1267         spin_unlock(&lli->lli_lock);
1268 }
1269
1270 void ll_clear_file_contended(struct inode *inode)
1271 {
1272         struct ll_inode_info *lli = ll_i2info(inode);
1273
1274         spin_lock(&lli->lli_lock);
1275         lli->lli_flags &= ~LLIF_CONTENDED;
1276         spin_unlock(&lli->lli_lock);
1277 }
1278
1279 static int ll_is_file_contended(struct file *file)
1280 {
1281         struct inode *inode = file->f_dentry->d_inode;
1282         struct ll_inode_info *lli = ll_i2info(inode);
1283         struct ll_sb_info *sbi = ll_i2sbi(inode);
1284         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1285         ENTRY;
1286
1287         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1288                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1289                        " osc connect flags = 0x"LPX64"\n",
1290                        sbi->ll_lco.lco_flags);
1291                 RETURN(0);
1292         }
1293         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1294                 RETURN(1);
1295         if (lli->lli_flags & LLIF_CONTENDED) {
1296                 cfs_time_t cur_time = cfs_time_current();
1297                 cfs_time_t retry_time;
1298
1299                 retry_time = cfs_time_add(
1300                         lli->lli_contention_time,
1301                         cfs_time_seconds(sbi->ll_contention_time));
1302                 if (cfs_time_after(cur_time, retry_time)) {
1303                         ll_clear_file_contended(inode);
1304                         RETURN(0);
1305                 }
1306                 RETURN(1);
1307         }
1308         RETURN(0);
1309 }
1310
1311 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1312                                  const char *buf, size_t count,
1313                                  loff_t start, loff_t end, int rw)
1314 {
1315         int append;
1316         int tree_locked = 0;
1317         int rc;
1318         struct inode * inode = file->f_dentry->d_inode;
1319         ENTRY;
1320
1321         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1322
1323         if (append || !ll_is_file_contended(file)) {
1324                 struct ll_lock_tree_node *node;
1325                 int ast_flags;
1326
1327                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1328                 if (file->f_flags & O_NONBLOCK)
1329                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1330                 node = ll_node_from_inode(inode, start, end,
1331                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1332                 if (IS_ERR(node)) {
1333                         rc = PTR_ERR(node);
1334                         GOTO(out, rc);
1335                 }
1336                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1337                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1338                 if (rc == 0)
1339                         tree_locked = 1;
1340                 else if (rc == -EUSERS)
1341                         ll_set_file_contended(inode);
1342                 else
1343                         GOTO(out, rc);
1344         }
1345         RETURN(tree_locked);
1346 out:
1347         return rc;
1348 }
1349
1350 /**
1351  * Checks if requested extent lock is compatible with a lock under a page.
1352  *
1353  * Checks if the lock under \a page is compatible with a read or write lock
1354  * (specified by \a rw) for an extent [\a start , \a end].
1355  *
1356  * \param page the page under which lock is considered
1357  * \param rw OBD_BRW_READ if requested for reading,
1358  *           OBD_BRW_WRITE if requested for writing
1359  * \param start start of the requested extent
1360  * \param end end of the requested extent
1361  * \param cookie transparent parameter for passing locking context
1362  *
1363  * \post result == 1, *cookie == context, appropriate lock is referenced or
1364  * \post result == 0
1365  *
1366  * \retval 1 owned lock is reused for the request
1367  * \retval 0 no lock reused for the request
1368  *
1369  * \see ll_release_short_lock
1370  */
1371 static int ll_reget_short_lock(struct page *page, int rw,
1372                                obd_off start, obd_off end,
1373                                void **cookie)
1374 {
1375         struct ll_async_page *llap;
1376         struct obd_export *exp;
1377         struct inode *inode = page->mapping->host;
1378
1379         ENTRY;
1380
1381         exp = ll_i2dtexp(inode);
1382         if (exp == NULL)
1383                 RETURN(0);
1384
1385         llap = llap_cast_private(page);
1386         if (llap == NULL)
1387                 RETURN(0);
1388
1389         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1390                                     &llap->llap_cookie, rw, start, end,
1391                                     cookie));
1392 }
1393
1394 /**
1395  * Releases a reference to a lock taken in a "fast" way.
1396  *
1397  * Releases a read or a write (specified by \a rw) lock
1398  * referenced by \a cookie.
1399  *
1400  * \param inode inode to which data belong
1401  * \param end end of the locked extent
1402  * \param rw OBD_BRW_READ if requested for reading,
1403  *           OBD_BRW_WRITE if requested for writing
1404  * \param cookie transparent parameter for passing locking context
1405  *
1406  * \post appropriate lock is dereferenced
1407  *
1408  * \see ll_reget_short_lock
1409  */
1410 static void ll_release_short_lock(struct inode *inode, obd_off end,
1411                                   void *cookie, int rw)
1412 {
1413         struct obd_export *exp;
1414         int rc;
1415
1416         exp = ll_i2dtexp(inode);
1417         if (exp == NULL)
1418                 return;
1419
1420         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1421                                     cookie, rw);
1422         if (rc < 0)
1423                 CERROR("unlock failed (%d)\n", rc);
1424 }
1425
1426 /**
1427  * Checks if requested extent lock is compatible
1428  * with a lock under a page in page cache.
1429  *
1430  * Checks if a lock under some \a page is compatible with a read or write lock
1431  * (specified by \a rw) for an extent [\a start , \a end].
1432  *
1433  * \param file the file under which lock is considered
1434  * \param rw OBD_BRW_READ if requested for reading,
1435  *           OBD_BRW_WRITE if requested for writing
1436  * \param ppos start of the requested extent
1437  * \param end end of the requested extent
1438  * \param cookie transparent parameter for passing locking context
1439  * \param buf userspace buffer for the data
1440  *
1441  * \post result == 1, *cookie == context, appropriate lock is referenced
1442  * \post retuls == 0
1443  *
1444  * \retval 1 owned lock is reused for the request
1445  * \retval 0 no lock reused for the request
1446  *
1447  * \see ll_file_put_fast_lock
1448  */
1449 static inline int ll_file_get_fast_lock(struct file *file,
1450                                         obd_off ppos, obd_off end,
1451                                         char *buf, void **cookie, int rw)
1452 {
1453         int rc = 0;
1454         struct page *page;
1455
1456         ENTRY;
1457
1458         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1459                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1460                                       ppos >> CFS_PAGE_SHIFT);
1461                 if (page) {
1462                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1463                                 rc = 1;
1464
1465                         unlock_page(page);
1466                         page_cache_release(page);
1467                 }
1468         }
1469
1470         RETURN(rc);
1471 }
1472
1473 /**
1474  * Releases a reference to a lock taken in a "fast" way.
1475  *
1476  * Releases a read or a write (specified by \a rw) lock
1477  * referenced by \a cookie.
1478  *
1479  * \param inode inode to which data belong
1480  * \param end end of the locked extent
1481  * \param rw OBD_BRW_READ if requested for reading,
1482  *           OBD_BRW_WRITE if requested for writing
1483  * \param cookie transparent parameter for passing locking context
1484  *
1485  * \post appropriate lock is dereferenced
1486  *
1487  * \see ll_file_get_fast_lock
1488  */
1489 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1490                                          void *cookie, int rw)
1491 {
1492         ll_release_short_lock(inode, end, cookie, rw);
1493 }
1494
1495 enum ll_lock_style {
1496         LL_LOCK_STYLE_NOLOCK   = 0,
1497         LL_LOCK_STYLE_FASTLOCK = 1,
1498         LL_LOCK_STYLE_TREELOCK = 2
1499 };
1500
1501 /**
1502  * Checks if requested extent lock is compatible with a lock 
1503  * under a page cache page.
1504  *
1505  * Checks if the lock under \a page is compatible with a read or write lock
1506  * (specified by \a rw) for an extent [\a start , \a end].
1507  *
1508  * \param file file under which I/O is processed
1509  * \param rw OBD_BRW_READ if requested for reading,
1510  *           OBD_BRW_WRITE if requested for writing
1511  * \param ppos start of the requested extent
1512  * \param end end of the requested extent
1513  * \param cookie transparent parameter for passing locking context
1514  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1515  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1516  * \param buf userspace buffer for the data
1517  *
1518  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1519  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1520  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1521  *
1522  * \see ll_file_put_lock
1523  */
1524 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1525                                    obd_off end, char *buf, void **cookie,
1526                                    struct ll_lock_tree *tree, int rw)
1527 {
1528         int rc;
1529
1530         ENTRY;
1531
1532         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1533                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1534
1535         rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
1536         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1537         switch (rc) {
1538         case 1:
1539                 RETURN(LL_LOCK_STYLE_TREELOCK);
1540         case 0:
1541                 RETURN(LL_LOCK_STYLE_NOLOCK);
1542         }
1543
1544         /* an error happened if we reached this point, rc = -errno here */
1545         RETURN(rc);
1546 }
1547
1548 /**
1549  * Drops the lock taken by ll_file_get_lock.
1550  *
1551  * Releases a read or a write (specified by \a rw) lock
1552  * referenced by \a tree or \a cookie.
1553  *
1554  * \param inode inode to which data belong
1555  * \param end end of the locked extent
1556  * \param lockstyle facility through which the lock was taken
1557  * \param rw OBD_BRW_READ if requested for reading,
1558  *           OBD_BRW_WRITE if requested for writing
1559  * \param cookie transparent parameter for passing locking context
1560  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1561  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1562  *
1563  * \post appropriate lock is dereferenced
1564  *
1565  * \see ll_file_get_lock
1566  */
1567 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1568                                     enum ll_lock_style lock_style,
1569                                     void *cookie, struct ll_lock_tree *tree,
1570                                     int rw)
1571
1572 {
1573         switch (lock_style) {
1574         case LL_LOCK_STYLE_TREELOCK:
1575                 ll_tree_unlock(tree);
1576                 break;
1577         case LL_LOCK_STYLE_FASTLOCK:
1578                 ll_file_put_fast_lock(inode, end, cookie, rw);
1579                 break;
1580         default:
1581                 CERROR("invalid locking style (%d)\n", lock_style);
1582         }
1583 }
1584
1585 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1586                             loff_t *ppos)
1587 {
1588         struct inode *inode = file->f_dentry->d_inode;
1589         struct ll_inode_info *lli = ll_i2info(inode);
1590         struct lov_stripe_md *lsm = lli->lli_smd;
1591         struct ll_sb_info *sbi = ll_i2sbi(inode);
1592         struct ll_lock_tree tree;
1593         struct ost_lvb lvb;
1594         struct ll_ra_read bead;
1595         int ra = 0;
1596         obd_off end;
1597         ssize_t retval, chunk, sum = 0;
1598         int lock_style;
1599         void *cookie;
1600
1601         __u64 kms;
1602         ENTRY;
1603         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1604                inode->i_ino, inode->i_generation, inode, count, *ppos);
1605         /* "If nbyte is 0, read() will return 0 and have no other results."
1606          *                      -- Single Unix Spec */
1607         if (count == 0)
1608                 RETURN(0);
1609
1610         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1611
1612         if (!lsm) {
1613                 /* Read on file with no objects should return zero-filled
1614                  * buffers up to file size (we can get non-zero sizes with
1615                  * mknod + truncate, then opening file for read. This is a
1616                  * common pattern in NFS case, it seems). Bug 6243 */
1617                 int notzeroed;
1618                 /* Since there are no objects on OSTs, we have nothing to get
1619                  * lock on and so we are forced to access inode->i_size
1620                  * unguarded */
1621
1622                 /* Read beyond end of file */
1623                 if (*ppos >= i_size_read(inode))
1624                         RETURN(0);
1625
1626                 if (count > i_size_read(inode) - *ppos)
1627                         count = i_size_read(inode) - *ppos;
1628                 /* Make sure to correctly adjust the file pos pointer for
1629                  * EFAULT case */
1630                 notzeroed = clear_user(buf, count);
1631                 count -= notzeroed;
1632                 *ppos += count;
1633                 if (!count)
1634                         RETURN(-EFAULT);
1635                 RETURN(count);
1636         }
1637 repeat:
1638         if (sbi->ll_max_rw_chunk != 0) {
1639                 /* first, let's know the end of the current stripe */
1640                 end = *ppos;
1641                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1642
1643                 /* correct, the end is beyond the request */
1644                 if (end > *ppos + count - 1)
1645                         end = *ppos + count - 1;
1646
1647                 /* and chunk shouldn't be too large even if striping is wide */
1648                 if (end - *ppos > sbi->ll_max_rw_chunk)
1649                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1650         } else {
1651                 end = *ppos + count - 1;
1652         }
1653
1654         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1655                                       buf, &cookie, &tree, OBD_BRW_READ);
1656         if (lock_style < 0)
1657                 GOTO(out, retval = lock_style);
1658
1659         ll_inode_size_lock(inode, 1);
1660         /*
1661          * Consistency guarantees: following possibilities exist for the
1662          * relation between region being read and real file size at this
1663          * moment:
1664          *
1665          *  (A): the region is completely inside of the file;
1666          *
1667          *  (B-x): x bytes of region are inside of the file, the rest is
1668          *  outside;
1669          *
1670          *  (C): the region is completely outside of the file.
1671          *
1672          * This classification is stable under DLM lock acquired by
1673          * ll_tree_lock() above, because to change class, other client has to
1674          * take DLM lock conflicting with our lock. Also, any updates to
1675          * ->i_size by other threads on this client are serialized by
1676          * ll_inode_size_lock(). This guarantees that short reads are handled
1677          * correctly in the face of concurrent writes and truncates.
1678          */
1679         inode_init_lvb(inode, &lvb);
1680         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1681         kms = lvb.lvb_size;
1682         if (*ppos + count - 1 > kms) {
1683                 /* A glimpse is necessary to determine whether we return a
1684                  * short read (B) or some zeroes at the end of the buffer (C) */
1685                 ll_inode_size_unlock(inode, 1);
1686                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1687                 if (retval) {
1688                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1689                                 ll_file_put_lock(inode, end, lock_style,
1690                                                  cookie, &tree, OBD_BRW_READ);
1691                         goto out;
1692                 }
1693         } else {
1694                 /* region is within kms and, hence, within real file size (A).
1695                  * We need to increase i_size to cover the read region so that
1696                  * generic_file_read() will do its job, but that doesn't mean
1697                  * the kms size is _correct_, it is only the _minimum_ size.
1698                  * If someone does a stat they will get the correct size which
1699                  * will always be >= the kms value here.  b=11081 */
1700                 if (i_size_read(inode) < kms)
1701                         i_size_write(inode, kms);
1702                 ll_inode_size_unlock(inode, 1);
1703         }
1704
1705         chunk = end - *ppos + 1;
1706         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1707                inode->i_ino, chunk, *ppos, i_size_read(inode));
1708
1709         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1710                 /* turn off the kernel's read-ahead */
1711                 file->f_ra.ra_pages = 0;
1712
1713                 /* initialize read-ahead window once per syscall */
1714                 if (ra == 0) {
1715                         ra = 1;
1716                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1717                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1718                         ll_ra_read_in(file, &bead);
1719                 }
1720
1721                 /* BUG: 5972 */
1722                 file_accessed(file);
1723                 retval = generic_file_read(file, buf, chunk, ppos);
1724                 ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
1725                                  OBD_BRW_READ);
1726         } else {
1727                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1728         }
1729
1730         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1731
1732         if (retval > 0) {
1733                 buf += retval;
1734                 count -= retval;
1735                 sum += retval;
1736                 if (retval == chunk && count > 0)
1737                         goto repeat;
1738         }
1739
1740  out:
1741         if (ra != 0)
1742                 ll_ra_read_ex(file, &bead);
1743         retval = (sum > 0) ? sum : retval;
1744         RETURN(retval);
1745 }
1746
1747 /*
1748  * Write to a file (through the page cache).
1749  */
1750 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1751                              loff_t *ppos)
1752 {
1753         struct inode *inode = file->f_dentry->d_inode;
1754         struct ll_sb_info *sbi = ll_i2sbi(inode);
1755         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1756         struct ll_lock_tree tree;
1757         loff_t maxbytes = ll_file_maxbytes(inode);
1758         loff_t lock_start, lock_end, end;
1759         ssize_t retval, chunk, sum = 0;
1760         int tree_locked;
1761         ENTRY;
1762
1763         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1764                inode->i_ino, inode->i_generation, inode, count, *ppos);
1765
1766         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1767
1768         /* POSIX, but surprised the VFS doesn't check this already */
1769         if (count == 0)
1770                 RETURN(0);
1771
1772         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1773          * called on the file, don't fail the below assertion (bug 2388). */
1774         if (file->f_flags & O_LOV_DELAY_CREATE &&
1775             ll_i2info(inode)->lli_smd == NULL)
1776                 RETURN(-EBADF);
1777
1778         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1779
1780         down(&ll_i2info(inode)->lli_write_sem);
1781
1782 repeat:
1783         chunk = 0; /* just to fix gcc's warning */
1784         end = *ppos + count - 1;
1785
1786         if (file->f_flags & O_APPEND) {
1787                 lock_start = 0;
1788                 lock_end = OBD_OBJECT_EOF;
1789         } else if (sbi->ll_max_rw_chunk != 0) {
1790                 /* first, let's know the end of the current stripe */
1791                 end = *ppos;
1792                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1793                                 (obd_off *)&end);
1794
1795                 /* correct, the end is beyond the request */
1796                 if (end > *ppos + count - 1)
1797                         end = *ppos + count - 1;
1798
1799                 /* and chunk shouldn't be too large even if striping is wide */
1800                 if (end - *ppos > sbi->ll_max_rw_chunk)
1801                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1802                 lock_start = *ppos;
1803                 lock_end = end;
1804         } else {
1805                 lock_start = *ppos;
1806                 lock_end = *ppos + count - 1;
1807         }
1808
1809         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1810                                             lock_start, lock_end, OBD_BRW_WRITE);
1811         if (tree_locked < 0)
1812                 GOTO(out, retval = tree_locked);
1813
1814         /* This is ok, g_f_w will overwrite this under i_sem if it races
1815          * with a local truncate, it just makes our maxbyte checking easier.
1816          * The i_size value gets updated in ll_extent_lock() as a consequence
1817          * of the [0,EOF] extent lock we requested above. */
1818         if (file->f_flags & O_APPEND) {
1819                 *ppos = i_size_read(inode);
1820                 end = *ppos + count - 1;
1821         }
1822
1823         if (*ppos >= maxbytes) {
1824                 send_sig(SIGXFSZ, current, 0);
1825                 GOTO(out_unlock, retval = -EFBIG);
1826         }
1827         if (end > maxbytes - 1)
1828                 end = maxbytes - 1;
1829
1830         /* generic_file_write handles O_APPEND after getting i_mutex */
1831         chunk = end - *ppos + 1;
1832         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1833                inode->i_ino, chunk, *ppos);
1834         if (tree_locked)
1835                 retval = generic_file_write(file, buf, chunk, ppos);
1836         else
1837                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1838                                              ppos, WRITE);
1839         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1840
1841 out_unlock:
1842         if (tree_locked)
1843                 ll_tree_unlock(&tree);
1844
1845 out:
1846         if (retval > 0) {
1847                 buf += retval;
1848                 count -= retval;
1849                 sum += retval;
1850                 if (retval == chunk && count > 0)
1851                         goto repeat;
1852         }
1853
1854         up(&ll_i2info(inode)->lli_write_sem);
1855
1856         retval = (sum > 0) ? sum : retval;
1857         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1858                            retval > 0 ? retval : 0);
1859         RETURN(retval);
1860 }
1861
1862 /*
1863  * Send file content (through pagecache) somewhere with helper
1864  */
1865 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1866                                 read_actor_t actor, void *target)
1867 {
1868         struct inode *inode = in_file->f_dentry->d_inode;
1869         struct ll_inode_info *lli = ll_i2info(inode);
1870         struct lov_stripe_md *lsm = lli->lli_smd;
1871         struct ll_lock_tree tree;
1872         struct ll_lock_tree_node *node;
1873         struct ost_lvb lvb;
1874         struct ll_ra_read bead;
1875         int rc;
1876         ssize_t retval;
1877         __u64 kms;
1878         ENTRY;
1879         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1880                inode->i_ino, inode->i_generation, inode, count, *ppos);
1881
1882         /* "If nbyte is 0, read() will return 0 and have no other results."
1883          *                      -- Single Unix Spec */
1884         if (count == 0)
1885                 RETURN(0);
1886
1887         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1888         /* turn off the kernel's read-ahead */
1889         in_file->f_ra.ra_pages = 0;
1890
1891         /* File with no objects, nothing to lock */
1892         if (!lsm)
1893                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1894
1895         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1896         if (IS_ERR(node))
1897                 RETURN(PTR_ERR(node));
1898
1899         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1900         rc = ll_tree_lock(&tree, node, NULL, count,
1901                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1902         if (rc != 0)
1903                 RETURN(rc);
1904
1905         ll_clear_file_contended(inode);
1906         ll_inode_size_lock(inode, 1);
1907         /*
1908          * Consistency guarantees: following possibilities exist for the
1909          * relation between region being read and real file size at this
1910          * moment:
1911          *
1912          *  (A): the region is completely inside of the file;
1913          *
1914          *  (B-x): x bytes of region are inside of the file, the rest is
1915          *  outside;
1916          *
1917          *  (C): the region is completely outside of the file.
1918          *
1919          * This classification is stable under DLM lock acquired by
1920          * ll_tree_lock() above, because to change class, other client has to
1921          * take DLM lock conflicting with our lock. Also, any updates to
1922          * ->i_size by other threads on this client are serialized by
1923          * ll_inode_size_lock(). This guarantees that short reads are handled
1924          * correctly in the face of concurrent writes and truncates.
1925          */
1926         inode_init_lvb(inode, &lvb);
1927         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1928         kms = lvb.lvb_size;
1929         if (*ppos + count - 1 > kms) {
1930                 /* A glimpse is necessary to determine whether we return a
1931                  * short read (B) or some zeroes at the end of the buffer (C) */
1932                 ll_inode_size_unlock(inode, 1);
1933                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1934                 if (retval)
1935                         goto out;
1936         } else {
1937                 /* region is within kms and, hence, within real file size (A) */
1938                 i_size_write(inode, kms);
1939                 ll_inode_size_unlock(inode, 1);
1940         }
1941
1942         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1943                inode->i_ino, count, *ppos, i_size_read(inode));
1944
1945         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1946         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1947         ll_ra_read_in(in_file, &bead);
1948         /* BUG: 5972 */
1949         file_accessed(in_file);
1950         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1951         ll_ra_read_ex(in_file, &bead);
1952
1953  out:
1954         ll_tree_unlock(&tree);
1955         RETURN(retval);
1956 }
1957
1958 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1959                                unsigned long arg)
1960 {
1961         struct ll_inode_info *lli = ll_i2info(inode);
1962         struct obd_export *exp = ll_i2dtexp(inode);
1963         struct ll_recreate_obj ucreatp;
1964         struct obd_trans_info oti = { 0 };
1965         struct obdo *oa = NULL;
1966         int lsm_size;
1967         int rc = 0;
1968         struct lov_stripe_md *lsm, *lsm2;
1969         ENTRY;
1970
1971         if (!capable (CAP_SYS_ADMIN))
1972                 RETURN(-EPERM);
1973
1974         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1975                             sizeof(struct ll_recreate_obj));
1976         if (rc) {
1977                 RETURN(-EFAULT);
1978         }
1979         OBDO_ALLOC(oa);
1980         if (oa == NULL)
1981                 RETURN(-ENOMEM);
1982
1983         down(&lli->lli_size_sem);
1984         lsm = lli->lli_smd;
1985         if (lsm == NULL)
1986                 GOTO(out, rc = -ENOENT);
1987         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1988                    (lsm->lsm_stripe_count));
1989
1990         OBD_ALLOC(lsm2, lsm_size);
1991         if (lsm2 == NULL)
1992                 GOTO(out, rc = -ENOMEM);
1993
1994         oa->o_id = ucreatp.lrc_id;
1995         oa->o_gr = ucreatp.lrc_group;
1996         oa->o_nlink = ucreatp.lrc_ost_idx;
1997         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1998         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1999         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2000                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2001
2002         memcpy(lsm2, lsm, lsm_size);
2003         rc = obd_create(exp, oa, &lsm2, &oti);
2004
2005         OBD_FREE(lsm2, lsm_size);
2006         GOTO(out, rc);
2007 out:
2008         up(&lli->lli_size_sem);
2009         OBDO_FREE(oa);
2010         return rc;
2011 }
2012
2013 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2014                              int flags, struct lov_user_md *lum, int lum_size)
2015 {
2016         struct ll_inode_info *lli = ll_i2info(inode);
2017         struct lov_stripe_md *lsm;
2018         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2019         int rc = 0;
2020         ENTRY;
2021
2022         down(&lli->lli_size_sem);
2023         lsm = lli->lli_smd;
2024         if (lsm) {
2025                 up(&lli->lli_size_sem);
2026                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2027                        inode->i_ino);
2028                 RETURN(-EEXIST);
2029         }
2030
2031         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2032         if (rc)
2033                 GOTO(out, rc);
2034         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2035                 GOTO(out_req_free, rc = -ENOENT);
2036         rc = oit.d.lustre.it_status;
2037         if (rc < 0)
2038                 GOTO(out_req_free, rc);
2039
2040         ll_release_openhandle(file->f_dentry, &oit);
2041
2042  out:
2043         up(&lli->lli_size_sem);
2044         ll_intent_release(&oit);
2045         RETURN(rc);
2046 out_req_free:
2047         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2048         goto out;
2049 }
2050
2051 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
2052                              struct lov_mds_md **lmmp, int *lmm_size, 
2053                              struct ptlrpc_request **request)
2054 {
2055         struct ll_sb_info *sbi = ll_i2sbi(inode);
2056         struct mdt_body  *body;
2057         struct lov_mds_md *lmm = NULL;
2058         struct ptlrpc_request *req = NULL;
2059         struct obd_capa *oc;
2060         int rc, lmmsize;
2061
2062         rc = ll_get_max_mdsize(sbi, &lmmsize);
2063         if (rc)
2064                 RETURN(rc);
2065
2066         oc = ll_mdscapa_get(inode);
2067         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2068                              oc, filename, strlen(filename) + 1,
2069                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2070                              ll_i2suppgid(inode), &req);
2071         capa_put(oc);
2072         if (rc < 0) {
2073                 CDEBUG(D_INFO, "md_getattr_name failed "
2074                        "on %s: rc %d\n", filename, rc);
2075                 GOTO(out, rc);
2076         }
2077
2078         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2079         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2080
2081         lmmsize = body->eadatasize;
2082
2083         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2084                         lmmsize == 0) {
2085                 GOTO(out, rc = -ENODATA);
2086         }
2087
2088         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2089         LASSERT(lmm != NULL);
2090
2091         /*
2092          * This is coming from the MDS, so is probably in
2093          * little endian.  We convert it to host endian before
2094          * passing it to userspace.
2095          */
2096         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
2097                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
2098                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
2099         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
2100                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2101         }
2102
2103         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2104                 struct lov_stripe_md *lsm;
2105                 struct lov_user_md_join *lmj;
2106                 int lmj_size, i, aindex = 0;
2107
2108                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2109                 if (rc < 0)
2110                         GOTO(out, rc = -ENOMEM);
2111                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2112                 if (rc)
2113                         GOTO(out_free_memmd, rc);
2114
2115                 lmj_size = sizeof(struct lov_user_md_join) +
2116                            lsm->lsm_stripe_count *
2117                            sizeof(struct lov_user_ost_data_join);
2118                 OBD_ALLOC(lmj, lmj_size);
2119                 if (!lmj)
2120                         GOTO(out_free_memmd, rc = -ENOMEM);
2121
2122                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2123                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2124                         struct lov_extent *lex =
2125                                 &lsm->lsm_array->lai_ext_array[aindex];
2126
2127                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2128                                 aindex ++;
2129                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2130                                         LPU64" len %d\n", aindex, i,
2131                                         lex->le_start, (int)lex->le_len);
2132                         lmj->lmm_objects[i].l_extent_start =
2133                                 lex->le_start;
2134
2135                         if ((int)lex->le_len == -1)
2136                                 lmj->lmm_objects[i].l_extent_end = -1;
2137                         else
2138                                 lmj->lmm_objects[i].l_extent_end =
2139                                         lex->le_start + lex->le_len;
2140                         lmj->lmm_objects[i].l_object_id =
2141                                 lsm->lsm_oinfo[i]->loi_id;
2142                         lmj->lmm_objects[i].l_object_gr =
2143                                 lsm->lsm_oinfo[i]->loi_gr;
2144                         lmj->lmm_objects[i].l_ost_gen =
2145                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2146                         lmj->lmm_objects[i].l_ost_idx =
2147                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2148                 }
2149                 lmm = (struct lov_mds_md *)lmj;
2150                 lmmsize = lmj_size;
2151 out_free_memmd:
2152                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2153         }
2154 out:
2155         *lmmp = lmm;
2156         *lmm_size = lmmsize;
2157         *request = req;
2158         return rc;
2159 }
2160
2161 static int ll_lov_setea(struct inode *inode, struct file *file,
2162                             unsigned long arg)
2163 {
2164         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2165         struct lov_user_md  *lump;
2166         int lum_size = sizeof(struct lov_user_md) +
2167                        sizeof(struct lov_user_ost_data);
2168         int rc;
2169         ENTRY;
2170
2171         if (!capable (CAP_SYS_ADMIN))
2172                 RETURN(-EPERM);
2173
2174         OBD_ALLOC(lump, lum_size);
2175         if (lump == NULL) {
2176                 RETURN(-ENOMEM);
2177         }
2178         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2179         if (rc) {
2180                 OBD_FREE(lump, lum_size);
2181                 RETURN(-EFAULT);
2182         }
2183
2184         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2185
2186         OBD_FREE(lump, lum_size);
2187         RETURN(rc);
2188 }
2189
2190 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2191                             unsigned long arg)
2192 {
2193         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2194         int rc;
2195         int flags = FMODE_WRITE;
2196         ENTRY;
2197
2198         /* Bug 1152: copy properly when this is no longer true */
2199         LASSERT(sizeof(lum) == sizeof(*lump));
2200         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2201         rc = copy_from_user(&lum, lump, sizeof(lum));
2202         if (rc)
2203                 RETURN(-EFAULT);
2204
2205         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2206         if (rc == 0) {
2207                  put_user(0, &lump->lmm_stripe_count);
2208                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2209                                     0, ll_i2info(inode)->lli_smd, lump);
2210         }
2211         RETURN(rc);
2212 }
2213
2214 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2215 {
2216         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2217
2218         if (!lsm)
2219                 RETURN(-ENODATA);
2220
2221         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2222                             (void *)arg);
2223 }
2224
2225 static int ll_get_grouplock(struct inode *inode, struct file *file,
2226                             unsigned long arg)
2227 {
2228         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2229         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2230                                                     .end = OBD_OBJECT_EOF}};
2231         struct lustre_handle lockh = { 0 };
2232         struct ll_inode_info *lli = ll_i2info(inode);
2233         struct lov_stripe_md *lsm = lli->lli_smd;
2234         int flags = 0, rc;
2235         ENTRY;
2236
2237         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2238                 RETURN(-EINVAL);
2239         }
2240
2241         policy.l_extent.gid = arg;
2242         if (file->f_flags & O_NONBLOCK)
2243                 flags = LDLM_FL_BLOCK_NOWAIT;
2244
2245         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2246         if (rc)
2247                 RETURN(rc);
2248
2249         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2250         fd->fd_gid = arg;
2251         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2252
2253         RETURN(0);
2254 }
2255
2256 static int ll_put_grouplock(struct inode *inode, struct file *file,
2257                             unsigned long arg)
2258 {
2259         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2260         struct ll_inode_info *lli = ll_i2info(inode);
2261         struct lov_stripe_md *lsm = lli->lli_smd;
2262         int rc;
2263         ENTRY;
2264
2265         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2266                 /* Ugh, it's already unlocked. */
2267                 RETURN(-EINVAL);
2268         }
2269
2270         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2271                 RETURN(-EINVAL);
2272
2273         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2274
2275         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2276         if (rc)
2277                 RETURN(rc);
2278
2279         fd->fd_gid = 0;
2280         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2281
2282         RETURN(0);
2283 }
2284
2285 static int join_sanity_check(struct inode *head, struct inode *tail)
2286 {
2287         ENTRY;
2288         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2289                 CERROR("server do not support join \n");
2290                 RETURN(-EINVAL);
2291         }
2292         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2293                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2294                        head->i_ino, tail->i_ino);
2295                 RETURN(-EINVAL);
2296         }
2297         if (head->i_ino == tail->i_ino) {
2298                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2299                 RETURN(-EINVAL);
2300         }
2301         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2302                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2303                 RETURN(-EINVAL);
2304         }
2305         RETURN(0);
2306 }
2307
2308 static int join_file(struct inode *head_inode, struct file *head_filp,
2309                      struct file *tail_filp)
2310 {
2311         struct dentry *tail_dentry = tail_filp->f_dentry;
2312         struct lookup_intent oit = {.it_op = IT_OPEN,
2313                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2314         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2315                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2316
2317         struct lustre_handle lockh;
2318         struct md_op_data *op_data;
2319         int    rc;
2320         loff_t data;
2321         ENTRY;
2322
2323         tail_dentry = tail_filp->f_dentry;
2324
2325         data = i_size_read(head_inode);
2326         op_data = ll_prep_md_op_data(NULL, head_inode,
2327                                      tail_dentry->d_parent->d_inode,
2328                                      tail_dentry->d_name.name,
2329                                      tail_dentry->d_name.len, 0,
2330                                      LUSTRE_OPC_ANY, &data);
2331         if (IS_ERR(op_data))
2332                 RETURN(PTR_ERR(op_data));
2333
2334         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2335                          op_data, &lockh, NULL, 0, 0);
2336
2337         ll_finish_md_op_data(op_data);
2338         if (rc < 0)
2339                 GOTO(out, rc);
2340
2341         rc = oit.d.lustre.it_status;
2342
2343         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2344                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2345                 ptlrpc_req_finished((struct ptlrpc_request *)
2346                                     oit.d.lustre.it_data);
2347                 GOTO(out, rc);
2348         }
2349
2350         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2351                                            * away */
2352                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2353                 oit.d.lustre.it_lock_mode = 0;
2354         }
2355         ll_release_openhandle(head_filp->f_dentry, &oit);
2356 out:
2357         ll_intent_release(&oit);
2358         RETURN(rc);
2359 }
2360
2361 static int ll_file_join(struct inode *head, struct file *filp,
2362                         char *filename_tail)
2363 {
2364         struct inode *tail = NULL, *first = NULL, *second = NULL;
2365         struct dentry *tail_dentry;
2366         struct file *tail_filp, *first_filp, *second_filp;
2367         struct ll_lock_tree first_tree, second_tree;
2368         struct ll_lock_tree_node *first_node, *second_node;
2369         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2370         int rc = 0, cleanup_phase = 0;
2371         ENTRY;
2372
2373         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2374                head->i_ino, head->i_generation, head, filename_tail);
2375
2376         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2377         if (IS_ERR(tail_filp)) {
2378                 CERROR("Can not open tail file %s", filename_tail);
2379                 rc = PTR_ERR(tail_filp);
2380                 GOTO(cleanup, rc);
2381         }
2382         tail = igrab(tail_filp->f_dentry->d_inode);
2383
2384         tlli = ll_i2info(tail);
2385         tail_dentry = tail_filp->f_dentry;
2386         LASSERT(tail_dentry);
2387         cleanup_phase = 1;
2388
2389         /*reorder the inode for lock sequence*/
2390         first = head->i_ino > tail->i_ino ? head : tail;
2391         second = head->i_ino > tail->i_ino ? tail : head;
2392         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2393         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2394
2395         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2396                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2397         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2398         if (IS_ERR(first_node)){
2399                 rc = PTR_ERR(first_node);
2400                 GOTO(cleanup, rc);
2401         }
2402         first_tree.lt_fd = first_filp->private_data;
2403         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2404         if (rc != 0)
2405                 GOTO(cleanup, rc);
2406         cleanup_phase = 2;
2407
2408         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2409         if (IS_ERR(second_node)){
2410                 rc = PTR_ERR(second_node);
2411                 GOTO(cleanup, rc);
2412         }
2413         second_tree.lt_fd = second_filp->private_data;
2414         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2415         if (rc != 0)
2416                 GOTO(cleanup, rc);
2417         cleanup_phase = 3;
2418
2419         rc = join_sanity_check(head, tail);
2420         if (rc)
2421                 GOTO(cleanup, rc);
2422
2423         rc = join_file(head, filp, tail_filp);
2424         if (rc)
2425                 GOTO(cleanup, rc);
2426 cleanup:
2427         switch (cleanup_phase) {
2428         case 3:
2429                 ll_tree_unlock(&second_tree);
2430                 obd_cancel_unused(ll_i2dtexp(second),
2431                                   ll_i2info(second)->lli_smd, 0, NULL);
2432         case 2:
2433                 ll_tree_unlock(&first_tree);
2434                 obd_cancel_unused(ll_i2dtexp(first),
2435                                   ll_i2info(first)->lli_smd, 0, NULL);
2436         case 1:
2437                 filp_close(tail_filp, 0);
2438                 if (tail)
2439                         iput(tail);
2440                 if (head && rc == 0) {
2441                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2442                                        &hlli->lli_smd);
2443                         hlli->lli_smd = NULL;
2444                 }
2445         case 0:
2446                 break;
2447         default:
2448                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2449                 LBUG();
2450         }
2451         RETURN(rc);
2452 }
2453
2454 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2455 {
2456         struct inode *inode = dentry->d_inode;
2457         struct obd_client_handle *och;
2458         int rc;
2459         ENTRY;
2460
2461         LASSERT(inode);
2462
2463         /* Root ? Do nothing. */
2464         if (dentry->d_inode->i_sb->s_root == dentry)
2465                 RETURN(0);
2466
2467         /* No open handle to close? Move away */
2468         if (!it_disposition(it, DISP_OPEN_OPEN))
2469                 RETURN(0);
2470
2471         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2472
2473         OBD_ALLOC(och, sizeof(*och));
2474         if (!och)
2475                 GOTO(out, rc = -ENOMEM);
2476
2477         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2478                     ll_i2info(inode), it, och);
2479
2480         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2481                                        inode, och);
2482  out:
2483         /* this one is in place of ll_file_open */
2484         ptlrpc_req_finished(it->d.lustre.it_data);
2485         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2486         RETURN(rc);
2487 }
2488
2489 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2490                   unsigned long arg)
2491 {
2492         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2493         int flags;
2494         ENTRY;
2495
2496         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2497                inode->i_generation, inode, cmd);
2498         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2499
2500         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2501         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2502                 RETURN(-ENOTTY);
2503
2504         switch(cmd) {
2505         case LL_IOC_GETFLAGS:
2506                 /* Get the current value of the file flags */
2507                 return put_user(fd->fd_flags, (int *)arg);
2508         case LL_IOC_SETFLAGS:
2509         case LL_IOC_CLRFLAGS:
2510                 /* Set or clear specific file flags */
2511                 /* XXX This probably needs checks to ensure the flags are
2512                  *     not abused, and to handle any flag side effects.
2513                  */
2514                 if (get_user(flags, (int *) arg))
2515                         RETURN(-EFAULT);
2516
2517                 if (cmd == LL_IOC_SETFLAGS) {
2518                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2519                             !(file->f_flags & O_DIRECT)) {
2520                                 CERROR("%s: unable to disable locking on "
2521                                        "non-O_DIRECT file\n", current->comm);
2522                                 RETURN(-EINVAL);
2523                         }
2524
2525                         fd->fd_flags |= flags;
2526                 } else {
2527                         fd->fd_flags &= ~flags;
2528                 }
2529                 RETURN(0);
2530         case LL_IOC_LOV_SETSTRIPE:
2531                 RETURN(ll_lov_setstripe(inode, file, arg));
2532         case LL_IOC_LOV_SETEA:
2533                 RETURN(ll_lov_setea(inode, file, arg));
2534         case LL_IOC_LOV_GETSTRIPE:
2535                 RETURN(ll_lov_getstripe(inode, arg));
2536         case LL_IOC_RECREATE_OBJ:
2537                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2538         case EXT3_IOC_GETFLAGS:
2539         case EXT3_IOC_SETFLAGS:
2540                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2541         case EXT3_IOC_GETVERSION_OLD:
2542         case EXT3_IOC_GETVERSION:
2543                 RETURN(put_user(inode->i_generation, (int *)arg));
2544         case LL_IOC_JOIN: {
2545                 char *ftail;
2546                 int rc;
2547
2548                 ftail = getname((const char *)arg);
2549                 if (IS_ERR(ftail))
2550                         RETURN(PTR_ERR(ftail));
2551                 rc = ll_file_join(inode, file, ftail);
2552                 putname(ftail);
2553                 RETURN(rc);
2554         }
2555         case LL_IOC_GROUP_LOCK:
2556                 RETURN(ll_get_grouplock(inode, file, arg));
2557         case LL_IOC_GROUP_UNLOCK:
2558                 RETURN(ll_put_grouplock(inode, file, arg));
2559         case IOC_OBD_STATFS:
2560                 RETURN(ll_obd_statfs(inode, (void *)arg));
2561
2562         /* We need to special case any other ioctls we want to handle,
2563          * to send them to the MDS/OST as appropriate and to properly
2564          * network encode the arg field.
2565         case EXT3_IOC_SETVERSION_OLD:
2566         case EXT3_IOC_SETVERSION:
2567         */
2568         case LL_IOC_FLUSHCTX:
2569                 RETURN(ll_flush_ctx(inode));
2570         default: {
2571                 int err;
2572
2573                 if (LLIOC_STOP == 
2574                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2575                         RETURN(err);
2576
2577                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2578                                      (void *)arg));
2579         }
2580         }
2581 }
2582
2583 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2584 {
2585         struct inode *inode = file->f_dentry->d_inode;
2586         struct ll_inode_info *lli = ll_i2info(inode);
2587         struct lov_stripe_md *lsm = lli->lli_smd;
2588         loff_t retval;
2589         ENTRY;
2590         retval = offset + ((origin == 2) ? i_size_read(inode) :
2591                            (origin == 1) ? file->f_pos : 0);
2592         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2593                inode->i_ino, inode->i_generation, inode, retval, retval,
2594                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2595         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2596
2597         if (origin == 2) { /* SEEK_END */
2598                 int nonblock = 0, rc;
2599
2600                 if (file->f_flags & O_NONBLOCK)
2601                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2602
2603                 if (lsm != NULL) {
2604                         rc = ll_glimpse_size(inode, nonblock);
2605                         if (rc != 0)
2606                                 RETURN(rc);
2607                 }
2608
2609                 ll_inode_size_lock(inode, 0);
2610                 offset += i_size_read(inode);
2611                 ll_inode_size_unlock(inode, 0);
2612         } else if (origin == 1) { /* SEEK_CUR */
2613                 offset += file->f_pos;
2614         }
2615
2616         retval = -EINVAL;
2617         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2618                 if (offset != file->f_pos) {
2619                         file->f_pos = offset;
2620 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2621                         file->f_reada = 0;
2622                         file->f_version = ++event;
2623 #endif
2624                 }
2625                 retval = offset;
2626         }
2627         
2628         RETURN(retval);
2629 }
2630
2631 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2632 {
2633         struct inode *inode = dentry->d_inode;
2634         struct ll_inode_info *lli = ll_i2info(inode);
2635         struct lov_stripe_md *lsm = lli->lli_smd;
2636         struct ptlrpc_request *req;
2637         struct obd_capa *oc;
2638         int rc, err;
2639         ENTRY;
2640         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2641                inode->i_generation, inode);
2642         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2643
2644         /* fsync's caller has already called _fdata{sync,write}, we want
2645          * that IO to finish before calling the osc and mdc sync methods */
2646         rc = filemap_fdatawait(inode->i_mapping);
2647
2648         /* catch async errors that were recorded back when async writeback
2649          * failed for pages in this mapping. */
2650         err = lli->lli_async_rc;
2651         lli->lli_async_rc = 0;
2652         if (rc == 0)
2653                 rc = err;
2654         if (lsm) {
2655                 err = lov_test_and_clear_async_rc(lsm);
2656                 if (rc == 0)
2657                         rc = err;
2658         }
2659
2660         oc = ll_mdscapa_get(inode);
2661         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2662                       &req);
2663         capa_put(oc);
2664         if (!rc)
2665                 rc = err;
2666         if (!err)
2667                 ptlrpc_req_finished(req);
2668
2669         if (data && lsm) {
2670                 struct obdo *oa;
2671                 
2672                 OBDO_ALLOC(oa);
2673                 if (!oa)
2674                         RETURN(rc ? rc : -ENOMEM);
2675
2676                 oa->o_id = lsm->lsm_object_id;
2677                 oa->o_gr = lsm->lsm_object_gr;
2678                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2679                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2680                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2681                                            OBD_MD_FLGROUP);
2682
2683                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2684                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2685                                0, OBD_OBJECT_EOF, oc);
2686                 capa_put(oc);
2687                 if (!rc)
2688                         rc = err;
2689                 OBDO_FREE(oa);
2690         }
2691
2692         RETURN(rc);
2693 }
2694
2695 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2696 {
2697         struct inode *inode = file->f_dentry->d_inode;
2698         struct ll_sb_info *sbi = ll_i2sbi(inode);
2699         struct ldlm_res_id res_id =
2700                 { .name = { fid_seq(ll_inode2fid(inode)),
2701                             fid_oid(ll_inode2fid(inode)),
2702                             fid_ver(ll_inode2fid(inode)),
2703                             LDLM_FLOCK} };
2704         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2705                 ldlm_flock_completion_ast, NULL, file_lock };
2706         struct lustre_handle lockh = {0};
2707         ldlm_policy_data_t flock;
2708         int flags = 0;
2709         int rc;
2710         ENTRY;
2711
2712         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2713                inode->i_ino, file_lock);
2714
2715         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2716  
2717         if (file_lock->fl_flags & FL_FLOCK) {
2718                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2719                 /* set missing params for flock() calls */
2720                 file_lock->fl_end = OFFSET_MAX;
2721                 file_lock->fl_pid = current->tgid;
2722         }
2723         flock.l_flock.pid = file_lock->fl_pid;
2724         flock.l_flock.start = file_lock->fl_start;
2725         flock.l_flock.end = file_lock->fl_end;
2726
2727         switch (file_lock->fl_type) {
2728         case F_RDLCK:
2729                 einfo.ei_mode = LCK_PR;
2730                 break;
2731         case F_UNLCK:
2732                 /* An unlock request may or may not have any relation to
2733                  * existing locks so we may not be able to pass a lock handle
2734                  * via a normal ldlm_lock_cancel() request. The request may even
2735                  * unlock a byte range in the middle of an existing lock. In
2736                  * order to process an unlock request we need all of the same
2737                  * information that is given with a normal read or write record
2738                  * lock request. To avoid creating another ldlm unlock (cancel)
2739                  * message we'll treat a LCK_NL flock request as an unlock. */
2740                 einfo.ei_mode = LCK_NL;
2741                 break;
2742         case F_WRLCK:
2743                 einfo.ei_mode = LCK_PW;
2744                 break;
2745         default:
2746                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2747                 LBUG();
2748         }
2749
2750         switch (cmd) {
2751         case F_SETLKW:
2752 #ifdef F_SETLKW64
2753         case F_SETLKW64:
2754 #endif
2755                 flags = 0;
2756                 break;
2757         case F_SETLK:
2758 #ifdef F_SETLK64
2759         case F_SETLK64:
2760 #endif
2761                 flags = LDLM_FL_BLOCK_NOWAIT;
2762                 break;
2763         case F_GETLK:
2764 #ifdef F_GETLK64
2765         case F_GETLK64:
2766 #endif
2767                 flags = LDLM_FL_TEST_LOCK;
2768                 /* Save the old mode so that if the mode in the lock changes we
2769                  * can decrement the appropriate reader or writer refcount. */
2770                 file_lock->fl_type = einfo.ei_mode;
2771                 break;
2772         default:
2773                 CERROR("unknown fcntl lock command: %d\n", cmd);
2774                 LBUG();
2775         }
2776
2777         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2778                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2779                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2780
2781         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2782                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2783         if ((file_lock->fl_flags & FL_FLOCK) &&
2784             (rc == 0 || file_lock->fl_type == F_UNLCK))
2785                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2786 #ifdef HAVE_F_OP_FLOCK
2787         if ((file_lock->fl_flags & FL_POSIX) &&
2788             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2789             !(flags & LDLM_FL_TEST_LOCK))
2790                 posix_lock_file_wait(file, file_lock);
2791 #endif
2792
2793         RETURN(rc);
2794 }
2795
2796 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2797 {
2798         ENTRY;
2799
2800         RETURN(-ENOSYS);
2801 }
2802
2803 int ll_have_md_lock(struct inode *inode, __u64 bits)
2804 {
2805         struct lustre_handle lockh;
2806         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2807         struct lu_fid *fid;
2808         int flags;
2809         ENTRY;
2810
2811         if (!inode)
2812                RETURN(0);
2813
2814         fid = &ll_i2info(inode)->lli_fid;
2815         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2816
2817         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2818         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2819                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2820                 RETURN(1);
2821         }
2822         RETURN(0);
2823 }
2824
2825 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2826                             struct lustre_handle *lockh)
2827 {
2828         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2829         struct lu_fid *fid;
2830         ldlm_mode_t rc;
2831         int flags;
2832         ENTRY;
2833
2834         fid = &ll_i2info(inode)->lli_fid;
2835         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2836
2837         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2838         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2839                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2840         RETURN(rc);
2841 }
2842
2843 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2844         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2845                               * and return success */
2846                 inode->i_nlink = 0;
2847                 /* This path cannot be hit for regular files unless in
2848                  * case of obscure races, so no need to to validate
2849                  * size. */
2850                 if (!S_ISREG(inode->i_mode) &&
2851                     !S_ISDIR(inode->i_mode))
2852                         return 0;
2853         }
2854
2855         if (rc) {
2856                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2857                 return -abs(rc);
2858
2859         }
2860
2861         return 0;
2862 }
2863
2864 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2865 {
2866         struct inode *inode = dentry->d_inode;
2867         struct ptlrpc_request *req = NULL;
2868         struct ll_sb_info *sbi;
2869         struct obd_export *exp;
2870         int rc;
2871         ENTRY;
2872
2873         if (!inode) {
2874                 CERROR("REPORT THIS LINE TO PETER\n");
2875                 RETURN(0);
2876         }
2877         sbi = ll_i2sbi(inode);
2878
2879         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2880                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2881
2882         exp = ll_i2mdexp(inode);
2883
2884         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2885                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2886                 struct md_op_data *op_data;
2887
2888                 /* Call getattr by fid, so do not provide name at all. */
2889                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2890                                              dentry->d_inode, NULL, 0, 0,
2891                                              LUSTRE_OPC_ANY, NULL);
2892                 if (IS_ERR(op_data))
2893                         RETURN(PTR_ERR(op_data));
2894
2895                 oit.it_flags |= O_CHECK_STALE;
2896                 rc = md_intent_lock(exp, op_data, NULL, 0,
2897                                     /* we are not interested in name
2898                                        based lookup */
2899                                     &oit, 0, &req,
2900                                     ll_md_blocking_ast, 0);
2901                 ll_finish_md_op_data(op_data);
2902                 oit.it_flags &= ~O_CHECK_STALE;
2903                 if (rc < 0) {
2904                         rc = ll_inode_revalidate_fini(inode, rc);
2905                         GOTO (out, rc);
2906                 }
2907
2908                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2909                 if (rc != 0) {
2910                         ll_intent_release(&oit);
2911                         GOTO(out, rc);
2912                 }
2913
2914                 /* Unlinked? Unhash dentry, so it is not picked up later by
2915                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2916                    here to preserve get_cwd functionality on 2.6.
2917                    Bug 10503 */
2918                 if (!dentry->d_inode->i_nlink) {
2919                         spin_lock(&dcache_lock);
2920                         ll_drop_dentry(dentry);
2921                         spin_unlock(&dcache_lock);
2922                 }
2923
2924                 ll_lookup_finish_locks(&oit, dentry);
2925         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
2926                                                      MDS_INODELOCK_LOOKUP)) {
2927                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2928                 obd_valid valid = OBD_MD_FLGETATTR;
2929                 struct obd_capa *oc;
2930                 int ealen = 0;
2931
2932                 if (S_ISREG(inode->i_mode)) {
2933                         rc = ll_get_max_mdsize(sbi, &ealen);
2934                         if (rc)
2935                                 RETURN(rc);
2936                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2937                 }
2938                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2939                  * capa for this inode. Because we only keep capas of dirs
2940                  * fresh. */
2941                 oc = ll_mdscapa_get(inode);
2942                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2943                                 ealen, &req);
2944                 capa_put(oc);
2945                 if (rc) {
2946                         rc = ll_inode_revalidate_fini(inode, rc);
2947                         RETURN(rc);
2948                 }
2949
2950                 rc = ll_prep_inode(&inode, req, NULL);
2951                 if (rc)
2952                         GOTO(out, rc);
2953         }
2954
2955         /* if object not yet allocated, don't validate size */
2956         if (ll_i2info(inode)->lli_smd == NULL)
2957                 GOTO(out, rc = 0);
2958
2959         /* ll_glimpse_size will prefer locally cached writes if they extend
2960          * the file */
2961         rc = ll_glimpse_size(inode, 0);
2962         EXIT;
2963 out:
2964         ptlrpc_req_finished(req);
2965         return rc;
2966 }
2967
2968 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2969                   struct lookup_intent *it, struct kstat *stat)
2970 {
2971         struct inode *inode = de->d_inode;
2972         int res = 0;
2973
2974         res = ll_inode_revalidate_it(de, it);
2975         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2976
2977         if (res)
2978                 return res;
2979
2980         stat->dev = inode->i_sb->s_dev;
2981         stat->ino = inode->i_ino;
2982         stat->mode = inode->i_mode;
2983         stat->nlink = inode->i_nlink;
2984         stat->uid = inode->i_uid;
2985         stat->gid = inode->i_gid;
2986         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2987         stat->atime = inode->i_atime;
2988         stat->mtime = inode->i_mtime;
2989         stat->ctime = inode->i_ctime;
2990 #ifdef HAVE_INODE_BLKSIZE
2991         stat->blksize = inode->i_blksize;
2992 #else
2993         stat->blksize = 1 << inode->i_blkbits;
2994 #endif
2995
2996         ll_inode_size_lock(inode, 0);
2997         stat->size = i_size_read(inode);
2998         stat->blocks = inode->i_blocks;
2999         ll_inode_size_unlock(inode, 0);
3000
3001         return 0;
3002 }
3003 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3004 {
3005         struct lookup_intent it = { .it_op = IT_GETATTR };
3006
3007         return ll_getattr_it(mnt, de, &it, stat);
3008 }
3009
3010 static
3011 int lustre_check_acl(struct inode *inode, int mask)
3012 {
3013 #ifdef CONFIG_FS_POSIX_ACL
3014         struct ll_inode_info *lli = ll_i2info(inode);
3015         struct posix_acl *acl;
3016         int rc;
3017         ENTRY;
3018
3019         spin_lock(&lli->lli_lock);
3020         acl = posix_acl_dup(lli->lli_posix_acl);
3021         spin_unlock(&lli->lli_lock);
3022
3023         if (!acl)
3024                 RETURN(-EAGAIN);
3025
3026         rc = posix_acl_permission(inode, acl, mask);
3027         posix_acl_release(acl);
3028
3029         RETURN(rc);
3030 #else
3031         return -EAGAIN;
3032 #endif
3033 }
3034
3035 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3036 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3037 {
3038         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3039                inode->i_ino, inode->i_generation, inode, mask);
3040         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3041                 return lustre_check_remote_perm(inode, mask);
3042         
3043         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3044         return generic_permission(inode, mask, lustre_check_acl);
3045 }
3046 #else
3047 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3048 {
3049         int mode = inode->i_mode;
3050         int rc;
3051
3052         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3053                inode->i_ino, inode->i_generation, inode, mask);
3054
3055         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3056                 return lustre_check_remote_perm(inode, mask);
3057
3058         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3059
3060         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3061             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3062                 return -EROFS;
3063         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3064                 return -EACCES;
3065         if (current->fsuid == inode->i_uid) {
3066                 mode >>= 6;
3067         } else if (1) {
3068                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3069                         goto check_groups;
3070                 rc = lustre_check_acl(inode, mask);
3071                 if (rc == -EAGAIN)
3072                         goto check_groups;
3073                 if (rc == -EACCES)
3074                         goto check_capabilities;
3075                 return rc;
3076         } else {
3077 check_groups:
3078                 if (in_group_p(inode->i_gid))
3079                         mode >>= 3;
3080         }
3081         if ((mode & mask & S_IRWXO) == mask)
3082                 return 0;
3083
3084 check_capabilities:
3085         if (!(mask & MAY_EXEC) ||
3086             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3087                 if (capable(CAP_DAC_OVERRIDE))
3088                         return 0;
3089
3090         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3091             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3092                 return 0;
3093         
3094         return -EACCES;
3095 }
3096 #endif
3097
3098 /* -o localflock - only provides locally consistent flock locks */
3099 struct file_operations ll_file_operations = {
3100         .read           = ll_file_read,
3101         .write          = ll_file_write,
3102         .ioctl          = ll_file_ioctl,
3103         .open           = ll_file_open,
3104         .release        = ll_file_release,
3105         .mmap           = ll_file_mmap,
3106         .llseek         = ll_file_seek,
3107         .sendfile       = ll_file_sendfile,
3108         .fsync          = ll_fsync,
3109 };
3110
3111 struct file_operations ll_file_operations_flock = {
3112         .read           = ll_file_read,
3113         .write          = ll_file_write,
3114         .ioctl          = ll_file_ioctl,
3115         .open           = ll_file_open,
3116         .release        = ll_file_release,
3117         .mmap           = ll_file_mmap,
3118         .llseek         = ll_file_seek,
3119         .sendfile       = ll_file_sendfile,
3120         .fsync          = ll_fsync,
3121 #ifdef HAVE_F_OP_FLOCK
3122         .flock          = ll_file_flock,
3123 #endif
3124         .lock           = ll_file_flock
3125 };
3126
3127 /* These are for -o noflock - to return ENOSYS on flock calls */
3128 struct file_operations ll_file_operations_noflock = {
3129         .read           = ll_file_read,
3130         .write          = ll_file_write,
3131         .ioctl          = ll_file_ioctl,
3132         .open           = ll_file_open,
3133         .release        = ll_file_release,
3134         .mmap           = ll_file_mmap,
3135         .llseek         = ll_file_seek,
3136         .sendfile       = ll_file_sendfile,
3137         .fsync          = ll_fsync,
3138 #ifdef HAVE_F_OP_FLOCK
3139         .flock          = ll_file_noflock,
3140 #endif
3141         .lock           = ll_file_noflock
3142 };
3143
3144 struct inode_operations ll_file_inode_operations = {
3145 #ifdef HAVE_VFS_INTENT_PATCHES
3146         .setattr_raw    = ll_setattr_raw,
3147 #endif
3148         .setattr        = ll_setattr,
3149         .truncate       = ll_truncate,
3150         .getattr        = ll_getattr,
3151         .permission     = ll_inode_permission,
3152         .setxattr       = ll_setxattr,
3153         .getxattr       = ll_getxattr,
3154         .listxattr      = ll_listxattr,
3155         .removexattr    = ll_removexattr,
3156 };
3157
3158 /* dynamic ioctl number support routins */
3159 static struct llioc_ctl_data {
3160         struct rw_semaphore ioc_sem;
3161         struct list_head    ioc_head;
3162 } llioc = { 
3163         __RWSEM_INITIALIZER(llioc.ioc_sem), 
3164         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3165 };
3166
3167
3168 struct llioc_data {
3169         struct list_head        iocd_list;
3170         unsigned int            iocd_size;
3171         llioc_callback_t        iocd_cb;
3172         unsigned int            iocd_count;
3173         unsigned int            iocd_cmd[0];
3174 };
3175
3176 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3177 {
3178         unsigned int size;
3179         struct llioc_data *in_data = NULL;
3180         ENTRY;
3181
3182         if (cb == NULL || cmd == NULL ||
3183             count > LLIOC_MAX_CMD || count < 0)
3184                 RETURN(NULL);
3185
3186         size = sizeof(*in_data) + count * sizeof(unsigned int);
3187         OBD_ALLOC(in_data, size);
3188         if (in_data == NULL)
3189                 RETURN(NULL);
3190
3191         memset(in_data, 0, sizeof(*in_data));
3192         in_data->iocd_size = size;
3193         in_data->iocd_cb = cb;
3194         in_data->iocd_count = count;
3195         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3196
3197         down_write(&llioc.ioc_sem);
3198         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3199         up_write(&llioc.ioc_sem);
3200
3201         RETURN(in_data);
3202 }
3203
3204 void ll_iocontrol_unregister(void *magic)
3205 {
3206         struct llioc_data *tmp;
3207
3208         if (magic == NULL)
3209                 return;
3210
3211         down_write(&llioc.ioc_sem);
3212         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3213                 if (tmp == magic) {
3214                         unsigned int size = tmp->iocd_size;
3215
3216                         list_del(&tmp->iocd_list);
3217                         up_write(&llioc.ioc_sem);
3218
3219                         OBD_FREE(tmp, size);
3220                         return;
3221                 }
3222         }
3223         up_write(&llioc.ioc_sem);
3224
3225         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3226 }
3227
3228 EXPORT_SYMBOL(ll_iocontrol_register);
3229 EXPORT_SYMBOL(ll_iocontrol_unregister);
3230
3231 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
3232                         unsigned int cmd, unsigned long arg, int *rcp)
3233 {
3234         enum llioc_iter ret = LLIOC_CONT;
3235         struct llioc_data *data;
3236         int rc = -EINVAL, i;
3237
3238         down_read(&llioc.ioc_sem);
3239         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3240                 for (i = 0; i < data->iocd_count; i++) {
3241                         if (cmd != data->iocd_cmd[i]) 
3242                                 continue;
3243
3244                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3245                         break;
3246                 }
3247
3248                 if (ret == LLIOC_STOP)
3249                         break;
3250         }
3251         up_read(&llioc.ioc_sem);
3252
3253         if (rcp)
3254                 *rcp = rc;
3255         return ret;
3256 }