Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
85
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in case of LMV, is this correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * here we check if this is forced umount. If so this is called on
110          * canceling "open lock" and we do not call md_close() in this case, as
111          * it will not be successful, as import is already deactivated.
112          */
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* MDS has instructed us to obtain Size-on-MDS attribute from
131                  * OSTs and send setattr to back to MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154       
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och=*och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody have freed this och
203                       already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have good enough OPEN lock on the file and if
228            we can skip talking to MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this returns an error code, fput() the caller does not, so we need
273  * to make every effort to clean up all of our state here.  Also, applications
274  * rarely check close errors and even if an error is returned they will not
275  * re-try the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289 #ifdef CONFIG_FS_POSIX_ACL
290         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
291             inode == inode->i_sb->s_root->d_inode) {
292                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
293
294                 LASSERT(fd != NULL);
295                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
296                         fd->fd_flags &= ~LL_FILE_RMTACL;
297                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
298                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
299                 }
300         }
301 #endif
302
303         if (inode->i_sb->s_root != file->f_dentry)
304                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
305         fd = LUSTRE_FPRIVATE(file);
306         LASSERT(fd != NULL);
307
308         /* The last ref on @file, maybe not the the owner pid of statahead.
309          * Different processes can open the same dir, "ll_opendir_key" means:
310          * it is me that should stop the statahead thread. */
311         if (lli->lli_opendir_key == fd)
312                 ll_stop_statahead(inode, fd);
313
314         if (inode->i_sb->s_root == file->f_dentry) {
315                 LUSTRE_FPRIVATE(file) = NULL;
316                 ll_file_data_put(fd);
317                 RETURN(0);
318         }
319         
320         if (lsm)
321                 lov_test_and_clear_async_rc(lsm);
322         lli->lli_async_rc = 0;
323
324         rc = ll_md_close(sbi->ll_md_exp, inode, file);
325         RETURN(rc);
326 }
327
328 static int ll_intent_file_open(struct file *file, void *lmm,
329                                int lmmsize, struct lookup_intent *itp)
330 {
331         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
332         struct dentry *parent = file->f_dentry->d_parent;
333         const char *name = file->f_dentry->d_name.name;
334         const int len = file->f_dentry->d_name.len;
335         struct md_op_data *op_data;
336         struct ptlrpc_request *req;
337         int rc;
338         ENTRY;
339
340         if (!parent)
341                 RETURN(-ENOENT);
342
343         /* Usually we come here only for NFSD, and we want open lock.
344            But we can also get here with pre 2.6.15 patchless kernels, and in
345            that case that lock is also ok */
346         /* We can also get here if there was cached open handle in revalidate_it
347          * but it disappeared while we were getting from there to ll_file_open.
348          * But this means this file was closed and immediatelly opened which
349          * makes a good candidate for using OPEN lock */
350         /* If lmmsize & lmm are not 0, we are just setting stripe info
351          * parameters. No need for the open lock */
352         if (!lmm && !lmmsize)
353                 itp->it_flags |= MDS_OPEN_LOCK;
354
355         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
356                                       file->f_dentry->d_inode, name, len,
357                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
358         if (IS_ERR(op_data))
359                 RETURN(PTR_ERR(op_data));
360
361         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
362                             0 /*unused */, &req, ll_md_blocking_ast, 0);
363         ll_finish_md_op_data(op_data);
364         if (rc == -ESTALE) {
365                 /* reason for keep own exit path - don`t flood log
366                 * with messages with -ESTALE errors.
367                 */
368                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
369                      it_open_error(DISP_OPEN_OPEN, itp))
370                         GOTO(out, rc);
371                 ll_release_openhandle(file->f_dentry, itp);
372                 GOTO(out_stale, rc);
373         }
374
375         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
376                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
377                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
378                 GOTO(out, rc);
379         }
380
381         if (itp->d.lustre.it_lock_mode)
382                 md_set_lock_data(sbi->ll_md_exp,
383                                  &itp->d.lustre.it_lock_handle, 
384                                  file->f_dentry->d_inode);
385
386         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
387 out:
388         ptlrpc_req_finished(itp->d.lustre.it_data);
389
390 out_stale:
391         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
392         ll_intent_drop_lock(itp);
393
394         RETURN(rc);
395 }
396
397 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
398                        struct lookup_intent *it, struct obd_client_handle *och)
399 {
400         struct ptlrpc_request *req = it->d.lustre.it_data;
401         struct mdt_body *body;
402
403         LASSERT(och);
404
405         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
406         LASSERT(body != NULL);                      /* reply already checked out */
407
408         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
409         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
410         och->och_fid = lli->lli_fid;
411         och->och_flags = it->it_flags;
412         lli->lli_ioepoch = body->ioepoch;
413
414         return md_set_open_replay_data(md_exp, och, req);
415 }
416
417 int ll_local_open(struct file *file, struct lookup_intent *it,
418                   struct ll_file_data *fd, struct obd_client_handle *och)
419 {
420         struct inode *inode = file->f_dentry->d_inode;
421         struct ll_inode_info *lli = ll_i2info(inode);
422         ENTRY;
423
424         LASSERT(!LUSTRE_FPRIVATE(file));
425
426         LASSERT(fd != NULL);
427
428         if (och) {
429                 struct ptlrpc_request *req = it->d.lustre.it_data;
430                 struct mdt_body *body;
431                 int rc;
432
433                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
434                 if (rc)
435                         RETURN(rc);
436
437                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
438                 if ((it->it_flags & FMODE_WRITE) &&
439                     (body->valid & OBD_MD_FLSIZE))
440                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
441                                lli->lli_ioepoch, PFID(&lli->lli_fid));
442         }
443
444         LUSTRE_FPRIVATE(file) = fd;
445         ll_readahead_init(inode, &fd->fd_ras);
446         fd->fd_omode = it->it_flags;
447         RETURN(0);
448 }
449
450 /* Open a file, and (for the very first open) create objects on the OSTs at
451  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
452  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
453  * lli_open_sem to ensure no other process will create objects, send the
454  * stripe MD to the MDS, or try to destroy the objects if that fails.
455  *
456  * If we already have the stripe MD locally then we don't request it in
457  * md_open(), by passing a lmm_size = 0.
458  *
459  * It is up to the application to ensure no other processes open this file
460  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
461  * used.  We might be able to avoid races of that sort by getting lli_open_sem
462  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
463  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
464  */
465 int ll_file_open(struct inode *inode, struct file *file)
466 {
467         struct ll_inode_info *lli = ll_i2info(inode);
468         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
469                                           .it_flags = file->f_flags };
470         struct lov_stripe_md *lsm;
471         struct ptlrpc_request *req = NULL;
472         struct obd_client_handle **och_p;
473         __u64 *och_usecount;
474         struct ll_file_data *fd;
475         int rc = 0, opendir_set = 0;
476         ENTRY;
477
478         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
479                inode->i_generation, inode, file->f_flags);
480
481 #ifdef HAVE_VFS_INTENT_PATCHES
482         it = file->f_it;
483 #else
484         it = file->private_data; /* XXX: compat macro */
485         file->private_data = NULL; /* prevent ll_local_open assertion */
486 #endif
487
488         fd = ll_file_data_get();
489         if (fd == NULL)
490                 RETURN(-ENOMEM);
491
492         if (S_ISDIR(inode->i_mode)) {
493                 spin_lock(&lli->lli_lock);
494                 /* "lli->lli_opendir_pid != 0" means someone has set it.
495                  * "lli->lli_sai != NULL" means the previous statahead has not
496                  *                        been cleanup. */ 
497                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
498                         opendir_set = 1;
499                         lli->lli_opendir_pid = cfs_curproc_pid();
500                         lli->lli_opendir_key = fd;
501                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
502                         /* Two cases for this:
503                          * (1) The same process open such directory many times.
504                          * (2) The old process opened the directory, and exited
505                          *     before its children processes. Then new process
506                          *     with the same pid opens such directory before the
507                          *     old process's children processes exit.
508                          * Change the owner to the latest one. */
509                         opendir_set = 2;
510                         lli->lli_opendir_key = fd;
511                 }
512                 spin_unlock(&lli->lli_lock);
513         }
514
515         if (inode->i_sb->s_root == file->f_dentry) {
516                 LUSTRE_FPRIVATE(file) = fd;
517                 RETURN(0);
518         }
519
520         if (!it || !it->d.lustre.it_disposition) {
521                 /* Convert f_flags into access mode. We cannot use file->f_mode,
522                  * because everything but O_ACCMODE mask was stripped from
523                  * there */
524                 if ((oit.it_flags + 1) & O_ACCMODE)
525                         oit.it_flags++;
526                 if (file->f_flags & O_TRUNC)
527                         oit.it_flags |= FMODE_WRITE;
528
529                 /* kernel only call f_op->open in dentry_open.  filp_open calls
530                  * dentry_open after call to open_namei that checks permissions.
531                  * Only nfsd_open call dentry_open directly without checking
532                  * permissions and because of that this code below is safe. */
533                 if (oit.it_flags & FMODE_WRITE)
534                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
535
536                 /* We do not want O_EXCL here, presumably we opened the file
537                  * already? XXX - NFS implications? */
538                 oit.it_flags &= ~O_EXCL;
539
540                 it = &oit;
541         }
542
543 restart:
544         /* Let's see if we have file open on MDS already. */
545         if (it->it_flags & FMODE_WRITE) {
546                 och_p = &lli->lli_mds_write_och;
547                 och_usecount = &lli->lli_open_fd_write_count;
548         } else if (it->it_flags & FMODE_EXEC) {
549                 och_p = &lli->lli_mds_exec_och;
550                 och_usecount = &lli->lli_open_fd_exec_count;
551          } else {
552                 och_p = &lli->lli_mds_read_och;
553                 och_usecount = &lli->lli_open_fd_read_count;
554         }
555         
556         down(&lli->lli_och_sem);
557         if (*och_p) { /* Open handle is present */
558                 if (it_disposition(it, DISP_OPEN_OPEN)) {
559                         /* Well, there's extra open request that we do not need,
560                            let's close it somehow. This will decref request. */
561                         rc = it_open_error(DISP_OPEN_OPEN, it);
562                         if (rc) {
563                                 ll_file_data_put(fd);
564                                 GOTO(out_och_free, rc);
565                         }       
566                         ll_release_openhandle(file->f_dentry, it);
567                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
568                                              LPROC_LL_OPEN);
569                 }
570                 (*och_usecount)++;
571
572                 rc = ll_local_open(file, it, fd, NULL);
573                 if (rc) {
574                         up(&lli->lli_och_sem);
575                         ll_file_data_put(fd);
576                         RETURN(rc);
577                 }
578         } else {
579                 LASSERT(*och_usecount == 0);
580                 if (!it->d.lustre.it_disposition) {
581                         /* We cannot just request lock handle now, new ELC code
582                            means that one of other OPEN locks for this file
583                            could be cancelled, and since blocking ast handler
584                            would attempt to grab och_sem as well, that would
585                            result in a deadlock */
586                         up(&lli->lli_och_sem);
587                         it->it_flags |= O_CHECK_STALE;
588                         rc = ll_intent_file_open(file, NULL, 0, it);
589                         it->it_flags &= ~O_CHECK_STALE;
590                         if (rc) {
591                                 ll_file_data_put(fd);
592                                 GOTO(out_openerr, rc);
593                         }
594
595                         /* Got some error? Release the request */
596                         if (it->d.lustre.it_status < 0) {
597                                 req = it->d.lustre.it_data;
598                                 ptlrpc_req_finished(req);
599                         }
600                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
601                                          &it->d.lustre.it_lock_handle,
602                                          file->f_dentry->d_inode);
603                         goto restart;
604                 }
605                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
606                 if (!*och_p) {
607                         ll_file_data_put(fd);
608                         GOTO(out_och_free, rc = -ENOMEM);
609                 }
610                 (*och_usecount)++;
611                 req = it->d.lustre.it_data;
612
613                 /* md_intent_lock() didn't get a request ref if there was an
614                  * open error, so don't do cleanup on the request here
615                  * (bug 3430) */
616                 /* XXX (green): Should not we bail out on any error here, not
617                  * just open error? */
618                 rc = it_open_error(DISP_OPEN_OPEN, it);
619                 if (rc) {
620                         ll_file_data_put(fd);
621                         GOTO(out_och_free, rc);
622                 }
623
624                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
625                 rc = ll_local_open(file, it, fd, *och_p);
626                 if (rc) {
627                         up(&lli->lli_och_sem);
628                         ll_file_data_put(fd);
629                         GOTO(out_och_free, rc);
630                 }
631         }
632         up(&lli->lli_och_sem);
633
634         /* Must do this outside lli_och_sem lock to prevent deadlock where
635            different kind of OPEN lock for this same inode gets cancelled
636            by ldlm_cancel_lru */
637         if (!S_ISREG(inode->i_mode))
638                 GOTO(out, rc);
639
640         ll_capa_open(inode);
641
642         lsm = lli->lli_smd;
643         if (lsm == NULL) {
644                 if (file->f_flags & O_LOV_DELAY_CREATE ||
645                     !(file->f_mode & FMODE_WRITE)) {
646                         CDEBUG(D_INODE, "object creation was delayed\n");
647                         GOTO(out, rc);
648                 }
649         }
650         file->f_flags &= ~O_LOV_DELAY_CREATE;
651         GOTO(out, rc);
652 out:
653         ptlrpc_req_finished(req);
654         if (req)
655                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
656 out_och_free:
657         if (rc) {
658                 if (*och_p) {
659                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
660                         *och_p = NULL; /* OBD_FREE writes some magic there */
661                         (*och_usecount)--;
662                 }
663                 up(&lli->lli_och_sem);
664 out_openerr:
665                 if (opendir_set == 1) {
666                         lli->lli_opendir_key = NULL;
667                         lli->lli_opendir_pid = 0;
668                 } else if (unlikely(opendir_set == 2)) {
669                         ll_stop_statahead(inode, fd);
670                 }
671         }
672
673         return rc;
674 }
675
676 /* Fills the obdo with the attributes for the inode defined by lsm */
677 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
678 {
679         struct ptlrpc_request_set *set;
680         struct ll_inode_info *lli = ll_i2info(inode);
681         struct lov_stripe_md *lsm = lli->lli_smd;
682
683         struct obd_info oinfo = { { { 0 } } };
684         int rc;
685         ENTRY;
686
687         LASSERT(lsm != NULL);
688
689         oinfo.oi_md = lsm;
690         oinfo.oi_oa = obdo;
691         oinfo.oi_oa->o_id = lsm->lsm_object_id;
692         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
693         oinfo.oi_oa->o_mode = S_IFREG;
694         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
695                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
696                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
697                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
698                                OBD_MD_FLGROUP;
699         oinfo.oi_capa = ll_mdscapa_get(inode);
700
701         set = ptlrpc_prep_set();
702         if (set == NULL) {
703                 CERROR("can't allocate ptlrpc set\n");
704                 rc = -ENOMEM;
705         } else {
706                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
707                 if (rc == 0)
708                         rc = ptlrpc_set_wait(set);
709                 ptlrpc_set_destroy(set);
710         }
711         capa_put(oinfo.oi_capa);
712         if (rc)
713                 RETURN(rc);
714
715         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
716                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
717                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
718
719         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
720         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
721                lli->lli_smd->lsm_object_id, i_size_read(inode),
722                (unsigned long long)inode->i_blocks,
723                (unsigned long)ll_inode_blksize(inode));
724         RETURN(0);
725 }
726
727 static inline void ll_remove_suid(struct inode *inode)
728 {
729         unsigned int mode;
730
731         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
732         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
733
734         /* was any of the uid bits set? */
735         mode &= inode->i_mode;
736         if (mode && !capable(CAP_FSETID)) {
737                 inode->i_mode &= ~mode;
738                 // XXX careful here - we cannot change the size
739         }
740 }
741
742 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
743 {
744         struct ll_inode_info *lli = ll_i2info(inode);
745         struct lov_stripe_md *lsm = lli->lli_smd;
746         struct obd_export *exp = ll_i2dtexp(inode);
747         struct {
748                 char name[16];
749                 struct ldlm_lock *lock;
750                 struct lov_stripe_md *lsm;
751         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock, .lsm = lsm };
752         __u32 stripe, vallen = sizeof(stripe);
753         struct lov_oinfo *loinfo;
754         int rc;
755         ENTRY;
756
757         if (lsm->lsm_stripe_count == 1)
758                 GOTO(check, stripe = 0);
759
760         /* get our offset in the lov */
761         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
762         if (rc != 0) {
763                 CERROR("obd_get_info: rc = %d\n", rc);
764                 RETURN(rc);
765         }
766         LASSERT(stripe < lsm->lsm_stripe_count);
767
768 check:
769         loinfo = lsm->lsm_oinfo[stripe];
770         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
771                             &lock->l_resource->lr_name)){
772                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
773                            loinfo->loi_id, loinfo->loi_gr);
774                 RETURN(-ELDLM_NO_LOCK_DATA);
775         }
776
777         RETURN(stripe);
778 }
779
780 /* Get extra page reference to ensure it is not going away */
781 void ll_pin_extent_cb(void *data)
782 {
783         struct page *page = data;
784         
785         page_cache_get(page);
786
787         return;
788 }
789
790 /* Flush the page from page cache for an extent as its canceled.
791  * Page to remove is delivered as @data.
792  *
793  * No one can dirty the extent until we've finished our work and they cannot
794  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
795  * but other kernel actors could have pages locked.
796  *
797  * If @discard is set, there is no need to write the page if it is dirty.
798  *
799  * Called with the DLM lock held. */
800 int ll_page_removal_cb(void *data, int discard)
801 {
802         int rc;
803         struct page *page = data;
804         struct address_space *mapping;
805  
806         ENTRY;
807
808         /* We have page reference already from ll_pin_page */
809         lock_page(page);
810
811         /* Already truncated by somebody */
812         if (!page->mapping)
813                 GOTO(out, rc = 0);
814         mapping = page->mapping;
815
816         ll_teardown_mmaps(mapping,
817                           (__u64)page->index << PAGE_CACHE_SHIFT,
818                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
819                                                               ~PAGE_CACHE_MASK);        
820         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
821
822         if (!discard && clear_page_dirty_for_io(page)) {
823                 LASSERT(page->mapping);
824                 rc = ll_call_writepage(page->mapping->host, page);
825                 /* either waiting for io to complete or reacquiring
826                  * the lock that the failed writepage released */
827                 lock_page(page);
828                 wait_on_page_writeback(page);
829                 if (rc != 0) {
830                         CERROR("writepage inode %lu(%p) of page %p "
831                                "failed: %d\n", mapping->host->i_ino,
832                                mapping->host, page, rc);
833                         if (rc == -ENOSPC)
834                                 set_bit(AS_ENOSPC, &mapping->flags);
835                         else
836                                 set_bit(AS_EIO, &mapping->flags);
837                 }
838                 set_bit(AS_EIO, &mapping->flags);
839         }
840         if (page->mapping != NULL) {
841                 struct ll_async_page *llap = llap_cast_private(page);
842                 /* checking again to account for writeback's lock_page() */
843                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
844                 if (llap)
845                         ll_ra_accounting(llap, page->mapping);
846                 ll_truncate_complete_page(page);
847         }
848         EXIT;
849 out:
850         LASSERT(!PageWriteback(page));
851         unlock_page(page);
852         page_cache_release(page);
853
854         return 0;
855 }
856
857 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
858                              void *data, int flag)
859 {
860         struct inode *inode;
861         struct ll_inode_info *lli;
862         struct lov_stripe_md *lsm;
863         int stripe;
864         __u64 kms;
865
866         ENTRY;
867
868         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
869                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
870                 LBUG();
871         }
872
873         inode = ll_inode_from_lock(lock);
874         if (inode == NULL)
875                 RETURN(0);
876         lli = ll_i2info(inode);
877         if (lli == NULL)
878                 GOTO(iput, 0);
879         if (lli->lli_smd == NULL)
880                 GOTO(iput, 0);
881         lsm = lli->lli_smd;
882
883         stripe = ll_lock_to_stripe_offset(inode, lock);
884         if (stripe < 0)
885                 GOTO(iput, 0);
886
887         lov_stripe_lock(lsm);
888         lock_res_and_lock(lock);
889         kms = ldlm_extent_shift_kms(lock,
890                                     lsm->lsm_oinfo[stripe]->loi_kms);
891
892         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
893                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
894                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
895         lsm->lsm_oinfo[stripe]->loi_kms = kms;
896         unlock_res_and_lock(lock);
897         lov_stripe_unlock(lsm);
898         ll_queue_done_writing(inode, 0);
899         EXIT;
900 iput:
901         iput(inode);
902
903         return 0;
904 }
905
906 #if 0
907 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
908 {
909         /* XXX ALLOCATE - 160 bytes */
910         struct inode *inode = ll_inode_from_lock(lock);
911         struct ll_inode_info *lli = ll_i2info(inode);
912         struct lustre_handle lockh = { 0 };
913         struct ost_lvb *lvb;
914         int stripe;
915         ENTRY;
916
917         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
918                      LDLM_FL_BLOCK_CONV)) {
919                 LBUG(); /* not expecting any blocked async locks yet */
920                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
921                            "lock, returning");
922                 ldlm_lock_dump(D_OTHER, lock, 0);
923                 ldlm_reprocess_all(lock->l_resource);
924                 RETURN(0);
925         }
926
927         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
928
929         stripe = ll_lock_to_stripe_offset(inode, lock);
930         if (stripe < 0)
931                 goto iput;
932
933         if (lock->l_lvb_len) {
934                 struct lov_stripe_md *lsm = lli->lli_smd;
935                 __u64 kms;
936                 lvb = lock->l_lvb_data;
937                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
938
939                 lock_res_and_lock(lock);
940                 ll_inode_size_lock(inode, 1);
941                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
942                 kms = ldlm_extent_shift_kms(NULL, kms);
943                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
944                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
945                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
946                 lsm->lsm_oinfo[stripe].loi_kms = kms;
947                 ll_inode_size_unlock(inode, 1);
948                 unlock_res_and_lock(lock);
949         }
950
951 iput:
952         iput(inode);
953         wake_up(&lock->l_waitq);
954
955         ldlm_lock2handle(lock, &lockh);
956         ldlm_lock_decref(&lockh, LCK_PR);
957         RETURN(0);
958 }
959 #endif
960
961 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
962 {
963         struct ptlrpc_request *req = reqp;
964         struct inode *inode = ll_inode_from_lock(lock);
965         struct ll_inode_info *lli;
966         struct lov_stripe_md *lsm;
967         struct ost_lvb *lvb;
968         int rc, stripe;
969         ENTRY;
970
971         if (inode == NULL)
972                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
973         lli = ll_i2info(inode);
974         if (lli == NULL)
975                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
976         lsm = lli->lli_smd;
977         if (lsm == NULL)
978                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
979
980         /* First, find out which stripe index this lock corresponds to. */
981         stripe = ll_lock_to_stripe_offset(inode, lock);
982         if (stripe < 0)
983                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
984
985         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
986         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
987                              sizeof(*lvb));
988         rc = req_capsule_server_pack(&req->rq_pill);
989         if (rc) {
990                 CERROR("lustre_pack_reply: %d\n", rc);
991                 GOTO(iput, rc);
992         }
993
994         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
995         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
996         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
997         lvb->lvb_atime = LTIME_S(inode->i_atime);
998         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
999
1000         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1001                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1002                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1003                    lvb->lvb_atime, lvb->lvb_ctime);
1004  iput:
1005         iput(inode);
1006
1007  out:
1008         /* These errors are normal races, so we don't want to fill the console
1009          * with messages by calling ptlrpc_error() */
1010         if (rc == -ELDLM_NO_LOCK_DATA)
1011                 lustre_pack_reply(req, 1, NULL, NULL);
1012
1013         req->rq_status = rc;
1014         return rc;
1015 }
1016
1017 static int ll_merge_lvb(struct inode *inode)
1018 {
1019         struct ll_inode_info *lli = ll_i2info(inode);
1020         struct ll_sb_info *sbi = ll_i2sbi(inode);
1021         struct ost_lvb lvb;
1022         int rc;
1023
1024         ENTRY;
1025
1026         ll_inode_size_lock(inode, 1);
1027         inode_init_lvb(inode, &lvb);
1028         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1029         i_size_write(inode, lvb.lvb_size);
1030         inode->i_blocks = lvb.lvb_blocks;
1031
1032         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1033         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1034         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1035         ll_inode_size_unlock(inode, 1);
1036
1037         RETURN(rc);
1038 }
1039
1040 int ll_local_size(struct inode *inode)
1041 {
1042         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1043         struct ll_inode_info *lli = ll_i2info(inode);
1044         struct ll_sb_info *sbi = ll_i2sbi(inode);
1045         struct lustre_handle lockh = { 0 };
1046         int flags = 0;
1047         int rc;
1048         ENTRY;
1049
1050         if (lli->lli_smd->lsm_stripe_count == 0)
1051                 RETURN(0);
1052
1053         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1054                        &policy, LCK_PR, &flags, inode, &lockh);
1055         if (rc < 0)
1056                 RETURN(rc);
1057         else if (rc == 0)
1058                 RETURN(-ENODATA);
1059
1060         rc = ll_merge_lvb(inode);
1061         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1062         RETURN(rc);
1063 }
1064
1065 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1066                      lstat_t *st)
1067 {
1068         struct lustre_handle lockh = { 0 };
1069         struct ldlm_enqueue_info einfo = { 0 };
1070         struct obd_info oinfo = { { { 0 } } };
1071         struct ost_lvb lvb;
1072         int rc;
1073
1074         ENTRY;
1075
1076         einfo.ei_type = LDLM_EXTENT;
1077         einfo.ei_mode = LCK_PR;
1078         einfo.ei_cb_bl = osc_extent_blocking_cb;
1079         einfo.ei_cb_cp = ldlm_completion_ast;
1080         einfo.ei_cb_gl = ll_glimpse_callback;
1081         einfo.ei_cbdata = NULL;
1082
1083         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1084         oinfo.oi_lockh = &lockh;
1085         oinfo.oi_md = lsm;
1086         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1087
1088         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1089         if (rc == -ENOENT)
1090                 RETURN(rc);
1091         if (rc != 0) {
1092                 CERROR("obd_enqueue returned rc %d, "
1093                        "returning -EIO\n", rc);
1094                 RETURN(rc > 0 ? -EIO : rc);
1095         }
1096
1097         lov_stripe_lock(lsm);
1098         memset(&lvb, 0, sizeof(lvb));
1099         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1100         st->st_size = lvb.lvb_size;
1101         st->st_blocks = lvb.lvb_blocks;
1102         st->st_mtime = lvb.lvb_mtime;
1103         st->st_atime = lvb.lvb_atime;
1104         st->st_ctime = lvb.lvb_ctime;
1105         lov_stripe_unlock(lsm);
1106
1107         RETURN(rc);
1108 }
1109
1110 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1111  * file (because it prefers KMS over RSS when larger) */
1112 int ll_glimpse_size(struct inode *inode, int ast_flags)
1113 {
1114         struct ll_inode_info *lli = ll_i2info(inode);
1115         struct ll_sb_info *sbi = ll_i2sbi(inode);
1116         struct lustre_handle lockh = { 0 };
1117         struct ldlm_enqueue_info einfo = { 0 };
1118         struct obd_info oinfo = { { { 0 } } };
1119         int rc;
1120         ENTRY;
1121
1122         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1123                 RETURN(0);
1124
1125         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1126
1127         if (!lli->lli_smd) {
1128                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1129                 RETURN(0);
1130         }
1131
1132         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1133          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1134          *       won't revoke any conflicting DLM locks held. Instead,
1135          *       ll_glimpse_callback() will be called on each client
1136          *       holding a DLM lock against this file, and resulting size
1137          *       will be returned for each stripe. DLM lock on [0, EOF] is
1138          *       acquired only if there were no conflicting locks. */
1139         einfo.ei_type = LDLM_EXTENT;
1140         einfo.ei_mode = LCK_PR;
1141         einfo.ei_cb_bl = osc_extent_blocking_cb;
1142         einfo.ei_cb_cp = ldlm_completion_ast;
1143         einfo.ei_cb_gl = ll_glimpse_callback;
1144         einfo.ei_cbdata = inode;
1145
1146         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1147         oinfo.oi_lockh = &lockh;
1148         oinfo.oi_md = lli->lli_smd;
1149         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1150
1151         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1152         if (rc == -ENOENT)
1153                 RETURN(rc);
1154         if (rc != 0) {
1155                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1156                 RETURN(rc > 0 ? -EIO : rc);
1157         }
1158
1159         rc = ll_merge_lvb(inode);
1160
1161         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1162                i_size_read(inode), (unsigned long long)inode->i_blocks);
1163
1164         RETURN(rc);
1165 }
1166
1167 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1168                    struct lov_stripe_md *lsm, int mode,
1169                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1170                    int ast_flags)
1171 {
1172         struct ll_sb_info *sbi = ll_i2sbi(inode);
1173         struct ost_lvb lvb;
1174         struct ldlm_enqueue_info einfo = { 0 };
1175         struct obd_info oinfo = { { { 0 } } };
1176         int rc;
1177         ENTRY;
1178
1179         LASSERT(!lustre_handle_is_used(lockh));
1180         LASSERT(lsm != NULL);
1181
1182         /* don't drop the mmapped file to LRU */
1183         if (mapping_mapped(inode->i_mapping))
1184                 ast_flags |= LDLM_FL_NO_LRU;
1185
1186         /* XXX phil: can we do this?  won't it screw the file size up? */
1187         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1188             (sbi->ll_flags & LL_SBI_NOLCK))
1189                 RETURN(0);
1190
1191         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1192                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1193
1194         einfo.ei_type = LDLM_EXTENT;
1195         einfo.ei_mode = mode;
1196         einfo.ei_cb_bl = osc_extent_blocking_cb;
1197         einfo.ei_cb_cp = ldlm_completion_ast;
1198         einfo.ei_cb_gl = ll_glimpse_callback;
1199         einfo.ei_cbdata = inode;
1200
1201         oinfo.oi_policy = *policy;
1202         oinfo.oi_lockh = lockh;
1203         oinfo.oi_md = lsm;
1204         oinfo.oi_flags = ast_flags;
1205
1206         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1207         *policy = oinfo.oi_policy;
1208         if (rc > 0)
1209                 rc = -EIO;
1210
1211         ll_inode_size_lock(inode, 1);
1212         inode_init_lvb(inode, &lvb);
1213         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1214
1215         if (policy->l_extent.start == 0 &&
1216             policy->l_extent.end == OBD_OBJECT_EOF) {
1217                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1218                  * the kms under both a DLM lock and the
1219                  * ll_inode_size_lock().  If we don't get the
1220                  * ll_inode_size_lock() here we can match the DLM lock and
1221                  * reset i_size from the kms before the truncating path has
1222                  * updated the kms.  generic_file_write can then trust the
1223                  * stale i_size when doing appending writes and effectively
1224                  * cancel the result of the truncate.  Getting the
1225                  * ll_inode_size_lock() after the enqueue maintains the DLM
1226                  * -> ll_inode_size_lock() acquiring order. */
1227                 i_size_write(inode, lvb.lvb_size);
1228                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1229                        inode->i_ino, i_size_read(inode));
1230         }
1231
1232         if (rc == 0) {
1233                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1234                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1235                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1236         }
1237         ll_inode_size_unlock(inode, 1);
1238
1239         RETURN(rc);
1240 }
1241
1242 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1243                      struct lov_stripe_md *lsm, int mode,
1244                      struct lustre_handle *lockh)
1245 {
1246         struct ll_sb_info *sbi = ll_i2sbi(inode);
1247         int rc;
1248         ENTRY;
1249
1250         /* XXX phil: can we do this?  won't it screw the file size up? */
1251         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1252             (sbi->ll_flags & LL_SBI_NOLCK))
1253                 RETURN(0);
1254
1255         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1256
1257         RETURN(rc);
1258 }
1259
1260 static void ll_set_file_contended(struct inode *inode)
1261 {
1262         struct ll_inode_info *lli = ll_i2info(inode);
1263         cfs_time_t now = cfs_time_current();
1264
1265         spin_lock(&lli->lli_lock);
1266         lli->lli_contention_time = now;
1267         lli->lli_flags |= LLIF_CONTENDED;
1268         spin_unlock(&lli->lli_lock);
1269 }
1270
1271 void ll_clear_file_contended(struct inode *inode)
1272 {
1273         struct ll_inode_info *lli = ll_i2info(inode);
1274
1275         spin_lock(&lli->lli_lock);
1276         lli->lli_flags &= ~LLIF_CONTENDED;
1277         spin_unlock(&lli->lli_lock);
1278 }
1279
1280 static int ll_is_file_contended(struct file *file)
1281 {
1282         struct inode *inode = file->f_dentry->d_inode;
1283         struct ll_inode_info *lli = ll_i2info(inode);
1284         struct ll_sb_info *sbi = ll_i2sbi(inode);
1285         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1286         ENTRY;
1287
1288         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1289                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1290                        " osc connect flags = 0x"LPX64"\n",
1291                        sbi->ll_lco.lco_flags);
1292                 RETURN(0);
1293         }
1294         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1295                 RETURN(1);
1296         if (lli->lli_flags & LLIF_CONTENDED) {
1297                 cfs_time_t cur_time = cfs_time_current();
1298                 cfs_time_t retry_time;
1299
1300                 retry_time = cfs_time_add(
1301                         lli->lli_contention_time,
1302                         cfs_time_seconds(sbi->ll_contention_time));
1303                 if (cfs_time_after(cur_time, retry_time)) {
1304                         ll_clear_file_contended(inode);
1305                         RETURN(0);
1306                 }
1307                 RETURN(1);
1308         }
1309         RETURN(0);
1310 }
1311
1312 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1313                                  const char *buf, size_t count,
1314                                  loff_t start, loff_t end, int rw)
1315 {
1316         int append;
1317         int tree_locked = 0;
1318         int rc;
1319         struct inode * inode = file->f_dentry->d_inode;
1320         ENTRY;
1321
1322         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1323
1324         if (append || !ll_is_file_contended(file)) {
1325                 struct ll_lock_tree_node *node;
1326                 int ast_flags;
1327
1328                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1329                 if (file->f_flags & O_NONBLOCK)
1330                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1331                 node = ll_node_from_inode(inode, start, end,
1332                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1333                 if (IS_ERR(node)) {
1334                         rc = PTR_ERR(node);
1335                         GOTO(out, rc);
1336                 }
1337                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1338                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1339                 if (rc == 0)
1340                         tree_locked = 1;
1341                 else if (rc == -EUSERS)
1342                         ll_set_file_contended(inode);
1343                 else
1344                         GOTO(out, rc);
1345         }
1346         RETURN(tree_locked);
1347 out:
1348         return rc;
1349 }
1350
1351 /**
1352  * Checks if requested extent lock is compatible with a lock under a page.
1353  *
1354  * Checks if the lock under \a page is compatible with a read or write lock
1355  * (specified by \a rw) for an extent [\a start , \a end].
1356  *
1357  * \param page the page under which lock is considered
1358  * \param rw OBD_BRW_READ if requested for reading,
1359  *           OBD_BRW_WRITE if requested for writing
1360  * \param start start of the requested extent
1361  * \param end end of the requested extent
1362  * \param cookie transparent parameter for passing locking context
1363  *
1364  * \post result == 1, *cookie == context, appropriate lock is referenced or
1365  * \post result == 0
1366  *
1367  * \retval 1 owned lock is reused for the request
1368  * \retval 0 no lock reused for the request
1369  *
1370  * \see ll_release_short_lock
1371  */
1372 static int ll_reget_short_lock(struct page *page, int rw,
1373                                obd_off start, obd_off end,
1374                                void **cookie)
1375 {
1376         struct ll_async_page *llap;
1377         struct obd_export *exp;
1378         struct inode *inode = page->mapping->host;
1379
1380         ENTRY;
1381
1382         exp = ll_i2dtexp(inode);
1383         if (exp == NULL)
1384                 RETURN(0);
1385
1386         llap = llap_cast_private(page);
1387         if (llap == NULL)
1388                 RETURN(0);
1389
1390         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1391                                     &llap->llap_cookie, rw, start, end,
1392                                     cookie));
1393 }
1394
1395 /**
1396  * Releases a reference to a lock taken in a "fast" way.
1397  *
1398  * Releases a read or a write (specified by \a rw) lock
1399  * referenced by \a cookie.
1400  *
1401  * \param inode inode to which data belong
1402  * \param end end of the locked extent
1403  * \param rw OBD_BRW_READ if requested for reading,
1404  *           OBD_BRW_WRITE if requested for writing
1405  * \param cookie transparent parameter for passing locking context
1406  *
1407  * \post appropriate lock is dereferenced
1408  *
1409  * \see ll_reget_short_lock
1410  */
1411 static void ll_release_short_lock(struct inode *inode, obd_off end,
1412                                   void *cookie, int rw)
1413 {
1414         struct obd_export *exp;
1415         int rc;
1416
1417         exp = ll_i2dtexp(inode);
1418         if (exp == NULL)
1419                 return;
1420
1421         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1422                                     cookie, rw);
1423         if (rc < 0)
1424                 CERROR("unlock failed (%d)\n", rc);
1425 }
1426
1427 /**
1428  * Checks if requested extent lock is compatible
1429  * with a lock under a page in page cache.
1430  *
1431  * Checks if a lock under some \a page is compatible with a read or write lock
1432  * (specified by \a rw) for an extent [\a start , \a end].
1433  *
1434  * \param file the file under which lock is considered
1435  * \param rw OBD_BRW_READ if requested for reading,
1436  *           OBD_BRW_WRITE if requested for writing
1437  * \param ppos start of the requested extent
1438  * \param end end of the requested extent
1439  * \param cookie transparent parameter for passing locking context
1440  * \param buf userspace buffer for the data
1441  *
1442  * \post result == 1, *cookie == context, appropriate lock is referenced
1443  * \post retuls == 0
1444  *
1445  * \retval 1 owned lock is reused for the request
1446  * \retval 0 no lock reused for the request
1447  *
1448  * \see ll_file_put_fast_lock
1449  */
1450 static inline int ll_file_get_fast_lock(struct file *file,
1451                                         obd_off ppos, obd_off end,
1452                                         char *buf, void **cookie, int rw)
1453 {
1454         int rc = 0;
1455         struct page *page;
1456
1457         ENTRY;
1458
1459         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1460                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1461                                       ppos >> CFS_PAGE_SHIFT);
1462                 if (page) {
1463                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1464                                 rc = 1;
1465
1466                         unlock_page(page);
1467                         page_cache_release(page);
1468                 }
1469         }
1470
1471         RETURN(rc);
1472 }
1473
1474 /**
1475  * Releases a reference to a lock taken in a "fast" way.
1476  *
1477  * Releases a read or a write (specified by \a rw) lock
1478  * referenced by \a cookie.
1479  *
1480  * \param inode inode to which data belong
1481  * \param end end of the locked extent
1482  * \param rw OBD_BRW_READ if requested for reading,
1483  *           OBD_BRW_WRITE if requested for writing
1484  * \param cookie transparent parameter for passing locking context
1485  *
1486  * \post appropriate lock is dereferenced
1487  *
1488  * \see ll_file_get_fast_lock
1489  */
1490 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1491                                          void *cookie, int rw)
1492 {
1493         ll_release_short_lock(inode, end, cookie, rw);
1494 }
1495
1496 enum ll_lock_style {
1497         LL_LOCK_STYLE_NOLOCK   = 0,
1498         LL_LOCK_STYLE_FASTLOCK = 1,
1499         LL_LOCK_STYLE_TREELOCK = 2
1500 };
1501
1502 /**
1503  * Checks if requested extent lock is compatible with a lock 
1504  * under a page cache page.
1505  *
1506  * Checks if the lock under \a page is compatible with a read or write lock
1507  * (specified by \a rw) for an extent [\a start , \a end].
1508  *
1509  * \param file file under which I/O is processed
1510  * \param rw OBD_BRW_READ if requested for reading,
1511  *           OBD_BRW_WRITE if requested for writing
1512  * \param ppos start of the requested extent
1513  * \param end end of the requested extent
1514  * \param cookie transparent parameter for passing locking context
1515  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1516  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1517  * \param buf userspace buffer for the data
1518  *
1519  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1520  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1521  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1522  *
1523  * \see ll_file_put_lock
1524  */
1525 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1526                                    obd_off end, char *buf, void **cookie,
1527                                    struct ll_lock_tree *tree, int rw)
1528 {
1529         int rc;
1530
1531         ENTRY;
1532
1533         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1534                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1535
1536         rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
1537         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1538         switch (rc) {
1539         case 1:
1540                 RETURN(LL_LOCK_STYLE_TREELOCK);
1541         case 0:
1542                 RETURN(LL_LOCK_STYLE_NOLOCK);
1543         }
1544
1545         /* an error happened if we reached this point, rc = -errno here */
1546         RETURN(rc);
1547 }
1548
1549 /**
1550  * Drops the lock taken by ll_file_get_lock.
1551  *
1552  * Releases a read or a write (specified by \a rw) lock
1553  * referenced by \a tree or \a cookie.
1554  *
1555  * \param inode inode to which data belong
1556  * \param end end of the locked extent
1557  * \param lockstyle facility through which the lock was taken
1558  * \param rw OBD_BRW_READ if requested for reading,
1559  *           OBD_BRW_WRITE if requested for writing
1560  * \param cookie transparent parameter for passing locking context
1561  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1562  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1563  *
1564  * \post appropriate lock is dereferenced
1565  *
1566  * \see ll_file_get_lock
1567  */
1568 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1569                                     enum ll_lock_style lock_style,
1570                                     void *cookie, struct ll_lock_tree *tree,
1571                                     int rw)
1572
1573 {
1574         switch (lock_style) {
1575         case LL_LOCK_STYLE_TREELOCK:
1576                 ll_tree_unlock(tree);
1577                 break;
1578         case LL_LOCK_STYLE_FASTLOCK:
1579                 ll_file_put_fast_lock(inode, end, cookie, rw);
1580                 break;
1581         default:
1582                 CERROR("invalid locking style (%d)\n", lock_style);
1583         }
1584 }
1585
1586 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1587                             loff_t *ppos)
1588 {
1589         struct inode *inode = file->f_dentry->d_inode;
1590         struct ll_inode_info *lli = ll_i2info(inode);
1591         struct lov_stripe_md *lsm = lli->lli_smd;
1592         struct ll_sb_info *sbi = ll_i2sbi(inode);
1593         struct ll_lock_tree tree;
1594         struct ost_lvb lvb;
1595         struct ll_ra_read bead;
1596         int ra = 0;
1597         obd_off end;
1598         ssize_t retval, chunk, sum = 0;
1599         int lock_style;
1600         void *cookie;
1601
1602         __u64 kms;
1603         ENTRY;
1604         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1605                inode->i_ino, inode->i_generation, inode, count, *ppos);
1606         /* "If nbyte is 0, read() will return 0 and have no other results."
1607          *                      -- Single Unix Spec */
1608         if (count == 0)
1609                 RETURN(0);
1610
1611         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1612
1613         if (!lsm) {
1614                 /* Read on file with no objects should return zero-filled
1615                  * buffers up to file size (we can get non-zero sizes with
1616                  * mknod + truncate, then opening file for read. This is a
1617                  * common pattern in NFS case, it seems). Bug 6243 */
1618                 int notzeroed;
1619                 /* Since there are no objects on OSTs, we have nothing to get
1620                  * lock on and so we are forced to access inode->i_size
1621                  * unguarded */
1622
1623                 /* Read beyond end of file */
1624                 if (*ppos >= i_size_read(inode))
1625                         RETURN(0);
1626
1627                 if (count > i_size_read(inode) - *ppos)
1628                         count = i_size_read(inode) - *ppos;
1629                 /* Make sure to correctly adjust the file pos pointer for
1630                  * EFAULT case */
1631                 notzeroed = clear_user(buf, count);
1632                 count -= notzeroed;
1633                 *ppos += count;
1634                 if (!count)
1635                         RETURN(-EFAULT);
1636                 RETURN(count);
1637         }
1638 repeat:
1639         if (sbi->ll_max_rw_chunk != 0) {
1640                 /* first, let's know the end of the current stripe */
1641                 end = *ppos;
1642                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1643
1644                 /* correct, the end is beyond the request */
1645                 if (end > *ppos + count - 1)
1646                         end = *ppos + count - 1;
1647
1648                 /* and chunk shouldn't be too large even if striping is wide */
1649                 if (end - *ppos > sbi->ll_max_rw_chunk)
1650                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1651         } else {
1652                 end = *ppos + count - 1;
1653         }
1654
1655         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1656                                       buf, &cookie, &tree, OBD_BRW_READ);
1657         if (lock_style < 0)
1658                 GOTO(out, retval = lock_style);
1659
1660         ll_inode_size_lock(inode, 1);
1661         /*
1662          * Consistency guarantees: following possibilities exist for the
1663          * relation between region being read and real file size at this
1664          * moment:
1665          *
1666          *  (A): the region is completely inside of the file;
1667          *
1668          *  (B-x): x bytes of region are inside of the file, the rest is
1669          *  outside;
1670          *
1671          *  (C): the region is completely outside of the file.
1672          *
1673          * This classification is stable under DLM lock acquired by
1674          * ll_tree_lock() above, because to change class, other client has to
1675          * take DLM lock conflicting with our lock. Also, any updates to
1676          * ->i_size by other threads on this client are serialized by
1677          * ll_inode_size_lock(). This guarantees that short reads are handled
1678          * correctly in the face of concurrent writes and truncates.
1679          */
1680         inode_init_lvb(inode, &lvb);
1681         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1682         kms = lvb.lvb_size;
1683         if (*ppos + count - 1 > kms) {
1684                 /* A glimpse is necessary to determine whether we return a
1685                  * short read (B) or some zeroes at the end of the buffer (C) */
1686                 ll_inode_size_unlock(inode, 1);
1687                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1688                 if (retval) {
1689                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1690                                 ll_file_put_lock(inode, end, lock_style,
1691                                                  cookie, &tree, OBD_BRW_READ);
1692                         goto out;
1693                 }
1694         } else {
1695                 /* region is within kms and, hence, within real file size (A).
1696                  * We need to increase i_size to cover the read region so that
1697                  * generic_file_read() will do its job, but that doesn't mean
1698                  * the kms size is _correct_, it is only the _minimum_ size.
1699                  * If someone does a stat they will get the correct size which
1700                  * will always be >= the kms value here.  b=11081 */
1701                 if (i_size_read(inode) < kms)
1702                         i_size_write(inode, kms);
1703                 ll_inode_size_unlock(inode, 1);
1704         }
1705
1706         chunk = end - *ppos + 1;
1707         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1708                inode->i_ino, chunk, *ppos, i_size_read(inode));
1709
1710         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1711                 /* turn off the kernel's read-ahead */
1712                 file->f_ra.ra_pages = 0;
1713
1714                 /* initialize read-ahead window once per syscall */
1715                 if (ra == 0) {
1716                         ra = 1;
1717                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1718                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1719                         ll_ra_read_in(file, &bead);
1720                 }
1721
1722                 /* BUG: 5972 */
1723                 file_accessed(file);
1724                 retval = generic_file_read(file, buf, chunk, ppos);
1725                 ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
1726                                  OBD_BRW_READ);
1727         } else {
1728                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1729         }
1730
1731         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1732
1733         if (retval > 0) {
1734                 buf += retval;
1735                 count -= retval;
1736                 sum += retval;
1737                 if (retval == chunk && count > 0)
1738                         goto repeat;
1739         }
1740
1741  out:
1742         if (ra != 0)
1743                 ll_ra_read_ex(file, &bead);
1744         retval = (sum > 0) ? sum : retval;
1745         RETURN(retval);
1746 }
1747
1748 /*
1749  * Write to a file (through the page cache).
1750  */
1751 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1752                              loff_t *ppos)
1753 {
1754         struct inode *inode = file->f_dentry->d_inode;
1755         struct ll_sb_info *sbi = ll_i2sbi(inode);
1756         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1757         struct ll_lock_tree tree;
1758         loff_t maxbytes = ll_file_maxbytes(inode);
1759         loff_t lock_start, lock_end, end;
1760         ssize_t retval, chunk, sum = 0;
1761         int tree_locked;
1762         ENTRY;
1763
1764         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1765                inode->i_ino, inode->i_generation, inode, count, *ppos);
1766
1767         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1768
1769         /* POSIX, but surprised the VFS doesn't check this already */
1770         if (count == 0)
1771                 RETURN(0);
1772
1773         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1774          * called on the file, don't fail the below assertion (bug 2388). */
1775         if (file->f_flags & O_LOV_DELAY_CREATE &&
1776             ll_i2info(inode)->lli_smd == NULL)
1777                 RETURN(-EBADF);
1778
1779         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1780
1781         down(&ll_i2info(inode)->lli_write_sem);
1782
1783 repeat:
1784         chunk = 0; /* just to fix gcc's warning */
1785         end = *ppos + count - 1;
1786
1787         if (file->f_flags & O_APPEND) {
1788                 lock_start = 0;
1789                 lock_end = OBD_OBJECT_EOF;
1790         } else if (sbi->ll_max_rw_chunk != 0) {
1791                 /* first, let's know the end of the current stripe */
1792                 end = *ppos;
1793                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1794                                 (obd_off *)&end);
1795
1796                 /* correct, the end is beyond the request */
1797                 if (end > *ppos + count - 1)
1798                         end = *ppos + count - 1;
1799
1800                 /* and chunk shouldn't be too large even if striping is wide */
1801                 if (end - *ppos > sbi->ll_max_rw_chunk)
1802                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1803                 lock_start = *ppos;
1804                 lock_end = end;
1805         } else {
1806                 lock_start = *ppos;
1807                 lock_end = *ppos + count - 1;
1808         }
1809
1810         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1811                                             lock_start, lock_end, OBD_BRW_WRITE);
1812         if (tree_locked < 0)
1813                 GOTO(out, retval = tree_locked);
1814
1815         /* This is ok, g_f_w will overwrite this under i_sem if it races
1816          * with a local truncate, it just makes our maxbyte checking easier.
1817          * The i_size value gets updated in ll_extent_lock() as a consequence
1818          * of the [0,EOF] extent lock we requested above. */
1819         if (file->f_flags & O_APPEND) {
1820                 *ppos = i_size_read(inode);
1821                 end = *ppos + count - 1;
1822         }
1823
1824         if (*ppos >= maxbytes) {
1825                 send_sig(SIGXFSZ, current, 0);
1826                 GOTO(out_unlock, retval = -EFBIG);
1827         }
1828         if (end > maxbytes - 1)
1829                 end = maxbytes - 1;
1830
1831         /* generic_file_write handles O_APPEND after getting i_mutex */
1832         chunk = end - *ppos + 1;
1833         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1834                inode->i_ino, chunk, *ppos);
1835         if (tree_locked)
1836                 retval = generic_file_write(file, buf, chunk, ppos);
1837         else
1838                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1839                                              ppos, WRITE);
1840         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1841
1842 out_unlock:
1843         if (tree_locked)
1844                 ll_tree_unlock(&tree);
1845
1846 out:
1847         if (retval > 0) {
1848                 buf += retval;
1849                 count -= retval;
1850                 sum += retval;
1851                 if (retval == chunk && count > 0)
1852                         goto repeat;
1853         }
1854
1855         up(&ll_i2info(inode)->lli_write_sem);
1856
1857         retval = (sum > 0) ? sum : retval;
1858         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1859                            retval > 0 ? retval : 0);
1860         RETURN(retval);
1861 }
1862
1863 /*
1864  * Send file content (through pagecache) somewhere with helper
1865  */
1866 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1867                                 read_actor_t actor, void *target)
1868 {
1869         struct inode *inode = in_file->f_dentry->d_inode;
1870         struct ll_inode_info *lli = ll_i2info(inode);
1871         struct lov_stripe_md *lsm = lli->lli_smd;
1872         struct ll_lock_tree tree;
1873         struct ll_lock_tree_node *node;
1874         struct ost_lvb lvb;
1875         struct ll_ra_read bead;
1876         int rc;
1877         ssize_t retval;
1878         __u64 kms;
1879         ENTRY;
1880         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1881                inode->i_ino, inode->i_generation, inode, count, *ppos);
1882
1883         /* "If nbyte is 0, read() will return 0 and have no other results."
1884          *                      -- Single Unix Spec */
1885         if (count == 0)
1886                 RETURN(0);
1887
1888         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1889         /* turn off the kernel's read-ahead */
1890         in_file->f_ra.ra_pages = 0;
1891
1892         /* File with no objects, nothing to lock */
1893         if (!lsm)
1894                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1895
1896         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1897         if (IS_ERR(node))
1898                 RETURN(PTR_ERR(node));
1899
1900         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1901         rc = ll_tree_lock(&tree, node, NULL, count,
1902                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1903         if (rc != 0)
1904                 RETURN(rc);
1905
1906         ll_clear_file_contended(inode);
1907         ll_inode_size_lock(inode, 1);
1908         /*
1909          * Consistency guarantees: following possibilities exist for the
1910          * relation between region being read and real file size at this
1911          * moment:
1912          *
1913          *  (A): the region is completely inside of the file;
1914          *
1915          *  (B-x): x bytes of region are inside of the file, the rest is
1916          *  outside;
1917          *
1918          *  (C): the region is completely outside of the file.
1919          *
1920          * This classification is stable under DLM lock acquired by
1921          * ll_tree_lock() above, because to change class, other client has to
1922          * take DLM lock conflicting with our lock. Also, any updates to
1923          * ->i_size by other threads on this client are serialized by
1924          * ll_inode_size_lock(). This guarantees that short reads are handled
1925          * correctly in the face of concurrent writes and truncates.
1926          */
1927         inode_init_lvb(inode, &lvb);
1928         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1929         kms = lvb.lvb_size;
1930         if (*ppos + count - 1 > kms) {
1931                 /* A glimpse is necessary to determine whether we return a
1932                  * short read (B) or some zeroes at the end of the buffer (C) */
1933                 ll_inode_size_unlock(inode, 1);
1934                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1935                 if (retval)
1936                         goto out;
1937         } else {
1938                 /* region is within kms and, hence, within real file size (A) */
1939                 i_size_write(inode, kms);
1940                 ll_inode_size_unlock(inode, 1);
1941         }
1942
1943         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1944                inode->i_ino, count, *ppos, i_size_read(inode));
1945
1946         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1947         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1948         ll_ra_read_in(in_file, &bead);
1949         /* BUG: 5972 */
1950         file_accessed(in_file);
1951         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1952         ll_ra_read_ex(in_file, &bead);
1953
1954  out:
1955         ll_tree_unlock(&tree);
1956         RETURN(retval);
1957 }
1958
1959 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1960                                unsigned long arg)
1961 {
1962         struct ll_inode_info *lli = ll_i2info(inode);
1963         struct obd_export *exp = ll_i2dtexp(inode);
1964         struct ll_recreate_obj ucreatp;
1965         struct obd_trans_info oti = { 0 };
1966         struct obdo *oa = NULL;
1967         int lsm_size;
1968         int rc = 0;
1969         struct lov_stripe_md *lsm, *lsm2;
1970         ENTRY;
1971
1972         if (!capable (CAP_SYS_ADMIN))
1973                 RETURN(-EPERM);
1974
1975         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1976                             sizeof(struct ll_recreate_obj));
1977         if (rc) {
1978                 RETURN(-EFAULT);
1979         }
1980         OBDO_ALLOC(oa);
1981         if (oa == NULL)
1982                 RETURN(-ENOMEM);
1983
1984         down(&lli->lli_size_sem);
1985         lsm = lli->lli_smd;
1986         if (lsm == NULL)
1987                 GOTO(out, rc = -ENOENT);
1988         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1989                    (lsm->lsm_stripe_count));
1990
1991         OBD_ALLOC(lsm2, lsm_size);
1992         if (lsm2 == NULL)
1993                 GOTO(out, rc = -ENOMEM);
1994
1995         oa->o_id = ucreatp.lrc_id;
1996         oa->o_gr = ucreatp.lrc_group;
1997         oa->o_nlink = ucreatp.lrc_ost_idx;
1998         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1999         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
2000         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2001                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2002
2003         memcpy(lsm2, lsm, lsm_size);
2004         rc = obd_create(exp, oa, &lsm2, &oti);
2005
2006         OBD_FREE(lsm2, lsm_size);
2007         GOTO(out, rc);
2008 out:
2009         up(&lli->lli_size_sem);
2010         OBDO_FREE(oa);
2011         return rc;
2012 }
2013
2014 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2015                              int flags, struct lov_user_md *lum, int lum_size)
2016 {
2017         struct ll_inode_info *lli = ll_i2info(inode);
2018         struct lov_stripe_md *lsm;
2019         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2020         int rc = 0;
2021         ENTRY;
2022
2023         down(&lli->lli_size_sem);
2024         lsm = lli->lli_smd;
2025         if (lsm) {
2026                 up(&lli->lli_size_sem);
2027                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2028                        inode->i_ino);
2029                 RETURN(-EEXIST);
2030         }
2031
2032         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2033         if (rc)
2034                 GOTO(out, rc);
2035         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2036                 GOTO(out_req_free, rc = -ENOENT);
2037         rc = oit.d.lustre.it_status;
2038         if (rc < 0)
2039                 GOTO(out_req_free, rc);
2040
2041         ll_release_openhandle(file->f_dentry, &oit);
2042
2043  out:
2044         up(&lli->lli_size_sem);
2045         ll_intent_release(&oit);
2046         RETURN(rc);
2047 out_req_free:
2048         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2049         goto out;
2050 }
2051
2052 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
2053                              struct lov_mds_md **lmmp, int *lmm_size, 
2054                              struct ptlrpc_request **request)
2055 {
2056         struct ll_sb_info *sbi = ll_i2sbi(inode);
2057         struct mdt_body  *body;
2058         struct lov_mds_md *lmm = NULL;
2059         struct ptlrpc_request *req = NULL;
2060         struct obd_capa *oc;
2061         int rc, lmmsize;
2062
2063         rc = ll_get_max_mdsize(sbi, &lmmsize);
2064         if (rc)
2065                 RETURN(rc);
2066
2067         oc = ll_mdscapa_get(inode);
2068         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2069                              oc, filename, strlen(filename) + 1,
2070                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2071                              ll_i2suppgid(inode), &req);
2072         capa_put(oc);
2073         if (rc < 0) {
2074                 CDEBUG(D_INFO, "md_getattr_name failed "
2075                        "on %s: rc %d\n", filename, rc);
2076                 GOTO(out, rc);
2077         }
2078
2079         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2080         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2081
2082         lmmsize = body->eadatasize;
2083
2084         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2085                         lmmsize == 0) {
2086                 GOTO(out, rc = -ENODATA);
2087         }
2088
2089         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2090         LASSERT(lmm != NULL);
2091
2092         /*
2093          * This is coming from the MDS, so is probably in
2094          * little endian.  We convert it to host endian before
2095          * passing it to userspace.
2096          */
2097         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
2098                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
2099                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
2100         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
2101                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2102         }
2103
2104         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2105                 struct lov_stripe_md *lsm;
2106                 struct lov_user_md_join *lmj;
2107                 int lmj_size, i, aindex = 0;
2108
2109                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2110                 if (rc < 0)
2111                         GOTO(out, rc = -ENOMEM);
2112                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2113                 if (rc)
2114                         GOTO(out_free_memmd, rc);
2115
2116                 lmj_size = sizeof(struct lov_user_md_join) +
2117                            lsm->lsm_stripe_count *
2118                            sizeof(struct lov_user_ost_data_join);
2119                 OBD_ALLOC(lmj, lmj_size);
2120                 if (!lmj)
2121                         GOTO(out_free_memmd, rc = -ENOMEM);
2122
2123                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2124                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2125                         struct lov_extent *lex =
2126                                 &lsm->lsm_array->lai_ext_array[aindex];
2127
2128                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2129                                 aindex ++;
2130                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2131                                         LPU64" len %d\n", aindex, i,
2132                                         lex->le_start, (int)lex->le_len);
2133                         lmj->lmm_objects[i].l_extent_start =
2134                                 lex->le_start;
2135
2136                         if ((int)lex->le_len == -1)
2137                                 lmj->lmm_objects[i].l_extent_end = -1;
2138                         else
2139                                 lmj->lmm_objects[i].l_extent_end =
2140                                         lex->le_start + lex->le_len;
2141                         lmj->lmm_objects[i].l_object_id =
2142                                 lsm->lsm_oinfo[i]->loi_id;
2143                         lmj->lmm_objects[i].l_object_gr =
2144                                 lsm->lsm_oinfo[i]->loi_gr;
2145                         lmj->lmm_objects[i].l_ost_gen =
2146                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2147                         lmj->lmm_objects[i].l_ost_idx =
2148                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2149                 }
2150                 lmm = (struct lov_mds_md *)lmj;
2151                 lmmsize = lmj_size;
2152 out_free_memmd:
2153                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2154         }
2155 out:
2156         *lmmp = lmm;
2157         *lmm_size = lmmsize;
2158         *request = req;
2159         return rc;
2160 }
2161
2162 static int ll_lov_setea(struct inode *inode, struct file *file,
2163                             unsigned long arg)
2164 {
2165         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2166         struct lov_user_md  *lump;
2167         int lum_size = sizeof(struct lov_user_md) +
2168                        sizeof(struct lov_user_ost_data);
2169         int rc;
2170         ENTRY;
2171
2172         if (!capable (CAP_SYS_ADMIN))
2173                 RETURN(-EPERM);
2174
2175         OBD_ALLOC(lump, lum_size);
2176         if (lump == NULL) {
2177                 RETURN(-ENOMEM);
2178         }
2179         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2180         if (rc) {
2181                 OBD_FREE(lump, lum_size);
2182                 RETURN(-EFAULT);
2183         }
2184
2185         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2186
2187         OBD_FREE(lump, lum_size);
2188         RETURN(rc);
2189 }
2190
2191 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2192                             unsigned long arg)
2193 {
2194         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2195         int rc;
2196         int flags = FMODE_WRITE;
2197         ENTRY;
2198
2199         /* Bug 1152: copy properly when this is no longer true */
2200         LASSERT(sizeof(lum) == sizeof(*lump));
2201         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2202         rc = copy_from_user(&lum, lump, sizeof(lum));
2203         if (rc)
2204                 RETURN(-EFAULT);
2205
2206         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2207         if (rc == 0) {
2208                  put_user(0, &lump->lmm_stripe_count);
2209                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2210                                     0, ll_i2info(inode)->lli_smd, lump);
2211         }
2212         RETURN(rc);
2213 }
2214
2215 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2216 {
2217         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2218
2219         if (!lsm)
2220                 RETURN(-ENODATA);
2221
2222         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2223                             (void *)arg);
2224 }
2225
2226 static int ll_get_grouplock(struct inode *inode, struct file *file,
2227                             unsigned long arg)
2228 {
2229         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2230         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2231                                                     .end = OBD_OBJECT_EOF}};
2232         struct lustre_handle lockh = { 0 };
2233         struct ll_inode_info *lli = ll_i2info(inode);
2234         struct lov_stripe_md *lsm = lli->lli_smd;
2235         int flags = 0, rc;
2236         ENTRY;
2237
2238         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2239                 RETURN(-EINVAL);
2240         }
2241
2242         policy.l_extent.gid = arg;
2243         if (file->f_flags & O_NONBLOCK)
2244                 flags = LDLM_FL_BLOCK_NOWAIT;
2245
2246         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2247         if (rc)
2248                 RETURN(rc);
2249
2250         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2251         fd->fd_gid = arg;
2252         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2253
2254         RETURN(0);
2255 }
2256
2257 static int ll_put_grouplock(struct inode *inode, struct file *file,
2258                             unsigned long arg)
2259 {
2260         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2261         struct ll_inode_info *lli = ll_i2info(inode);
2262         struct lov_stripe_md *lsm = lli->lli_smd;
2263         int rc;
2264         ENTRY;
2265
2266         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2267                 /* Ugh, it's already unlocked. */
2268                 RETURN(-EINVAL);
2269         }
2270
2271         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2272                 RETURN(-EINVAL);
2273
2274         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2275
2276         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2277         if (rc)
2278                 RETURN(rc);
2279
2280         fd->fd_gid = 0;
2281         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2282
2283         RETURN(0);
2284 }
2285
2286 static int join_sanity_check(struct inode *head, struct inode *tail)
2287 {
2288         ENTRY;
2289         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2290                 CERROR("server do not support join \n");
2291                 RETURN(-EINVAL);
2292         }
2293         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2294                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2295                        head->i_ino, tail->i_ino);
2296                 RETURN(-EINVAL);
2297         }
2298         if (head->i_ino == tail->i_ino) {
2299                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2300                 RETURN(-EINVAL);
2301         }
2302         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2303                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2304                 RETURN(-EINVAL);
2305         }
2306         RETURN(0);
2307 }
2308
2309 static int join_file(struct inode *head_inode, struct file *head_filp,
2310                      struct file *tail_filp)
2311 {
2312         struct dentry *tail_dentry = tail_filp->f_dentry;
2313         struct lookup_intent oit = {.it_op = IT_OPEN,
2314                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2315         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2316                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2317
2318         struct lustre_handle lockh;
2319         struct md_op_data *op_data;
2320         int    rc;
2321         loff_t data;
2322         ENTRY;
2323
2324         tail_dentry = tail_filp->f_dentry;
2325
2326         data = i_size_read(head_inode);
2327         op_data = ll_prep_md_op_data(NULL, head_inode,
2328                                      tail_dentry->d_parent->d_inode,
2329                                      tail_dentry->d_name.name,
2330                                      tail_dentry->d_name.len, 0,
2331                                      LUSTRE_OPC_ANY, &data);
2332         if (IS_ERR(op_data))
2333                 RETURN(PTR_ERR(op_data));
2334
2335         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2336                          op_data, &lockh, NULL, 0, 0);
2337
2338         ll_finish_md_op_data(op_data);
2339         if (rc < 0)
2340                 GOTO(out, rc);
2341
2342         rc = oit.d.lustre.it_status;
2343
2344         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2345                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2346                 ptlrpc_req_finished((struct ptlrpc_request *)
2347                                     oit.d.lustre.it_data);
2348                 GOTO(out, rc);
2349         }
2350
2351         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2352                                            * away */
2353                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2354                 oit.d.lustre.it_lock_mode = 0;
2355         }
2356         ll_release_openhandle(head_filp->f_dentry, &oit);
2357 out:
2358         ll_intent_release(&oit);
2359         RETURN(rc);
2360 }
2361
2362 static int ll_file_join(struct inode *head, struct file *filp,
2363                         char *filename_tail)
2364 {
2365         struct inode *tail = NULL, *first = NULL, *second = NULL;
2366         struct dentry *tail_dentry;
2367         struct file *tail_filp, *first_filp, *second_filp;
2368         struct ll_lock_tree first_tree, second_tree;
2369         struct ll_lock_tree_node *first_node, *second_node;
2370         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2371         int rc = 0, cleanup_phase = 0;
2372         ENTRY;
2373
2374         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2375                head->i_ino, head->i_generation, head, filename_tail);
2376
2377         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2378         if (IS_ERR(tail_filp)) {
2379                 CERROR("Can not open tail file %s", filename_tail);
2380                 rc = PTR_ERR(tail_filp);
2381                 GOTO(cleanup, rc);
2382         }
2383         tail = igrab(tail_filp->f_dentry->d_inode);
2384
2385         tlli = ll_i2info(tail);
2386         tail_dentry = tail_filp->f_dentry;
2387         LASSERT(tail_dentry);
2388         cleanup_phase = 1;
2389
2390         /*reorder the inode for lock sequence*/
2391         first = head->i_ino > tail->i_ino ? head : tail;
2392         second = head->i_ino > tail->i_ino ? tail : head;
2393         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2394         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2395
2396         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2397                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2398         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2399         if (IS_ERR(first_node)){
2400                 rc = PTR_ERR(first_node);
2401                 GOTO(cleanup, rc);
2402         }
2403         first_tree.lt_fd = first_filp->private_data;
2404         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2405         if (rc != 0)
2406                 GOTO(cleanup, rc);
2407         cleanup_phase = 2;
2408
2409         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2410         if (IS_ERR(second_node)){
2411                 rc = PTR_ERR(second_node);
2412                 GOTO(cleanup, rc);
2413         }
2414         second_tree.lt_fd = second_filp->private_data;
2415         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2416         if (rc != 0)
2417                 GOTO(cleanup, rc);
2418         cleanup_phase = 3;
2419
2420         rc = join_sanity_check(head, tail);
2421         if (rc)
2422                 GOTO(cleanup, rc);
2423
2424         rc = join_file(head, filp, tail_filp);
2425         if (rc)
2426                 GOTO(cleanup, rc);
2427 cleanup:
2428         switch (cleanup_phase) {
2429         case 3:
2430                 ll_tree_unlock(&second_tree);
2431                 obd_cancel_unused(ll_i2dtexp(second),
2432                                   ll_i2info(second)->lli_smd, 0, NULL);
2433         case 2:
2434                 ll_tree_unlock(&first_tree);
2435                 obd_cancel_unused(ll_i2dtexp(first),
2436                                   ll_i2info(first)->lli_smd, 0, NULL);
2437         case 1:
2438                 filp_close(tail_filp, 0);
2439                 if (tail)
2440                         iput(tail);
2441                 if (head && rc == 0) {
2442                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2443                                        &hlli->lli_smd);
2444                         hlli->lli_smd = NULL;
2445                 }
2446         case 0:
2447                 break;
2448         default:
2449                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2450                 LBUG();
2451         }
2452         RETURN(rc);
2453 }
2454
2455 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2456 {
2457         struct inode *inode = dentry->d_inode;
2458         struct obd_client_handle *och;
2459         int rc;
2460         ENTRY;
2461
2462         LASSERT(inode);
2463
2464         /* Root ? Do nothing. */
2465         if (dentry->d_inode->i_sb->s_root == dentry)
2466                 RETURN(0);
2467
2468         /* No open handle to close? Move away */
2469         if (!it_disposition(it, DISP_OPEN_OPEN))
2470                 RETURN(0);
2471
2472         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2473
2474         OBD_ALLOC(och, sizeof(*och));
2475         if (!och)
2476                 GOTO(out, rc = -ENOMEM);
2477
2478         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2479                     ll_i2info(inode), it, och);
2480
2481         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2482                                        inode, och);
2483  out:
2484         /* this one is in place of ll_file_open */
2485         ptlrpc_req_finished(it->d.lustre.it_data);
2486         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2487         RETURN(rc);
2488 }
2489
2490 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2491                   unsigned long arg)
2492 {
2493         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2494         int flags;
2495         ENTRY;
2496
2497         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2498                inode->i_generation, inode, cmd);
2499         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2500
2501         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2502         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2503                 RETURN(-ENOTTY);
2504
2505         switch(cmd) {
2506         case LL_IOC_GETFLAGS:
2507                 /* Get the current value of the file flags */
2508                 return put_user(fd->fd_flags, (int *)arg);
2509         case LL_IOC_SETFLAGS:
2510         case LL_IOC_CLRFLAGS:
2511                 /* Set or clear specific file flags */
2512                 /* XXX This probably needs checks to ensure the flags are
2513                  *     not abused, and to handle any flag side effects.
2514                  */
2515                 if (get_user(flags, (int *) arg))
2516                         RETURN(-EFAULT);
2517
2518                 if (cmd == LL_IOC_SETFLAGS) {
2519                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2520                             !(file->f_flags & O_DIRECT)) {
2521                                 CERROR("%s: unable to disable locking on "
2522                                        "non-O_DIRECT file\n", current->comm);
2523                                 RETURN(-EINVAL);
2524                         }
2525
2526                         fd->fd_flags |= flags;
2527                 } else {
2528                         fd->fd_flags &= ~flags;
2529                 }
2530                 RETURN(0);
2531         case LL_IOC_LOV_SETSTRIPE:
2532                 RETURN(ll_lov_setstripe(inode, file, arg));
2533         case LL_IOC_LOV_SETEA:
2534                 RETURN(ll_lov_setea(inode, file, arg));
2535         case LL_IOC_LOV_GETSTRIPE:
2536                 RETURN(ll_lov_getstripe(inode, arg));
2537         case LL_IOC_RECREATE_OBJ:
2538                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2539         case EXT3_IOC_GETFLAGS:
2540         case EXT3_IOC_SETFLAGS:
2541                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2542         case EXT3_IOC_GETVERSION_OLD:
2543         case EXT3_IOC_GETVERSION:
2544                 RETURN(put_user(inode->i_generation, (int *)arg));
2545         case LL_IOC_JOIN: {
2546                 char *ftail;
2547                 int rc;
2548
2549                 ftail = getname((const char *)arg);
2550                 if (IS_ERR(ftail))
2551                         RETURN(PTR_ERR(ftail));
2552                 rc = ll_file_join(inode, file, ftail);
2553                 putname(ftail);
2554                 RETURN(rc);
2555         }
2556         case LL_IOC_GROUP_LOCK:
2557                 RETURN(ll_get_grouplock(inode, file, arg));
2558         case LL_IOC_GROUP_UNLOCK:
2559                 RETURN(ll_put_grouplock(inode, file, arg));
2560         case IOC_OBD_STATFS:
2561                 RETURN(ll_obd_statfs(inode, (void *)arg));
2562
2563         /* We need to special case any other ioctls we want to handle,
2564          * to send them to the MDS/OST as appropriate and to properly
2565          * network encode the arg field.
2566         case EXT3_IOC_SETVERSION_OLD:
2567         case EXT3_IOC_SETVERSION:
2568         */
2569         case LL_IOC_FLUSHCTX:
2570                 RETURN(ll_flush_ctx(inode));
2571         default: {
2572                 int err;
2573
2574                 if (LLIOC_STOP == 
2575                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2576                         RETURN(err);
2577
2578                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2579                                      (void *)arg));
2580         }
2581         }
2582 }
2583
2584 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2585 {
2586         struct inode *inode = file->f_dentry->d_inode;
2587         struct ll_inode_info *lli = ll_i2info(inode);
2588         struct lov_stripe_md *lsm = lli->lli_smd;
2589         loff_t retval;
2590         ENTRY;
2591         retval = offset + ((origin == 2) ? i_size_read(inode) :
2592                            (origin == 1) ? file->f_pos : 0);
2593         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2594                inode->i_ino, inode->i_generation, inode, retval, retval,
2595                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2596         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2597
2598         if (origin == 2) { /* SEEK_END */
2599                 int nonblock = 0, rc;
2600
2601                 if (file->f_flags & O_NONBLOCK)
2602                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2603
2604                 if (lsm != NULL) {
2605                         rc = ll_glimpse_size(inode, nonblock);
2606                         if (rc != 0)
2607                                 RETURN(rc);
2608                 }
2609
2610                 ll_inode_size_lock(inode, 0);
2611                 offset += i_size_read(inode);
2612                 ll_inode_size_unlock(inode, 0);
2613         } else if (origin == 1) { /* SEEK_CUR */
2614                 offset += file->f_pos;
2615         }
2616
2617         retval = -EINVAL;
2618         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2619                 if (offset != file->f_pos) {
2620                         file->f_pos = offset;
2621 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2622                         file->f_reada = 0;
2623                         file->f_version = ++event;
2624 #endif
2625                 }
2626                 retval = offset;
2627         }
2628         
2629         RETURN(retval);
2630 }
2631
2632 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2633 {
2634         struct inode *inode = dentry->d_inode;
2635         struct ll_inode_info *lli = ll_i2info(inode);
2636         struct lov_stripe_md *lsm = lli->lli_smd;
2637         struct ptlrpc_request *req;
2638         struct obd_capa *oc;
2639         int rc, err;
2640         ENTRY;
2641         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2642                inode->i_generation, inode);
2643         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2644
2645         /* fsync's caller has already called _fdata{sync,write}, we want
2646          * that IO to finish before calling the osc and mdc sync methods */
2647         rc = filemap_fdatawait(inode->i_mapping);
2648
2649         /* catch async errors that were recorded back when async writeback
2650          * failed for pages in this mapping. */
2651         err = lli->lli_async_rc;
2652         lli->lli_async_rc = 0;
2653         if (rc == 0)
2654                 rc = err;
2655         if (lsm) {
2656                 err = lov_test_and_clear_async_rc(lsm);
2657                 if (rc == 0)
2658                         rc = err;
2659         }
2660
2661         oc = ll_mdscapa_get(inode);
2662         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2663                       &req);
2664         capa_put(oc);
2665         if (!rc)
2666                 rc = err;
2667         if (!err)
2668                 ptlrpc_req_finished(req);
2669
2670         if (data && lsm) {
2671                 struct obdo *oa;
2672                 
2673                 OBDO_ALLOC(oa);
2674                 if (!oa)
2675                         RETURN(rc ? rc : -ENOMEM);
2676
2677                 oa->o_id = lsm->lsm_object_id;
2678                 oa->o_gr = lsm->lsm_object_gr;
2679                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2680                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2681                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2682                                            OBD_MD_FLGROUP);
2683
2684                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2685                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2686                                0, OBD_OBJECT_EOF, oc);
2687                 capa_put(oc);
2688                 if (!rc)
2689                         rc = err;
2690                 OBDO_FREE(oa);
2691         }
2692
2693         RETURN(rc);
2694 }
2695
2696 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2697 {
2698         struct inode *inode = file->f_dentry->d_inode;
2699         struct ll_sb_info *sbi = ll_i2sbi(inode);
2700         struct ldlm_res_id res_id =
2701                 { .name = { fid_seq(ll_inode2fid(inode)),
2702                             fid_oid(ll_inode2fid(inode)),
2703                             fid_ver(ll_inode2fid(inode)),
2704                             LDLM_FLOCK} };
2705         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2706                 ldlm_flock_completion_ast, NULL, file_lock };
2707         struct lustre_handle lockh = {0};
2708         ldlm_policy_data_t flock;
2709         int flags = 0;
2710         int rc;
2711         ENTRY;
2712
2713         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2714                inode->i_ino, file_lock);
2715
2716         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2717  
2718         if (file_lock->fl_flags & FL_FLOCK) {
2719                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2720                 /* set missing params for flock() calls */
2721                 file_lock->fl_end = OFFSET_MAX;
2722                 file_lock->fl_pid = current->tgid;
2723         }
2724         flock.l_flock.pid = file_lock->fl_pid;
2725         flock.l_flock.start = file_lock->fl_start;
2726         flock.l_flock.end = file_lock->fl_end;
2727
2728         switch (file_lock->fl_type) {
2729         case F_RDLCK:
2730                 einfo.ei_mode = LCK_PR;
2731                 break;
2732         case F_UNLCK:
2733                 /* An unlock request may or may not have any relation to
2734                  * existing locks so we may not be able to pass a lock handle
2735                  * via a normal ldlm_lock_cancel() request. The request may even
2736                  * unlock a byte range in the middle of an existing lock. In
2737                  * order to process an unlock request we need all of the same
2738                  * information that is given with a normal read or write record
2739                  * lock request. To avoid creating another ldlm unlock (cancel)
2740                  * message we'll treat a LCK_NL flock request as an unlock. */
2741                 einfo.ei_mode = LCK_NL;
2742                 break;
2743         case F_WRLCK:
2744                 einfo.ei_mode = LCK_PW;
2745                 break;
2746         default:
2747                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2748                 LBUG();
2749         }
2750
2751         switch (cmd) {
2752         case F_SETLKW:
2753 #ifdef F_SETLKW64
2754         case F_SETLKW64:
2755 #endif
2756                 flags = 0;
2757                 break;
2758         case F_SETLK:
2759 #ifdef F_SETLK64
2760         case F_SETLK64:
2761 #endif
2762                 flags = LDLM_FL_BLOCK_NOWAIT;
2763                 break;
2764         case F_GETLK:
2765 #ifdef F_GETLK64
2766         case F_GETLK64:
2767 #endif
2768                 flags = LDLM_FL_TEST_LOCK;
2769                 /* Save the old mode so that if the mode in the lock changes we
2770                  * can decrement the appropriate reader or writer refcount. */
2771                 file_lock->fl_type = einfo.ei_mode;
2772                 break;
2773         default:
2774                 CERROR("unknown fcntl lock command: %d\n", cmd);
2775                 LBUG();
2776         }
2777
2778         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2779                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2780                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2781
2782         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2783                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2784         if ((file_lock->fl_flags & FL_FLOCK) &&
2785             (rc == 0 || file_lock->fl_type == F_UNLCK))
2786                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2787 #ifdef HAVE_F_OP_FLOCK
2788         if ((file_lock->fl_flags & FL_POSIX) &&
2789             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2790             !(flags & LDLM_FL_TEST_LOCK))
2791                 posix_lock_file_wait(file, file_lock);
2792 #endif
2793
2794         RETURN(rc);
2795 }
2796
2797 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2798 {
2799         ENTRY;
2800
2801         RETURN(-ENOSYS);
2802 }
2803
2804 int ll_have_md_lock(struct inode *inode, __u64 bits)
2805 {
2806         struct lustre_handle lockh;
2807         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2808         struct lu_fid *fid;
2809         int flags;
2810         ENTRY;
2811
2812         if (!inode)
2813                RETURN(0);
2814
2815         fid = &ll_i2info(inode)->lli_fid;
2816         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2817
2818         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2819         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2820                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2821                 RETURN(1);
2822         }
2823         RETURN(0);
2824 }
2825
2826 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2827                             struct lustre_handle *lockh)
2828 {
2829         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2830         struct lu_fid *fid;
2831         ldlm_mode_t rc;
2832         int flags;
2833         ENTRY;
2834
2835         fid = &ll_i2info(inode)->lli_fid;
2836         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2837
2838         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2839         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2840                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2841         RETURN(rc);
2842 }
2843
2844 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2845         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2846                               * and return success */
2847                 inode->i_nlink = 0;
2848                 /* This path cannot be hit for regular files unless in
2849                  * case of obscure races, so no need to to validate
2850                  * size. */
2851                 if (!S_ISREG(inode->i_mode) &&
2852                     !S_ISDIR(inode->i_mode))
2853                         return 0;
2854         }
2855
2856         if (rc) {
2857                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2858                 return -abs(rc);
2859
2860         }
2861
2862         return 0;
2863 }
2864
2865 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2866 {
2867         struct inode *inode = dentry->d_inode;
2868         struct ptlrpc_request *req = NULL;
2869         struct ll_sb_info *sbi;
2870         struct obd_export *exp;
2871         int rc;
2872         ENTRY;
2873
2874         if (!inode) {
2875                 CERROR("REPORT THIS LINE TO PETER\n");
2876                 RETURN(0);
2877         }
2878         sbi = ll_i2sbi(inode);
2879
2880         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2881                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2882
2883         exp = ll_i2mdexp(inode);
2884
2885         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2886                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2887                 struct md_op_data *op_data;
2888
2889                 /* Call getattr by fid, so do not provide name at all. */
2890                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2891                                              dentry->d_inode, NULL, 0, 0,
2892                                              LUSTRE_OPC_ANY, NULL);
2893                 if (IS_ERR(op_data))
2894                         RETURN(PTR_ERR(op_data));
2895
2896                 oit.it_flags |= O_CHECK_STALE;
2897                 rc = md_intent_lock(exp, op_data, NULL, 0,
2898                                     /* we are not interested in name
2899                                        based lookup */
2900                                     &oit, 0, &req,
2901                                     ll_md_blocking_ast, 0);
2902                 ll_finish_md_op_data(op_data);
2903                 oit.it_flags &= ~O_CHECK_STALE;
2904                 if (rc < 0) {
2905                         rc = ll_inode_revalidate_fini(inode, rc);
2906                         GOTO (out, rc);
2907                 }
2908
2909                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2910                 if (rc != 0) {
2911                         ll_intent_release(&oit);
2912                         GOTO(out, rc);
2913                 }
2914
2915                 /* Unlinked? Unhash dentry, so it is not picked up later by
2916                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2917                    here to preserve get_cwd functionality on 2.6.
2918                    Bug 10503 */
2919                 if (!dentry->d_inode->i_nlink) {
2920                         spin_lock(&dcache_lock);
2921                         ll_drop_dentry(dentry);
2922                         spin_unlock(&dcache_lock);
2923                 }
2924
2925                 ll_lookup_finish_locks(&oit, dentry);
2926         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
2927                                                      MDS_INODELOCK_LOOKUP)) {
2928                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2929                 obd_valid valid = OBD_MD_FLGETATTR;
2930                 struct obd_capa *oc;
2931                 int ealen = 0;
2932
2933                 if (S_ISREG(inode->i_mode)) {
2934                         rc = ll_get_max_mdsize(sbi, &ealen);
2935                         if (rc)
2936                                 RETURN(rc);
2937                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2938                 }
2939                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2940                  * capa for this inode. Because we only keep capas of dirs
2941                  * fresh. */
2942                 oc = ll_mdscapa_get(inode);
2943                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2944                                 ealen, &req);
2945                 capa_put(oc);
2946                 if (rc) {
2947                         rc = ll_inode_revalidate_fini(inode, rc);
2948                         RETURN(rc);
2949                 }
2950
2951                 rc = ll_prep_inode(&inode, req, NULL);
2952                 if (rc)
2953                         GOTO(out, rc);
2954         }
2955
2956         /* if object not yet allocated, don't validate size */
2957         if (ll_i2info(inode)->lli_smd == NULL)
2958                 GOTO(out, rc = 0);
2959
2960         /* ll_glimpse_size will prefer locally cached writes if they extend
2961          * the file */
2962         rc = ll_glimpse_size(inode, 0);
2963         EXIT;
2964 out:
2965         ptlrpc_req_finished(req);
2966         return rc;
2967 }
2968
2969 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2970                   struct lookup_intent *it, struct kstat *stat)
2971 {
2972         struct inode *inode = de->d_inode;
2973         int res = 0;
2974
2975         res = ll_inode_revalidate_it(de, it);
2976         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2977
2978         if (res)
2979                 return res;
2980
2981         stat->dev = inode->i_sb->s_dev;
2982         stat->ino = inode->i_ino;
2983         stat->mode = inode->i_mode;
2984         stat->nlink = inode->i_nlink;
2985         stat->uid = inode->i_uid;
2986         stat->gid = inode->i_gid;
2987         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2988         stat->atime = inode->i_atime;
2989         stat->mtime = inode->i_mtime;
2990         stat->ctime = inode->i_ctime;
2991 #ifdef HAVE_INODE_BLKSIZE
2992         stat->blksize = inode->i_blksize;
2993 #else
2994         stat->blksize = 1 << inode->i_blkbits;
2995 #endif
2996
2997         ll_inode_size_lock(inode, 0);
2998         stat->size = i_size_read(inode);
2999         stat->blocks = inode->i_blocks;
3000         ll_inode_size_unlock(inode, 0);
3001
3002         return 0;
3003 }
3004 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3005 {
3006         struct lookup_intent it = { .it_op = IT_GETATTR };
3007
3008         return ll_getattr_it(mnt, de, &it, stat);
3009 }
3010
3011 static
3012 int lustre_check_acl(struct inode *inode, int mask)
3013 {
3014 #ifdef CONFIG_FS_POSIX_ACL
3015         struct ll_inode_info *lli = ll_i2info(inode);
3016         struct posix_acl *acl;
3017         int rc;
3018         ENTRY;
3019
3020         spin_lock(&lli->lli_lock);
3021         acl = posix_acl_dup(lli->lli_posix_acl);
3022         spin_unlock(&lli->lli_lock);
3023
3024         if (!acl)
3025                 RETURN(-EAGAIN);
3026
3027         rc = posix_acl_permission(inode, acl, mask);
3028         posix_acl_release(acl);
3029
3030         RETURN(rc);
3031 #else
3032         return -EAGAIN;
3033 #endif
3034 }
3035
3036 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3037 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3038 {
3039         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3040                inode->i_ino, inode->i_generation, inode, mask);
3041         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3042                 return lustre_check_remote_perm(inode, mask);
3043         
3044         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3045         return generic_permission(inode, mask, lustre_check_acl);
3046 }
3047 #else
3048 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3049 {
3050         int mode = inode->i_mode;
3051         int rc;
3052
3053         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3054                inode->i_ino, inode->i_generation, inode, mask);
3055
3056         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3057                 return lustre_check_remote_perm(inode, mask);
3058
3059         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3060
3061         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3062             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3063                 return -EROFS;
3064         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3065                 return -EACCES;
3066         if (current->fsuid == inode->i_uid) {
3067                 mode >>= 6;
3068         } else if (1) {
3069                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3070                         goto check_groups;
3071                 rc = lustre_check_acl(inode, mask);
3072                 if (rc == -EAGAIN)
3073                         goto check_groups;
3074                 if (rc == -EACCES)
3075                         goto check_capabilities;
3076                 return rc;
3077         } else {
3078 check_groups:
3079                 if (in_group_p(inode->i_gid))
3080                         mode >>= 3;
3081         }
3082         if ((mode & mask & S_IRWXO) == mask)
3083                 return 0;
3084
3085 check_capabilities:
3086         if (!(mask & MAY_EXEC) ||
3087             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3088                 if (capable(CAP_DAC_OVERRIDE))
3089                         return 0;
3090
3091         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3092             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3093                 return 0;
3094         
3095         return -EACCES;
3096 }
3097 #endif
3098
3099 /* -o localflock - only provides locally consistent flock locks */
3100 struct file_operations ll_file_operations = {
3101         .read           = ll_file_read,
3102         .write          = ll_file_write,
3103         .ioctl          = ll_file_ioctl,
3104         .open           = ll_file_open,
3105         .release        = ll_file_release,
3106         .mmap           = ll_file_mmap,
3107         .llseek         = ll_file_seek,
3108         .sendfile       = ll_file_sendfile,
3109         .fsync          = ll_fsync,
3110 };
3111
3112 struct file_operations ll_file_operations_flock = {
3113         .read           = ll_file_read,
3114         .write          = ll_file_write,
3115         .ioctl          = ll_file_ioctl,
3116         .open           = ll_file_open,
3117         .release        = ll_file_release,
3118         .mmap           = ll_file_mmap,
3119         .llseek         = ll_file_seek,
3120         .sendfile       = ll_file_sendfile,
3121         .fsync          = ll_fsync,
3122 #ifdef HAVE_F_OP_FLOCK
3123         .flock          = ll_file_flock,
3124 #endif
3125         .lock           = ll_file_flock
3126 };
3127
3128 /* These are for -o noflock - to return ENOSYS on flock calls */
3129 struct file_operations ll_file_operations_noflock = {
3130         .read           = ll_file_read,
3131         .write          = ll_file_write,
3132         .ioctl          = ll_file_ioctl,
3133         .open           = ll_file_open,
3134         .release        = ll_file_release,
3135         .mmap           = ll_file_mmap,
3136         .llseek         = ll_file_seek,
3137         .sendfile       = ll_file_sendfile,
3138         .fsync          = ll_fsync,
3139 #ifdef HAVE_F_OP_FLOCK
3140         .flock          = ll_file_noflock,
3141 #endif
3142         .lock           = ll_file_noflock
3143 };
3144
3145 struct inode_operations ll_file_inode_operations = {
3146 #ifdef HAVE_VFS_INTENT_PATCHES
3147         .setattr_raw    = ll_setattr_raw,
3148 #endif
3149         .setattr        = ll_setattr,
3150         .truncate       = ll_truncate,
3151         .getattr        = ll_getattr,
3152         .permission     = ll_inode_permission,
3153         .setxattr       = ll_setxattr,
3154         .getxattr       = ll_getxattr,
3155         .listxattr      = ll_listxattr,
3156         .removexattr    = ll_removexattr,
3157 };
3158
3159 /* dynamic ioctl number support routins */
3160 static struct llioc_ctl_data {
3161         struct rw_semaphore ioc_sem;
3162         struct list_head    ioc_head;
3163 } llioc = { 
3164         __RWSEM_INITIALIZER(llioc.ioc_sem), 
3165         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3166 };
3167
3168
3169 struct llioc_data {
3170         struct list_head        iocd_list;
3171         unsigned int            iocd_size;
3172         llioc_callback_t        iocd_cb;
3173         unsigned int            iocd_count;
3174         unsigned int            iocd_cmd[0];
3175 };
3176
3177 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3178 {
3179         unsigned int size;
3180         struct llioc_data *in_data = NULL;
3181         ENTRY;
3182
3183         if (cb == NULL || cmd == NULL ||
3184             count > LLIOC_MAX_CMD || count < 0)
3185                 RETURN(NULL);
3186
3187         size = sizeof(*in_data) + count * sizeof(unsigned int);
3188         OBD_ALLOC(in_data, size);
3189         if (in_data == NULL)
3190                 RETURN(NULL);
3191
3192         memset(in_data, 0, sizeof(*in_data));
3193         in_data->iocd_size = size;
3194         in_data->iocd_cb = cb;
3195         in_data->iocd_count = count;
3196         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3197
3198         down_write(&llioc.ioc_sem);
3199         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3200         up_write(&llioc.ioc_sem);
3201
3202         RETURN(in_data);
3203 }
3204
3205 void ll_iocontrol_unregister(void *magic)
3206 {
3207         struct llioc_data *tmp;
3208
3209         if (magic == NULL)
3210                 return;
3211
3212         down_write(&llioc.ioc_sem);
3213         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3214                 if (tmp == magic) {
3215                         unsigned int size = tmp->iocd_size;
3216
3217                         list_del(&tmp->iocd_list);
3218                         up_write(&llioc.ioc_sem);
3219
3220                         OBD_FREE(tmp, size);
3221                         return;
3222                 }
3223         }
3224         up_write(&llioc.ioc_sem);
3225
3226         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3227 }
3228
3229 EXPORT_SYMBOL(ll_iocontrol_register);
3230 EXPORT_SYMBOL(ll_iocontrol_unregister);
3231
3232 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
3233                         unsigned int cmd, unsigned long arg, int *rcp)
3234 {
3235         enum llioc_iter ret = LLIOC_CONT;
3236         struct llioc_data *data;
3237         int rc = -EINVAL, i;
3238
3239         down_read(&llioc.ioc_sem);
3240         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3241                 for (i = 0; i < data->iocd_count; i++) {
3242                         if (cmd != data->iocd_cmd[i]) 
3243                                 continue;
3244
3245                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3246                         break;
3247                 }
3248
3249                 if (ret == LLIOC_STOP)
3250                         break;
3251         }
3252         up_read(&llioc.ioc_sem);
3253
3254         if (rcp)
3255                 *rcp = rc;
3256         return ret;
3257 }