Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <lustre_mdc.h>
47 #include <linux/pagemap.h>
48 #include <linux/file.h>
49 #include "llite_internal.h"
50
51 /* also used by llite/special.c:ll_special_open() */
52 struct ll_file_data *ll_file_data_get(void)
53 {
54         struct ll_file_data *fd;
55
56         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
57         return fd;
58 }
59
60 static void ll_file_data_put(struct ll_file_data *fd)
61 {
62         if (fd != NULL)
63                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
64 }
65
66 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
67                           struct lustre_handle *fh)
68 {
69         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
70         op_data->op_attr.ia_mode = inode->i_mode;
71         op_data->op_attr.ia_atime = inode->i_atime;
72         op_data->op_attr.ia_mtime = inode->i_mtime;
73         op_data->op_attr.ia_ctime = inode->i_ctime;
74         op_data->op_attr.ia_size = i_size_read(inode);
75         op_data->op_attr_blocks = inode->i_blocks;
76         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
77         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
78         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
79         op_data->op_capa1 = ll_mdscapa_get(inode);
80 }
81
82 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
83                              struct obd_client_handle *och)
84 {
85         ENTRY;
86
87         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
88                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
89
90         if (!(och->och_flags & FMODE_WRITE))
91                 goto out;
92
93         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
94             !S_ISREG(inode->i_mode))
95                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
96         else
97                 ll_epoch_close(inode, op_data, &och, 0);
98
99 out:
100         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
101         EXIT;
102 }
103
104 static int ll_close_inode_openhandle(struct obd_export *md_exp,
105                                      struct inode *inode,
106                                      struct obd_client_handle *och)
107 {
108         struct obd_export *exp = ll_i2mdexp(inode);
109         struct md_op_data *op_data;
110         struct ptlrpc_request *req = NULL;
111         struct obd_device *obd = class_exp2obd(exp);
112         int epoch_close = 1;
113         int seq_end = 0, rc;
114         ENTRY;
115
116         if (obd == NULL) {
117                 /*
118                  * XXX: in case of LMV, is this correct to access
119                  * ->exp_handle?
120                  */
121                 CERROR("Invalid MDC connection handle "LPX64"\n",
122                        ll_i2mdexp(inode)->exp_handle.h_cookie);
123                 GOTO(out, rc = 0);
124         }
125
126         /*
127          * here we check if this is forced umount. If so this is called on
128          * canceling "open lock" and we do not call md_close() in this case, as
129          * it will not be successful, as import is already deactivated.
130          */
131         if (obd->obd_force)
132                 GOTO(out, rc = 0);
133
134         OBD_ALLOC_PTR(op_data);
135         if (op_data == NULL)
136                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
137
138         ll_prepare_close(inode, op_data, och);
139         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
140         rc = md_close(md_exp, op_data, och->och_mod, &req);
141         if (rc != -EAGAIN)
142                 seq_end = 1;
143
144         if (rc == -EAGAIN) {
145                 /* This close must have the epoch closed. */
146                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
147                 LASSERT(epoch_close);
148                 /* MDS has instructed us to obtain Size-on-MDS attribute from
149                  * OSTs and send setattr to back to MDS. */
150                 rc = ll_sizeonmds_update(inode, och->och_mod,
151                                          &och->och_fh, op_data->op_ioepoch);
152                 if (rc) {
153                         CERROR("inode %lu mdc Size-on-MDS update failed: "
154                                "rc = %d\n", inode->i_ino, rc);
155                         rc = 0;
156                 }
157         } else if (rc) {
158                 CERROR("inode %lu mdc close failed: rc = %d\n",
159                        inode->i_ino, rc);
160         }
161         ll_finish_md_op_data(op_data);
162
163         if (rc == 0) {
164                 rc = ll_objects_destroy(req, inode);
165                 if (rc)
166                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
167                                inode->i_ino, rc);
168         }
169
170         EXIT;
171 out:
172       
173         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
174             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
175                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
176         } else {
177                 if (seq_end)
178                         ptlrpc_close_replay_seq(req);
179                 md_clear_open_replay_data(md_exp, och);
180                 /* Free @och if it is not waiting for DONE_WRITING. */
181                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
182                 OBD_FREE_PTR(och);
183         }
184         if (req) /* This is close request */
185                 ptlrpc_req_finished(req);
186         return rc;
187 }
188
189 int ll_md_real_close(struct inode *inode, int flags)
190 {
191         struct ll_inode_info *lli = ll_i2info(inode);
192         struct obd_client_handle **och_p;
193         struct obd_client_handle *och;
194         __u64 *och_usecount;
195         int rc = 0;
196         ENTRY;
197
198         if (flags & FMODE_WRITE) {
199                 och_p = &lli->lli_mds_write_och;
200                 och_usecount = &lli->lli_open_fd_write_count;
201         } else if (flags & FMODE_EXEC) {
202                 och_p = &lli->lli_mds_exec_och;
203                 och_usecount = &lli->lli_open_fd_exec_count;
204         } else {
205                 LASSERT(flags & FMODE_READ);
206                 och_p = &lli->lli_mds_read_och;
207                 och_usecount = &lli->lli_open_fd_read_count;
208         }
209
210         down(&lli->lli_och_sem);
211         if (*och_usecount) { /* There are still users of this handle, so
212                                 skip freeing it. */
213                 up(&lli->lli_och_sem);
214                 RETURN(0);
215         }
216         och=*och_p;
217         *och_p = NULL;
218         up(&lli->lli_och_sem);
219
220         if (och) { /* There might be a race and somebody have freed this och
221                       already */
222                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
223                                                inode, och);
224         }
225
226         RETURN(rc);
227 }
228
229 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
230                 struct file *file)
231 {
232         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
233         struct ll_inode_info *lli = ll_i2info(inode);
234         int rc = 0;
235         ENTRY;
236
237         /* clear group lock, if present */
238         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
239                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
240                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
241                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
242                                       &fd->fd_cwlockh);
243         }
244
245         /* Let's see if we have good enough OPEN lock on the file and if
246            we can skip talking to MDS */
247         if (file->f_dentry->d_inode) { /* Can this ever be false? */
248                 int lockmode;
249                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
250                 struct lustre_handle lockh;
251                 struct inode *inode = file->f_dentry->d_inode;
252                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
253
254                 down(&lli->lli_och_sem);
255                 if (fd->fd_omode & FMODE_WRITE) {
256                         lockmode = LCK_CW;
257                         LASSERT(lli->lli_open_fd_write_count);
258                         lli->lli_open_fd_write_count--;
259                 } else if (fd->fd_omode & FMODE_EXEC) {
260                         lockmode = LCK_PR;
261                         LASSERT(lli->lli_open_fd_exec_count);
262                         lli->lli_open_fd_exec_count--;
263                 } else {
264                         lockmode = LCK_CR;
265                         LASSERT(lli->lli_open_fd_read_count);
266                         lli->lli_open_fd_read_count--;
267                 }
268                 up(&lli->lli_och_sem);
269
270                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
271                                    LDLM_IBITS, &policy, lockmode,
272                                    &lockh)) {
273                         rc = ll_md_real_close(file->f_dentry->d_inode,
274                                               fd->fd_omode);
275                 }
276         } else {
277                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
278                        file, file->f_dentry, file->f_dentry->d_name.name);
279         }
280
281         LUSTRE_FPRIVATE(file) = NULL;
282         ll_file_data_put(fd);
283         ll_capa_close(inode);
284
285         RETURN(rc);
286 }
287
288 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
289
290 /* While this returns an error code, fput() the caller does not, so we need
291  * to make every effort to clean up all of our state here.  Also, applications
292  * rarely check close errors and even if an error is returned they will not
293  * re-try the close call.
294  */
295 int ll_file_release(struct inode *inode, struct file *file)
296 {
297         struct ll_file_data *fd;
298         struct ll_sb_info *sbi = ll_i2sbi(inode);
299         struct ll_inode_info *lli = ll_i2info(inode);
300         struct lov_stripe_md *lsm = lli->lli_smd;
301         int rc;
302
303         ENTRY;
304         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
305                inode->i_generation, inode);
306
307 #ifdef CONFIG_FS_POSIX_ACL
308         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
309             inode == inode->i_sb->s_root->d_inode) {
310                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
311
312                 LASSERT(fd != NULL);
313                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
314                         fd->fd_flags &= ~LL_FILE_RMTACL;
315                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
316                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
317                 }
318         }
319 #endif
320
321         if (inode->i_sb->s_root != file->f_dentry)
322                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
323         fd = LUSTRE_FPRIVATE(file);
324         LASSERT(fd != NULL);
325
326         /* The last ref on @file, maybe not the the owner pid of statahead.
327          * Different processes can open the same dir, "ll_opendir_key" means:
328          * it is me that should stop the statahead thread. */
329         if (lli->lli_opendir_key == fd)
330                 ll_stop_statahead(inode, fd);
331
332         if (inode->i_sb->s_root == file->f_dentry) {
333                 LUSTRE_FPRIVATE(file) = NULL;
334                 ll_file_data_put(fd);
335                 RETURN(0);
336         }
337         
338         if (lsm)
339                 lov_test_and_clear_async_rc(lsm);
340         lli->lli_async_rc = 0;
341
342         rc = ll_md_close(sbi->ll_md_exp, inode, file);
343         RETURN(rc);
344 }
345
346 static int ll_intent_file_open(struct file *file, void *lmm,
347                                int lmmsize, struct lookup_intent *itp)
348 {
349         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
350         struct dentry *parent = file->f_dentry->d_parent;
351         const char *name = file->f_dentry->d_name.name;
352         const int len = file->f_dentry->d_name.len;
353         struct md_op_data *op_data;
354         struct ptlrpc_request *req;
355         int rc;
356         ENTRY;
357
358         if (!parent)
359                 RETURN(-ENOENT);
360
361         /* Usually we come here only for NFSD, and we want open lock.
362            But we can also get here with pre 2.6.15 patchless kernels, and in
363            that case that lock is also ok */
364         /* We can also get here if there was cached open handle in revalidate_it
365          * but it disappeared while we were getting from there to ll_file_open.
366          * But this means this file was closed and immediatelly opened which
367          * makes a good candidate for using OPEN lock */
368         /* If lmmsize & lmm are not 0, we are just setting stripe info
369          * parameters. No need for the open lock */
370         if (!lmm && !lmmsize)
371                 itp->it_flags |= MDS_OPEN_LOCK;
372
373         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
374                                       file->f_dentry->d_inode, name, len,
375                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
376         if (IS_ERR(op_data))
377                 RETURN(PTR_ERR(op_data));
378
379         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
380                             0 /*unused */, &req, ll_md_blocking_ast, 0);
381         ll_finish_md_op_data(op_data);
382         if (rc == -ESTALE) {
383                 /* reason for keep own exit path - don`t flood log
384                 * with messages with -ESTALE errors.
385                 */
386                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
387                      it_open_error(DISP_OPEN_OPEN, itp))
388                         GOTO(out, rc);
389                 ll_release_openhandle(file->f_dentry, itp);
390                 GOTO(out_stale, rc);
391         }
392
393         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
394                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
395                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
396                 GOTO(out, rc);
397         }
398
399         if (itp->d.lustre.it_lock_mode)
400                 md_set_lock_data(sbi->ll_md_exp,
401                                  &itp->d.lustre.it_lock_handle, 
402                                  file->f_dentry->d_inode);
403
404         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
405 out:
406         ptlrpc_req_finished(itp->d.lustre.it_data);
407
408 out_stale:
409         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
410         ll_intent_drop_lock(itp);
411
412         RETURN(rc);
413 }
414
415 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
416                        struct lookup_intent *it, struct obd_client_handle *och)
417 {
418         struct ptlrpc_request *req = it->d.lustre.it_data;
419         struct mdt_body *body;
420
421         LASSERT(och);
422
423         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
424         LASSERT(body != NULL);                      /* reply already checked out */
425
426         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
427         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
428         och->och_fid = lli->lli_fid;
429         och->och_flags = it->it_flags;
430         lli->lli_ioepoch = body->ioepoch;
431
432         return md_set_open_replay_data(md_exp, och, req);
433 }
434
435 int ll_local_open(struct file *file, struct lookup_intent *it,
436                   struct ll_file_data *fd, struct obd_client_handle *och)
437 {
438         struct inode *inode = file->f_dentry->d_inode;
439         struct ll_inode_info *lli = ll_i2info(inode);
440         ENTRY;
441
442         LASSERT(!LUSTRE_FPRIVATE(file));
443
444         LASSERT(fd != NULL);
445
446         if (och) {
447                 struct ptlrpc_request *req = it->d.lustre.it_data;
448                 struct mdt_body *body;
449                 int rc;
450
451                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
452                 if (rc)
453                         RETURN(rc);
454
455                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
456                 if ((it->it_flags & FMODE_WRITE) &&
457                     (body->valid & OBD_MD_FLSIZE))
458                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
459                                lli->lli_ioepoch, PFID(&lli->lli_fid));
460         }
461
462         LUSTRE_FPRIVATE(file) = fd;
463         ll_readahead_init(inode, &fd->fd_ras);
464         fd->fd_omode = it->it_flags;
465         RETURN(0);
466 }
467
468 /* Open a file, and (for the very first open) create objects on the OSTs at
469  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
470  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
471  * lli_open_sem to ensure no other process will create objects, send the
472  * stripe MD to the MDS, or try to destroy the objects if that fails.
473  *
474  * If we already have the stripe MD locally then we don't request it in
475  * md_open(), by passing a lmm_size = 0.
476  *
477  * It is up to the application to ensure no other processes open this file
478  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
479  * used.  We might be able to avoid races of that sort by getting lli_open_sem
480  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
481  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
482  */
483 int ll_file_open(struct inode *inode, struct file *file)
484 {
485         struct ll_inode_info *lli = ll_i2info(inode);
486         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
487                                           .it_flags = file->f_flags };
488         struct lov_stripe_md *lsm;
489         struct ptlrpc_request *req = NULL;
490         struct obd_client_handle **och_p;
491         __u64 *och_usecount;
492         struct ll_file_data *fd;
493         int rc = 0, opendir_set = 0;
494         ENTRY;
495
496         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
497                inode->i_generation, inode, file->f_flags);
498
499 #ifdef HAVE_VFS_INTENT_PATCHES
500         it = file->f_it;
501 #else
502         it = file->private_data; /* XXX: compat macro */
503         file->private_data = NULL; /* prevent ll_local_open assertion */
504 #endif
505
506         fd = ll_file_data_get();
507         if (fd == NULL)
508                 RETURN(-ENOMEM);
509
510         if (S_ISDIR(inode->i_mode)) {
511                 spin_lock(&lli->lli_lock);
512                 /* "lli->lli_opendir_pid != 0" means someone has set it.
513                  * "lli->lli_sai != NULL" means the previous statahead has not
514                  *                        been cleanup. */ 
515                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
516                         opendir_set = 1;
517                         lli->lli_opendir_pid = cfs_curproc_pid();
518                         lli->lli_opendir_key = fd;
519                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
520                         /* Two cases for this:
521                          * (1) The same process open such directory many times.
522                          * (2) The old process opened the directory, and exited
523                          *     before its children processes. Then new process
524                          *     with the same pid opens such directory before the
525                          *     old process's children processes exit.
526                          * Change the owner to the latest one. */
527                         opendir_set = 2;
528                         lli->lli_opendir_key = fd;
529                 }
530                 spin_unlock(&lli->lli_lock);
531         }
532
533         if (inode->i_sb->s_root == file->f_dentry) {
534                 LUSTRE_FPRIVATE(file) = fd;
535                 RETURN(0);
536         }
537
538         if (!it || !it->d.lustre.it_disposition) {
539                 /* Convert f_flags into access mode. We cannot use file->f_mode,
540                  * because everything but O_ACCMODE mask was stripped from
541                  * there */
542                 if ((oit.it_flags + 1) & O_ACCMODE)
543                         oit.it_flags++;
544                 if (file->f_flags & O_TRUNC)
545                         oit.it_flags |= FMODE_WRITE;
546
547                 /* kernel only call f_op->open in dentry_open.  filp_open calls
548                  * dentry_open after call to open_namei that checks permissions.
549                  * Only nfsd_open call dentry_open directly without checking
550                  * permissions and because of that this code below is safe. */
551                 if (oit.it_flags & FMODE_WRITE)
552                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
553
554                 /* We do not want O_EXCL here, presumably we opened the file
555                  * already? XXX - NFS implications? */
556                 oit.it_flags &= ~O_EXCL;
557
558                 it = &oit;
559         }
560
561 restart:
562         /* Let's see if we have file open on MDS already. */
563         if (it->it_flags & FMODE_WRITE) {
564                 och_p = &lli->lli_mds_write_och;
565                 och_usecount = &lli->lli_open_fd_write_count;
566         } else if (it->it_flags & FMODE_EXEC) {
567                 och_p = &lli->lli_mds_exec_och;
568                 och_usecount = &lli->lli_open_fd_exec_count;
569          } else {
570                 och_p = &lli->lli_mds_read_och;
571                 och_usecount = &lli->lli_open_fd_read_count;
572         }
573         
574         down(&lli->lli_och_sem);
575         if (*och_p) { /* Open handle is present */
576                 if (it_disposition(it, DISP_OPEN_OPEN)) {
577                         /* Well, there's extra open request that we do not need,
578                            let's close it somehow. This will decref request. */
579                         rc = it_open_error(DISP_OPEN_OPEN, it);
580                         if (rc) {
581                                 ll_file_data_put(fd);
582                                 GOTO(out_och_free, rc);
583                         }       
584                         ll_release_openhandle(file->f_dentry, it);
585                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
586                                              LPROC_LL_OPEN);
587                 }
588                 (*och_usecount)++;
589
590                 rc = ll_local_open(file, it, fd, NULL);
591                 if (rc) {
592                         up(&lli->lli_och_sem);
593                         ll_file_data_put(fd);
594                         RETURN(rc);
595                 }
596         } else {
597                 LASSERT(*och_usecount == 0);
598                 if (!it->d.lustre.it_disposition) {
599                         /* We cannot just request lock handle now, new ELC code
600                            means that one of other OPEN locks for this file
601                            could be cancelled, and since blocking ast handler
602                            would attempt to grab och_sem as well, that would
603                            result in a deadlock */
604                         up(&lli->lli_och_sem);
605                         it->it_flags |= O_CHECK_STALE;
606                         rc = ll_intent_file_open(file, NULL, 0, it);
607                         it->it_flags &= ~O_CHECK_STALE;
608                         if (rc) {
609                                 ll_file_data_put(fd);
610                                 GOTO(out_openerr, rc);
611                         }
612
613                         /* Got some error? Release the request */
614                         if (it->d.lustre.it_status < 0) {
615                                 req = it->d.lustre.it_data;
616                                 ptlrpc_req_finished(req);
617                         }
618                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
619                                          &it->d.lustre.it_lock_handle,
620                                          file->f_dentry->d_inode);
621                         goto restart;
622                 }
623                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
624                 if (!*och_p) {
625                         ll_file_data_put(fd);
626                         GOTO(out_och_free, rc = -ENOMEM);
627                 }
628                 (*och_usecount)++;
629                 req = it->d.lustre.it_data;
630
631                 /* md_intent_lock() didn't get a request ref if there was an
632                  * open error, so don't do cleanup on the request here
633                  * (bug 3430) */
634                 /* XXX (green): Should not we bail out on any error here, not
635                  * just open error? */
636                 rc = it_open_error(DISP_OPEN_OPEN, it);
637                 if (rc) {
638                         ll_file_data_put(fd);
639                         GOTO(out_och_free, rc);
640                 }
641
642                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
643                 rc = ll_local_open(file, it, fd, *och_p);
644                 if (rc) {
645                         up(&lli->lli_och_sem);
646                         ll_file_data_put(fd);
647                         GOTO(out_och_free, rc);
648                 }
649         }
650         up(&lli->lli_och_sem);
651
652         /* Must do this outside lli_och_sem lock to prevent deadlock where
653            different kind of OPEN lock for this same inode gets cancelled
654            by ldlm_cancel_lru */
655         if (!S_ISREG(inode->i_mode))
656                 GOTO(out, rc);
657
658         ll_capa_open(inode);
659
660         lsm = lli->lli_smd;
661         if (lsm == NULL) {
662                 if (file->f_flags & O_LOV_DELAY_CREATE ||
663                     !(file->f_mode & FMODE_WRITE)) {
664                         CDEBUG(D_INODE, "object creation was delayed\n");
665                         GOTO(out, rc);
666                 }
667         }
668         file->f_flags &= ~O_LOV_DELAY_CREATE;
669         GOTO(out, rc);
670 out:
671         ptlrpc_req_finished(req);
672         if (req)
673                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
674 out_och_free:
675         if (rc) {
676                 if (*och_p) {
677                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
678                         *och_p = NULL; /* OBD_FREE writes some magic there */
679                         (*och_usecount)--;
680                 }
681                 up(&lli->lli_och_sem);
682 out_openerr:
683                 if (opendir_set == 1) {
684                         lli->lli_opendir_key = NULL;
685                         lli->lli_opendir_pid = 0;
686                 } else if (unlikely(opendir_set == 2)) {
687                         ll_stop_statahead(inode, fd);
688                 }
689         }
690
691         return rc;
692 }
693
694 /* Fills the obdo with the attributes for the inode defined by lsm */
695 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
696 {
697         struct ptlrpc_request_set *set;
698         struct ll_inode_info *lli = ll_i2info(inode);
699         struct lov_stripe_md *lsm = lli->lli_smd;
700
701         struct obd_info oinfo = { { { 0 } } };
702         int rc;
703         ENTRY;
704
705         LASSERT(lsm != NULL);
706
707         oinfo.oi_md = lsm;
708         oinfo.oi_oa = obdo;
709         oinfo.oi_oa->o_id = lsm->lsm_object_id;
710         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
711         oinfo.oi_oa->o_mode = S_IFREG;
712         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
713                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
714                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
715                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
716                                OBD_MD_FLGROUP;
717         oinfo.oi_capa = ll_mdscapa_get(inode);
718
719         set = ptlrpc_prep_set();
720         if (set == NULL) {
721                 CERROR("can't allocate ptlrpc set\n");
722                 rc = -ENOMEM;
723         } else {
724                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
725                 if (rc == 0)
726                         rc = ptlrpc_set_wait(set);
727                 ptlrpc_set_destroy(set);
728         }
729         capa_put(oinfo.oi_capa);
730         if (rc)
731                 RETURN(rc);
732
733         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
734                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
735                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
736
737         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
738         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
739                lli->lli_smd->lsm_object_id, i_size_read(inode),
740                (unsigned long long)inode->i_blocks,
741                (unsigned long)ll_inode_blksize(inode));
742         RETURN(0);
743 }
744
745 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
746 {
747         struct ll_inode_info *lli = ll_i2info(inode);
748         struct lov_stripe_md *lsm = lli->lli_smd;
749         struct obd_export *exp = ll_i2dtexp(inode);
750         struct {
751                 char name[16];
752                 struct ldlm_lock *lock;
753                 struct lov_stripe_md *lsm;
754         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock, .lsm = lsm };
755         __u32 stripe, vallen = sizeof(stripe);
756         struct lov_oinfo *loinfo;
757         int rc;
758         ENTRY;
759
760         if (lsm->lsm_stripe_count == 1)
761                 GOTO(check, stripe = 0);
762
763         /* get our offset in the lov */
764         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
765         if (rc != 0) {
766                 CERROR("obd_get_info: rc = %d\n", rc);
767                 RETURN(rc);
768         }
769         LASSERT(stripe < lsm->lsm_stripe_count);
770
771 check:
772         loinfo = lsm->lsm_oinfo[stripe];
773         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
774                             &lock->l_resource->lr_name)){
775                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
776                            loinfo->loi_id, loinfo->loi_gr);
777                 RETURN(-ELDLM_NO_LOCK_DATA);
778         }
779
780         RETURN(stripe);
781 }
782
783 /* Get extra page reference to ensure it is not going away */
784 void ll_pin_extent_cb(void *data)
785 {
786         struct page *page = data;
787         
788         page_cache_get(page);
789
790         return;
791 }
792
793 /* Flush the page from page cache for an extent as its canceled.
794  * Page to remove is delivered as @data.
795  *
796  * No one can dirty the extent until we've finished our work and they cannot
797  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
798  * but other kernel actors could have pages locked.
799  *
800  * If @discard is set, there is no need to write the page if it is dirty.
801  *
802  * Called with the DLM lock held. */
803 int ll_page_removal_cb(void *data, int discard)
804 {
805         int rc;
806         struct page *page = data;
807         struct address_space *mapping;
808  
809         ENTRY;
810
811         /* We have page reference already from ll_pin_page */
812         lock_page(page);
813
814         /* Already truncated by somebody */
815         if (!page->mapping)
816                 GOTO(out, rc = 0);
817         mapping = page->mapping;
818
819         ll_teardown_mmaps(mapping,
820                           (__u64)page->index << PAGE_CACHE_SHIFT,
821                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
822                                                               ~PAGE_CACHE_MASK);        
823         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
824
825         if (!discard && clear_page_dirty_for_io(page)) {
826                 LASSERT(page->mapping);
827                 rc = ll_call_writepage(page->mapping->host, page);
828                 /* either waiting for io to complete or reacquiring
829                  * the lock that the failed writepage released */
830                 lock_page(page);
831                 wait_on_page_writeback(page);
832                 if (rc != 0) {
833                         CERROR("writepage inode %lu(%p) of page %p "
834                                "failed: %d\n", mapping->host->i_ino,
835                                mapping->host, page, rc);
836                         if (rc == -ENOSPC)
837                                 set_bit(AS_ENOSPC, &mapping->flags);
838                         else
839                                 set_bit(AS_EIO, &mapping->flags);
840                 }
841                 set_bit(AS_EIO, &mapping->flags);
842         }
843         if (page->mapping != NULL) {
844                 struct ll_async_page *llap = llap_cast_private(page);
845                 /* checking again to account for writeback's lock_page() */
846                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
847                 if (llap)
848                         ll_ra_accounting(llap, page->mapping);
849                 ll_truncate_complete_page(page);
850         }
851         EXIT;
852 out:
853         LASSERT(!PageWriteback(page));
854         unlock_page(page);
855         page_cache_release(page);
856
857         return 0;
858 }
859
860 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
861                              void *data, int flag)
862 {
863         struct inode *inode;
864         struct ll_inode_info *lli;
865         struct lov_stripe_md *lsm;
866         int stripe;
867         __u64 kms;
868
869         ENTRY;
870
871         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
872                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
873                 LBUG();
874         }
875
876         inode = ll_inode_from_lock(lock);
877         if (inode == NULL)
878                 RETURN(0);
879         lli = ll_i2info(inode);
880         if (lli == NULL)
881                 GOTO(iput, 0);
882         if (lli->lli_smd == NULL)
883                 GOTO(iput, 0);
884         lsm = lli->lli_smd;
885
886         stripe = ll_lock_to_stripe_offset(inode, lock);
887         if (stripe < 0)
888                 GOTO(iput, 0);
889
890         lov_stripe_lock(lsm);
891         lock_res_and_lock(lock);
892         kms = ldlm_extent_shift_kms(lock,
893                                     lsm->lsm_oinfo[stripe]->loi_kms);
894
895         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
896                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
897                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
898         lsm->lsm_oinfo[stripe]->loi_kms = kms;
899         unlock_res_and_lock(lock);
900         lov_stripe_unlock(lsm);
901         ll_queue_done_writing(inode, 0);
902         EXIT;
903 iput:
904         iput(inode);
905
906         return 0;
907 }
908
909 #if 0
910 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
911 {
912         /* XXX ALLOCATE - 160 bytes */
913         struct inode *inode = ll_inode_from_lock(lock);
914         struct ll_inode_info *lli = ll_i2info(inode);
915         struct lustre_handle lockh = { 0 };
916         struct ost_lvb *lvb;
917         int stripe;
918         ENTRY;
919
920         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
921                      LDLM_FL_BLOCK_CONV)) {
922                 LBUG(); /* not expecting any blocked async locks yet */
923                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
924                            "lock, returning");
925                 ldlm_lock_dump(D_OTHER, lock, 0);
926                 ldlm_reprocess_all(lock->l_resource);
927                 RETURN(0);
928         }
929
930         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
931
932         stripe = ll_lock_to_stripe_offset(inode, lock);
933         if (stripe < 0)
934                 goto iput;
935
936         if (lock->l_lvb_len) {
937                 struct lov_stripe_md *lsm = lli->lli_smd;
938                 __u64 kms;
939                 lvb = lock->l_lvb_data;
940                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
941
942                 lock_res_and_lock(lock);
943                 ll_inode_size_lock(inode, 1);
944                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
945                 kms = ldlm_extent_shift_kms(NULL, kms);
946                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
947                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
948                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
949                 lsm->lsm_oinfo[stripe].loi_kms = kms;
950                 ll_inode_size_unlock(inode, 1);
951                 unlock_res_and_lock(lock);
952         }
953
954 iput:
955         iput(inode);
956         wake_up(&lock->l_waitq);
957
958         ldlm_lock2handle(lock, &lockh);
959         ldlm_lock_decref(&lockh, LCK_PR);
960         RETURN(0);
961 }
962 #endif
963
964 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
965 {
966         struct ptlrpc_request *req = reqp;
967         struct inode *inode = ll_inode_from_lock(lock);
968         struct ll_inode_info *lli;
969         struct lov_stripe_md *lsm;
970         struct ost_lvb *lvb;
971         int rc, stripe;
972         ENTRY;
973
974         if (inode == NULL)
975                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
976         lli = ll_i2info(inode);
977         if (lli == NULL)
978                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
979         lsm = lli->lli_smd;
980         if (lsm == NULL)
981                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
982
983         /* First, find out which stripe index this lock corresponds to. */
984         stripe = ll_lock_to_stripe_offset(inode, lock);
985         if (stripe < 0)
986                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
987
988         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
989         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
990                              sizeof(*lvb));
991         rc = req_capsule_server_pack(&req->rq_pill);
992         if (rc) {
993                 CERROR("lustre_pack_reply: %d\n", rc);
994                 GOTO(iput, rc);
995         }
996
997         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
998         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
999         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1000         lvb->lvb_atime = LTIME_S(inode->i_atime);
1001         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1002
1003         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1004                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1005                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1006                    lvb->lvb_atime, lvb->lvb_ctime);
1007  iput:
1008         iput(inode);
1009
1010  out:
1011         /* These errors are normal races, so we don't want to fill the console
1012          * with messages by calling ptlrpc_error() */
1013         if (rc == -ELDLM_NO_LOCK_DATA)
1014                 lustre_pack_reply(req, 1, NULL, NULL);
1015
1016         req->rq_status = rc;
1017         return rc;
1018 }
1019
1020 static int ll_merge_lvb(struct inode *inode)
1021 {
1022         struct ll_inode_info *lli = ll_i2info(inode);
1023         struct ll_sb_info *sbi = ll_i2sbi(inode);
1024         struct ost_lvb lvb;
1025         int rc;
1026
1027         ENTRY;
1028
1029         ll_inode_size_lock(inode, 1);
1030         inode_init_lvb(inode, &lvb);
1031         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1032         i_size_write(inode, lvb.lvb_size);
1033         inode->i_blocks = lvb.lvb_blocks;
1034
1035         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1036         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1037         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1038         ll_inode_size_unlock(inode, 1);
1039
1040         RETURN(rc);
1041 }
1042
1043 int ll_local_size(struct inode *inode)
1044 {
1045         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1046         struct ll_inode_info *lli = ll_i2info(inode);
1047         struct ll_sb_info *sbi = ll_i2sbi(inode);
1048         struct lustre_handle lockh = { 0 };
1049         int flags = 0;
1050         int rc;
1051         ENTRY;
1052
1053         if (lli->lli_smd->lsm_stripe_count == 0)
1054                 RETURN(0);
1055
1056         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1057                        &policy, LCK_PR, &flags, inode, &lockh);
1058         if (rc < 0)
1059                 RETURN(rc);
1060         else if (rc == 0)
1061                 RETURN(-ENODATA);
1062
1063         rc = ll_merge_lvb(inode);
1064         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1065         RETURN(rc);
1066 }
1067
1068 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1069                      lstat_t *st)
1070 {
1071         struct lustre_handle lockh = { 0 };
1072         struct ldlm_enqueue_info einfo = { 0 };
1073         struct obd_info oinfo = { { { 0 } } };
1074         struct ost_lvb lvb;
1075         int rc;
1076
1077         ENTRY;
1078
1079         einfo.ei_type = LDLM_EXTENT;
1080         einfo.ei_mode = LCK_PR;
1081         einfo.ei_cb_bl = osc_extent_blocking_cb;
1082         einfo.ei_cb_cp = ldlm_completion_ast;
1083         einfo.ei_cb_gl = ll_glimpse_callback;
1084         einfo.ei_cbdata = NULL;
1085
1086         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1087         oinfo.oi_lockh = &lockh;
1088         oinfo.oi_md = lsm;
1089         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1090
1091         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1092         if (rc == -ENOENT)
1093                 RETURN(rc);
1094         if (rc != 0) {
1095                 CERROR("obd_enqueue returned rc %d, "
1096                        "returning -EIO\n", rc);
1097                 RETURN(rc > 0 ? -EIO : rc);
1098         }
1099
1100         lov_stripe_lock(lsm);
1101         memset(&lvb, 0, sizeof(lvb));
1102         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1103         st->st_size = lvb.lvb_size;
1104         st->st_blocks = lvb.lvb_blocks;
1105         st->st_mtime = lvb.lvb_mtime;
1106         st->st_atime = lvb.lvb_atime;
1107         st->st_ctime = lvb.lvb_ctime;
1108         lov_stripe_unlock(lsm);
1109
1110         RETURN(rc);
1111 }
1112
1113 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1114  * file (because it prefers KMS over RSS when larger) */
1115 int ll_glimpse_size(struct inode *inode, int ast_flags)
1116 {
1117         struct ll_inode_info *lli = ll_i2info(inode);
1118         struct ll_sb_info *sbi = ll_i2sbi(inode);
1119         struct lustre_handle lockh = { 0 };
1120         struct ldlm_enqueue_info einfo = { 0 };
1121         struct obd_info oinfo = { { { 0 } } };
1122         int rc;
1123         ENTRY;
1124
1125         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1126                 RETURN(0);
1127
1128         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1129
1130         if (!lli->lli_smd) {
1131                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1132                 RETURN(0);
1133         }
1134
1135         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1136          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1137          *       won't revoke any conflicting DLM locks held. Instead,
1138          *       ll_glimpse_callback() will be called on each client
1139          *       holding a DLM lock against this file, and resulting size
1140          *       will be returned for each stripe. DLM lock on [0, EOF] is
1141          *       acquired only if there were no conflicting locks. */
1142         einfo.ei_type = LDLM_EXTENT;
1143         einfo.ei_mode = LCK_PR;
1144         einfo.ei_cb_bl = osc_extent_blocking_cb;
1145         einfo.ei_cb_cp = ldlm_completion_ast;
1146         einfo.ei_cb_gl = ll_glimpse_callback;
1147         einfo.ei_cbdata = inode;
1148
1149         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1150         oinfo.oi_lockh = &lockh;
1151         oinfo.oi_md = lli->lli_smd;
1152         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1153
1154         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1155         if (rc == -ENOENT)
1156                 RETURN(rc);
1157         if (rc != 0) {
1158                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1159                 RETURN(rc > 0 ? -EIO : rc);
1160         }
1161
1162         rc = ll_merge_lvb(inode);
1163
1164         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1165                i_size_read(inode), (unsigned long long)inode->i_blocks);
1166
1167         RETURN(rc);
1168 }
1169
1170 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1171                    struct lov_stripe_md *lsm, int mode,
1172                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1173                    int ast_flags)
1174 {
1175         struct ll_sb_info *sbi = ll_i2sbi(inode);
1176         struct ost_lvb lvb;
1177         struct ldlm_enqueue_info einfo = { 0 };
1178         struct obd_info oinfo = { { { 0 } } };
1179         int rc;
1180         ENTRY;
1181
1182         LASSERT(!lustre_handle_is_used(lockh));
1183         LASSERT(lsm != NULL);
1184
1185         /* don't drop the mmapped file to LRU */
1186         if (mapping_mapped(inode->i_mapping))
1187                 ast_flags |= LDLM_FL_NO_LRU;
1188
1189         /* XXX phil: can we do this?  won't it screw the file size up? */
1190         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1191             (sbi->ll_flags & LL_SBI_NOLCK))
1192                 RETURN(0);
1193
1194         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1195                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1196
1197         einfo.ei_type = LDLM_EXTENT;
1198         einfo.ei_mode = mode;
1199         einfo.ei_cb_bl = osc_extent_blocking_cb;
1200         einfo.ei_cb_cp = ldlm_completion_ast;
1201         einfo.ei_cb_gl = ll_glimpse_callback;
1202         einfo.ei_cbdata = inode;
1203
1204         oinfo.oi_policy = *policy;
1205         oinfo.oi_lockh = lockh;
1206         oinfo.oi_md = lsm;
1207         oinfo.oi_flags = ast_flags;
1208
1209         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1210         *policy = oinfo.oi_policy;
1211         if (rc > 0)
1212                 rc = -EIO;
1213
1214         ll_inode_size_lock(inode, 1);
1215         inode_init_lvb(inode, &lvb);
1216         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1217
1218         if (policy->l_extent.start == 0 &&
1219             policy->l_extent.end == OBD_OBJECT_EOF) {
1220                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1221                  * the kms under both a DLM lock and the
1222                  * ll_inode_size_lock().  If we don't get the
1223                  * ll_inode_size_lock() here we can match the DLM lock and
1224                  * reset i_size from the kms before the truncating path has
1225                  * updated the kms.  generic_file_write can then trust the
1226                  * stale i_size when doing appending writes and effectively
1227                  * cancel the result of the truncate.  Getting the
1228                  * ll_inode_size_lock() after the enqueue maintains the DLM
1229                  * -> ll_inode_size_lock() acquiring order. */
1230                 i_size_write(inode, lvb.lvb_size);
1231                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1232                        inode->i_ino, i_size_read(inode));
1233         }
1234
1235         if (rc == 0) {
1236                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1237                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1238                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1239         }
1240         ll_inode_size_unlock(inode, 1);
1241
1242         RETURN(rc);
1243 }
1244
1245 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1246                      struct lov_stripe_md *lsm, int mode,
1247                      struct lustre_handle *lockh)
1248 {
1249         struct ll_sb_info *sbi = ll_i2sbi(inode);
1250         int rc;
1251         ENTRY;
1252
1253         /* XXX phil: can we do this?  won't it screw the file size up? */
1254         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1255             (sbi->ll_flags & LL_SBI_NOLCK))
1256                 RETURN(0);
1257
1258         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1259
1260         RETURN(rc);
1261 }
1262
1263 static void ll_set_file_contended(struct inode *inode)
1264 {
1265         struct ll_inode_info *lli = ll_i2info(inode);
1266         cfs_time_t now = cfs_time_current();
1267
1268         spin_lock(&lli->lli_lock);
1269         lli->lli_contention_time = now;
1270         lli->lli_flags |= LLIF_CONTENDED;
1271         spin_unlock(&lli->lli_lock);
1272 }
1273
1274 void ll_clear_file_contended(struct inode *inode)
1275 {
1276         struct ll_inode_info *lli = ll_i2info(inode);
1277
1278         spin_lock(&lli->lli_lock);
1279         lli->lli_flags &= ~LLIF_CONTENDED;
1280         spin_unlock(&lli->lli_lock);
1281 }
1282
1283 static int ll_is_file_contended(struct file *file)
1284 {
1285         struct inode *inode = file->f_dentry->d_inode;
1286         struct ll_inode_info *lli = ll_i2info(inode);
1287         struct ll_sb_info *sbi = ll_i2sbi(inode);
1288         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1289         ENTRY;
1290
1291         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1292                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1293                        " osc connect flags = 0x"LPX64"\n",
1294                        sbi->ll_lco.lco_flags);
1295                 RETURN(0);
1296         }
1297         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1298                 RETURN(1);
1299         if (lli->lli_flags & LLIF_CONTENDED) {
1300                 cfs_time_t cur_time = cfs_time_current();
1301                 cfs_time_t retry_time;
1302
1303                 retry_time = cfs_time_add(
1304                         lli->lli_contention_time,
1305                         cfs_time_seconds(sbi->ll_contention_time));
1306                 if (cfs_time_after(cur_time, retry_time)) {
1307                         ll_clear_file_contended(inode);
1308                         RETURN(0);
1309                 }
1310                 RETURN(1);
1311         }
1312         RETURN(0);
1313 }
1314
1315 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1316                                  const char *buf, size_t count,
1317                                  loff_t start, loff_t end, int rw)
1318 {
1319         int append;
1320         int tree_locked = 0;
1321         int rc;
1322         struct inode * inode = file->f_dentry->d_inode;
1323         ENTRY;
1324
1325         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1326
1327         if (append || !ll_is_file_contended(file)) {
1328                 struct ll_lock_tree_node *node;
1329                 int ast_flags;
1330
1331                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1332                 if (file->f_flags & O_NONBLOCK)
1333                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1334                 node = ll_node_from_inode(inode, start, end,
1335                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1336                 if (IS_ERR(node)) {
1337                         rc = PTR_ERR(node);
1338                         GOTO(out, rc);
1339                 }
1340                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1341                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1342                 if (rc == 0)
1343                         tree_locked = 1;
1344                 else if (rc == -EUSERS)
1345                         ll_set_file_contended(inode);
1346                 else
1347                         GOTO(out, rc);
1348         }
1349         RETURN(tree_locked);
1350 out:
1351         return rc;
1352 }
1353
1354 /**
1355  * Checks if requested extent lock is compatible with a lock under a page.
1356  *
1357  * Checks if the lock under \a page is compatible with a read or write lock
1358  * (specified by \a rw) for an extent [\a start , \a end].
1359  *
1360  * \param page the page under which lock is considered
1361  * \param rw OBD_BRW_READ if requested for reading,
1362  *           OBD_BRW_WRITE if requested for writing
1363  * \param start start of the requested extent
1364  * \param end end of the requested extent
1365  * \param cookie transparent parameter for passing locking context
1366  *
1367  * \post result == 1, *cookie == context, appropriate lock is referenced or
1368  * \post result == 0
1369  *
1370  * \retval 1 owned lock is reused for the request
1371  * \retval 0 no lock reused for the request
1372  *
1373  * \see ll_release_short_lock
1374  */
1375 static int ll_reget_short_lock(struct page *page, int rw,
1376                                obd_off start, obd_off end,
1377                                void **cookie)
1378 {
1379         struct ll_async_page *llap;
1380         struct obd_export *exp;
1381         struct inode *inode = page->mapping->host;
1382
1383         ENTRY;
1384
1385         exp = ll_i2dtexp(inode);
1386         if (exp == NULL)
1387                 RETURN(0);
1388
1389         llap = llap_cast_private(page);
1390         if (llap == NULL)
1391                 RETURN(0);
1392
1393         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1394                                     &llap->llap_cookie, rw, start, end,
1395                                     cookie));
1396 }
1397
1398 /**
1399  * Releases a reference to a lock taken in a "fast" way.
1400  *
1401  * Releases a read or a write (specified by \a rw) lock
1402  * referenced by \a cookie.
1403  *
1404  * \param inode inode to which data belong
1405  * \param end end of the locked extent
1406  * \param rw OBD_BRW_READ if requested for reading,
1407  *           OBD_BRW_WRITE if requested for writing
1408  * \param cookie transparent parameter for passing locking context
1409  *
1410  * \post appropriate lock is dereferenced
1411  *
1412  * \see ll_reget_short_lock
1413  */
1414 static void ll_release_short_lock(struct inode *inode, obd_off end,
1415                                   void *cookie, int rw)
1416 {
1417         struct obd_export *exp;
1418         int rc;
1419
1420         exp = ll_i2dtexp(inode);
1421         if (exp == NULL)
1422                 return;
1423
1424         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1425                                     cookie, rw);
1426         if (rc < 0)
1427                 CERROR("unlock failed (%d)\n", rc);
1428 }
1429
1430 /**
1431  * Checks if requested extent lock is compatible
1432  * with a lock under a page in page cache.
1433  *
1434  * Checks if a lock under some \a page is compatible with a read or write lock
1435  * (specified by \a rw) for an extent [\a start , \a end].
1436  *
1437  * \param file the file under which lock is considered
1438  * \param rw OBD_BRW_READ if requested for reading,
1439  *           OBD_BRW_WRITE if requested for writing
1440  * \param ppos start of the requested extent
1441  * \param end end of the requested extent
1442  * \param cookie transparent parameter for passing locking context
1443  * \param buf userspace buffer for the data
1444  *
1445  * \post result == 1, *cookie == context, appropriate lock is referenced
1446  * \post retuls == 0
1447  *
1448  * \retval 1 owned lock is reused for the request
1449  * \retval 0 no lock reused for the request
1450  *
1451  * \see ll_file_put_fast_lock
1452  */
1453 static inline int ll_file_get_fast_lock(struct file *file,
1454                                         obd_off ppos, obd_off end,
1455                                         char *buf, void **cookie, int rw)
1456 {
1457         int rc = 0;
1458         struct page *page;
1459
1460         ENTRY;
1461
1462         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1463                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1464                                       ppos >> CFS_PAGE_SHIFT);
1465                 if (page) {
1466                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1467                                 rc = 1;
1468
1469                         unlock_page(page);
1470                         page_cache_release(page);
1471                 }
1472         }
1473
1474         RETURN(rc);
1475 }
1476
1477 /**
1478  * Releases a reference to a lock taken in a "fast" way.
1479  *
1480  * Releases a read or a write (specified by \a rw) lock
1481  * referenced by \a cookie.
1482  *
1483  * \param inode inode to which data belong
1484  * \param end end of the locked extent
1485  * \param rw OBD_BRW_READ if requested for reading,
1486  *           OBD_BRW_WRITE if requested for writing
1487  * \param cookie transparent parameter for passing locking context
1488  *
1489  * \post appropriate lock is dereferenced
1490  *
1491  * \see ll_file_get_fast_lock
1492  */
1493 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1494                                          void *cookie, int rw)
1495 {
1496         ll_release_short_lock(inode, end, cookie, rw);
1497 }
1498
1499 enum ll_lock_style {
1500         LL_LOCK_STYLE_NOLOCK   = 0,
1501         LL_LOCK_STYLE_FASTLOCK = 1,
1502         LL_LOCK_STYLE_TREELOCK = 2
1503 };
1504
1505 /**
1506  * Checks if requested extent lock is compatible with a lock 
1507  * under a page cache page.
1508  *
1509  * Checks if the lock under \a page is compatible with a read or write lock
1510  * (specified by \a rw) for an extent [\a start , \a end].
1511  *
1512  * \param file file under which I/O is processed
1513  * \param rw OBD_BRW_READ if requested for reading,
1514  *           OBD_BRW_WRITE if requested for writing
1515  * \param ppos start of the requested extent
1516  * \param end end of the requested extent
1517  * \param cookie transparent parameter for passing locking context
1518  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1519  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1520  * \param buf userspace buffer for the data
1521  *
1522  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1523  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1524  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1525  *
1526  * \see ll_file_put_lock
1527  */
1528 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1529                                    obd_off end, char *buf, void **cookie,
1530                                    struct ll_lock_tree *tree, int rw)
1531 {
1532         int rc;
1533
1534         ENTRY;
1535
1536         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1537                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1538
1539         rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
1540         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1541         switch (rc) {
1542         case 1:
1543                 RETURN(LL_LOCK_STYLE_TREELOCK);
1544         case 0:
1545                 RETURN(LL_LOCK_STYLE_NOLOCK);
1546         }
1547
1548         /* an error happened if we reached this point, rc = -errno here */
1549         RETURN(rc);
1550 }
1551
1552 /**
1553  * Drops the lock taken by ll_file_get_lock.
1554  *
1555  * Releases a read or a write (specified by \a rw) lock
1556  * referenced by \a tree or \a cookie.
1557  *
1558  * \param inode inode to which data belong
1559  * \param end end of the locked extent
1560  * \param lockstyle facility through which the lock was taken
1561  * \param rw OBD_BRW_READ if requested for reading,
1562  *           OBD_BRW_WRITE if requested for writing
1563  * \param cookie transparent parameter for passing locking context
1564  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1565  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1566  *
1567  * \post appropriate lock is dereferenced
1568  *
1569  * \see ll_file_get_lock
1570  */
1571 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1572                                     enum ll_lock_style lock_style,
1573                                     void *cookie, struct ll_lock_tree *tree,
1574                                     int rw)
1575
1576 {
1577         switch (lock_style) {
1578         case LL_LOCK_STYLE_TREELOCK:
1579                 ll_tree_unlock(tree);
1580                 break;
1581         case LL_LOCK_STYLE_FASTLOCK:
1582                 ll_file_put_fast_lock(inode, end, cookie, rw);
1583                 break;
1584         default:
1585                 CERROR("invalid locking style (%d)\n", lock_style);
1586         }
1587 }
1588
1589 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1590                             loff_t *ppos)
1591 {
1592         struct inode *inode = file->f_dentry->d_inode;
1593         struct ll_inode_info *lli = ll_i2info(inode);
1594         struct lov_stripe_md *lsm = lli->lli_smd;
1595         struct ll_sb_info *sbi = ll_i2sbi(inode);
1596         struct ll_lock_tree tree;
1597         struct ost_lvb lvb;
1598         struct ll_ra_read bead;
1599         int ra = 0;
1600         obd_off end;
1601         ssize_t retval, chunk, sum = 0;
1602         int lock_style;
1603         void *cookie;
1604
1605         __u64 kms;
1606         ENTRY;
1607         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1608                inode->i_ino, inode->i_generation, inode, count, *ppos);
1609         /* "If nbyte is 0, read() will return 0 and have no other results."
1610          *                      -- Single Unix Spec */
1611         if (count == 0)
1612                 RETURN(0);
1613
1614         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1615
1616         if (!lsm) {
1617                 /* Read on file with no objects should return zero-filled
1618                  * buffers up to file size (we can get non-zero sizes with
1619                  * mknod + truncate, then opening file for read. This is a
1620                  * common pattern in NFS case, it seems). Bug 6243 */
1621                 int notzeroed;
1622                 /* Since there are no objects on OSTs, we have nothing to get
1623                  * lock on and so we are forced to access inode->i_size
1624                  * unguarded */
1625
1626                 /* Read beyond end of file */
1627                 if (*ppos >= i_size_read(inode))
1628                         RETURN(0);
1629
1630                 if (count > i_size_read(inode) - *ppos)
1631                         count = i_size_read(inode) - *ppos;
1632                 /* Make sure to correctly adjust the file pos pointer for
1633                  * EFAULT case */
1634                 notzeroed = clear_user(buf, count);
1635                 count -= notzeroed;
1636                 *ppos += count;
1637                 if (!count)
1638                         RETURN(-EFAULT);
1639                 RETURN(count);
1640         }
1641 repeat:
1642         if (sbi->ll_max_rw_chunk != 0) {
1643                 /* first, let's know the end of the current stripe */
1644                 end = *ppos;
1645                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1646
1647                 /* correct, the end is beyond the request */
1648                 if (end > *ppos + count - 1)
1649                         end = *ppos + count - 1;
1650
1651                 /* and chunk shouldn't be too large even if striping is wide */
1652                 if (end - *ppos > sbi->ll_max_rw_chunk)
1653                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1654         } else {
1655                 end = *ppos + count - 1;
1656         }
1657
1658         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1659                                       buf, &cookie, &tree, OBD_BRW_READ);
1660         if (lock_style < 0)
1661                 GOTO(out, retval = lock_style);
1662
1663         ll_inode_size_lock(inode, 1);
1664         /*
1665          * Consistency guarantees: following possibilities exist for the
1666          * relation between region being read and real file size at this
1667          * moment:
1668          *
1669          *  (A): the region is completely inside of the file;
1670          *
1671          *  (B-x): x bytes of region are inside of the file, the rest is
1672          *  outside;
1673          *
1674          *  (C): the region is completely outside of the file.
1675          *
1676          * This classification is stable under DLM lock acquired by
1677          * ll_tree_lock() above, because to change class, other client has to
1678          * take DLM lock conflicting with our lock. Also, any updates to
1679          * ->i_size by other threads on this client are serialized by
1680          * ll_inode_size_lock(). This guarantees that short reads are handled
1681          * correctly in the face of concurrent writes and truncates.
1682          */
1683         inode_init_lvb(inode, &lvb);
1684         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1685         kms = lvb.lvb_size;
1686         if (*ppos + count - 1 > kms) {
1687                 /* A glimpse is necessary to determine whether we return a
1688                  * short read (B) or some zeroes at the end of the buffer (C) */
1689                 ll_inode_size_unlock(inode, 1);
1690                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1691                 if (retval) {
1692                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1693                                 ll_file_put_lock(inode, end, lock_style,
1694                                                  cookie, &tree, OBD_BRW_READ);
1695                         goto out;
1696                 }
1697         } else {
1698                 /* region is within kms and, hence, within real file size (A).
1699                  * We need to increase i_size to cover the read region so that
1700                  * generic_file_read() will do its job, but that doesn't mean
1701                  * the kms size is _correct_, it is only the _minimum_ size.
1702                  * If someone does a stat they will get the correct size which
1703                  * will always be >= the kms value here.  b=11081 */
1704                 if (i_size_read(inode) < kms)
1705                         i_size_write(inode, kms);
1706                 ll_inode_size_unlock(inode, 1);
1707         }
1708
1709         chunk = end - *ppos + 1;
1710         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1711                inode->i_ino, chunk, *ppos, i_size_read(inode));
1712
1713         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1714                 /* turn off the kernel's read-ahead */
1715                 file->f_ra.ra_pages = 0;
1716
1717                 /* initialize read-ahead window once per syscall */
1718                 if (ra == 0) {
1719                         ra = 1;
1720                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1721                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1722                         ll_ra_read_in(file, &bead);
1723                 }
1724
1725                 /* BUG: 5972 */
1726                 file_accessed(file);
1727                 retval = generic_file_read(file, buf, chunk, ppos);
1728                 ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
1729                                  OBD_BRW_READ);
1730         } else {
1731                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1732         }
1733
1734         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1735
1736         if (retval > 0) {
1737                 buf += retval;
1738                 count -= retval;
1739                 sum += retval;
1740                 if (retval == chunk && count > 0)
1741                         goto repeat;
1742         }
1743
1744  out:
1745         if (ra != 0)
1746                 ll_ra_read_ex(file, &bead);
1747         retval = (sum > 0) ? sum : retval;
1748         RETURN(retval);
1749 }
1750
1751 /*
1752  * Write to a file (through the page cache).
1753  */
1754 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1755                              loff_t *ppos)
1756 {
1757         struct inode *inode = file->f_dentry->d_inode;
1758         struct ll_sb_info *sbi = ll_i2sbi(inode);
1759         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1760         struct ll_lock_tree tree;
1761         loff_t maxbytes = ll_file_maxbytes(inode);
1762         loff_t lock_start, lock_end, end;
1763         ssize_t retval, chunk, sum = 0;
1764         int tree_locked;
1765         ENTRY;
1766
1767         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1768                inode->i_ino, inode->i_generation, inode, count, *ppos);
1769
1770         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1771
1772         /* POSIX, but surprised the VFS doesn't check this already */
1773         if (count == 0)
1774                 RETURN(0);
1775
1776         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1777          * called on the file, don't fail the below assertion (bug 2388). */
1778         if (file->f_flags & O_LOV_DELAY_CREATE &&
1779             ll_i2info(inode)->lli_smd == NULL)
1780                 RETURN(-EBADF);
1781
1782         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1783
1784         down(&ll_i2info(inode)->lli_write_sem);
1785
1786 repeat:
1787         chunk = 0; /* just to fix gcc's warning */
1788         end = *ppos + count - 1;
1789
1790         if (file->f_flags & O_APPEND) {
1791                 lock_start = 0;
1792                 lock_end = OBD_OBJECT_EOF;
1793         } else if (sbi->ll_max_rw_chunk != 0) {
1794                 /* first, let's know the end of the current stripe */
1795                 end = *ppos;
1796                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1797                                 (obd_off *)&end);
1798
1799                 /* correct, the end is beyond the request */
1800                 if (end > *ppos + count - 1)
1801                         end = *ppos + count - 1;
1802
1803                 /* and chunk shouldn't be too large even if striping is wide */
1804                 if (end - *ppos > sbi->ll_max_rw_chunk)
1805                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1806                 lock_start = *ppos;
1807                 lock_end = end;
1808         } else {
1809                 lock_start = *ppos;
1810                 lock_end = *ppos + count - 1;
1811         }
1812
1813         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1814                                             lock_start, lock_end, OBD_BRW_WRITE);
1815         if (tree_locked < 0)
1816                 GOTO(out, retval = tree_locked);
1817
1818         /* This is ok, g_f_w will overwrite this under i_sem if it races
1819          * with a local truncate, it just makes our maxbyte checking easier.
1820          * The i_size value gets updated in ll_extent_lock() as a consequence
1821          * of the [0,EOF] extent lock we requested above. */
1822         if (file->f_flags & O_APPEND) {
1823                 *ppos = i_size_read(inode);
1824                 end = *ppos + count - 1;
1825         }
1826
1827         if (*ppos >= maxbytes) {
1828                 send_sig(SIGXFSZ, current, 0);
1829                 GOTO(out_unlock, retval = -EFBIG);
1830         }
1831         if (end > maxbytes - 1)
1832                 end = maxbytes - 1;
1833
1834         /* generic_file_write handles O_APPEND after getting i_mutex */
1835         chunk = end - *ppos + 1;
1836         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1837                inode->i_ino, chunk, *ppos);
1838         if (tree_locked)
1839                 retval = generic_file_write(file, buf, chunk, ppos);
1840         else
1841                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1842                                              ppos, WRITE);
1843         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1844
1845 out_unlock:
1846         if (tree_locked)
1847                 ll_tree_unlock(&tree);
1848
1849 out:
1850         if (retval > 0) {
1851                 buf += retval;
1852                 count -= retval;
1853                 sum += retval;
1854                 if (retval == chunk && count > 0)
1855                         goto repeat;
1856         }
1857
1858         up(&ll_i2info(inode)->lli_write_sem);
1859
1860         retval = (sum > 0) ? sum : retval;
1861         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1862                            retval > 0 ? retval : 0);
1863         RETURN(retval);
1864 }
1865
1866 /*
1867  * Send file content (through pagecache) somewhere with helper
1868  */
1869 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1870                                 read_actor_t actor, void *target)
1871 {
1872         struct inode *inode = in_file->f_dentry->d_inode;
1873         struct ll_inode_info *lli = ll_i2info(inode);
1874         struct lov_stripe_md *lsm = lli->lli_smd;
1875         struct ll_lock_tree tree;
1876         struct ll_lock_tree_node *node;
1877         struct ost_lvb lvb;
1878         struct ll_ra_read bead;
1879         int rc;
1880         ssize_t retval;
1881         __u64 kms;
1882         ENTRY;
1883         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1884                inode->i_ino, inode->i_generation, inode, count, *ppos);
1885
1886         /* "If nbyte is 0, read() will return 0 and have no other results."
1887          *                      -- Single Unix Spec */
1888         if (count == 0)
1889                 RETURN(0);
1890
1891         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1892         /* turn off the kernel's read-ahead */
1893         in_file->f_ra.ra_pages = 0;
1894
1895         /* File with no objects, nothing to lock */
1896         if (!lsm)
1897                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1898
1899         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1900         if (IS_ERR(node))
1901                 RETURN(PTR_ERR(node));
1902
1903         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1904         rc = ll_tree_lock(&tree, node, NULL, count,
1905                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1906         if (rc != 0)
1907                 RETURN(rc);
1908
1909         ll_clear_file_contended(inode);
1910         ll_inode_size_lock(inode, 1);
1911         /*
1912          * Consistency guarantees: following possibilities exist for the
1913          * relation between region being read and real file size at this
1914          * moment:
1915          *
1916          *  (A): the region is completely inside of the file;
1917          *
1918          *  (B-x): x bytes of region are inside of the file, the rest is
1919          *  outside;
1920          *
1921          *  (C): the region is completely outside of the file.
1922          *
1923          * This classification is stable under DLM lock acquired by
1924          * ll_tree_lock() above, because to change class, other client has to
1925          * take DLM lock conflicting with our lock. Also, any updates to
1926          * ->i_size by other threads on this client are serialized by
1927          * ll_inode_size_lock(). This guarantees that short reads are handled
1928          * correctly in the face of concurrent writes and truncates.
1929          */
1930         inode_init_lvb(inode, &lvb);
1931         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1932         kms = lvb.lvb_size;
1933         if (*ppos + count - 1 > kms) {
1934                 /* A glimpse is necessary to determine whether we return a
1935                  * short read (B) or some zeroes at the end of the buffer (C) */
1936                 ll_inode_size_unlock(inode, 1);
1937                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1938                 if (retval)
1939                         goto out;
1940         } else {
1941                 /* region is within kms and, hence, within real file size (A) */
1942                 i_size_write(inode, kms);
1943                 ll_inode_size_unlock(inode, 1);
1944         }
1945
1946         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1947                inode->i_ino, count, *ppos, i_size_read(inode));
1948
1949         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1950         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1951         ll_ra_read_in(in_file, &bead);
1952         /* BUG: 5972 */
1953         file_accessed(in_file);
1954         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1955         ll_ra_read_ex(in_file, &bead);
1956
1957  out:
1958         ll_tree_unlock(&tree);
1959         RETURN(retval);
1960 }
1961
1962 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1963                                unsigned long arg)
1964 {
1965         struct ll_inode_info *lli = ll_i2info(inode);
1966         struct obd_export *exp = ll_i2dtexp(inode);
1967         struct ll_recreate_obj ucreatp;
1968         struct obd_trans_info oti = { 0 };
1969         struct obdo *oa = NULL;
1970         int lsm_size;
1971         int rc = 0;
1972         struct lov_stripe_md *lsm, *lsm2;
1973         ENTRY;
1974
1975         if (!capable (CAP_SYS_ADMIN))
1976                 RETURN(-EPERM);
1977
1978         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1979                             sizeof(struct ll_recreate_obj));
1980         if (rc) {
1981                 RETURN(-EFAULT);
1982         }
1983         OBDO_ALLOC(oa);
1984         if (oa == NULL)
1985                 RETURN(-ENOMEM);
1986
1987         down(&lli->lli_size_sem);
1988         lsm = lli->lli_smd;
1989         if (lsm == NULL)
1990                 GOTO(out, rc = -ENOENT);
1991         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1992                    (lsm->lsm_stripe_count));
1993
1994         OBD_ALLOC(lsm2, lsm_size);
1995         if (lsm2 == NULL)
1996                 GOTO(out, rc = -ENOMEM);
1997
1998         oa->o_id = ucreatp.lrc_id;
1999         oa->o_gr = ucreatp.lrc_group;
2000         oa->o_nlink = ucreatp.lrc_ost_idx;
2001         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2002         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
2003         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2004                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2005
2006         memcpy(lsm2, lsm, lsm_size);
2007         rc = obd_create(exp, oa, &lsm2, &oti);
2008
2009         OBD_FREE(lsm2, lsm_size);
2010         GOTO(out, rc);
2011 out:
2012         up(&lli->lli_size_sem);
2013         OBDO_FREE(oa);
2014         return rc;
2015 }
2016
2017 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2018                              int flags, struct lov_user_md *lum, int lum_size)
2019 {
2020         struct ll_inode_info *lli = ll_i2info(inode);
2021         struct lov_stripe_md *lsm;
2022         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2023         int rc = 0;
2024         ENTRY;
2025
2026         down(&lli->lli_size_sem);
2027         lsm = lli->lli_smd;
2028         if (lsm) {
2029                 up(&lli->lli_size_sem);
2030                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2031                        inode->i_ino);
2032                 RETURN(-EEXIST);
2033         }
2034
2035         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2036         if (rc)
2037                 GOTO(out, rc);
2038         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2039                 GOTO(out_req_free, rc = -ENOENT);
2040         rc = oit.d.lustre.it_status;
2041         if (rc < 0)
2042                 GOTO(out_req_free, rc);
2043
2044         ll_release_openhandle(file->f_dentry, &oit);
2045
2046  out:
2047         up(&lli->lli_size_sem);
2048         ll_intent_release(&oit);
2049         RETURN(rc);
2050 out_req_free:
2051         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2052         goto out;
2053 }
2054
2055 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
2056                              struct lov_mds_md **lmmp, int *lmm_size, 
2057                              struct ptlrpc_request **request)
2058 {
2059         struct ll_sb_info *sbi = ll_i2sbi(inode);
2060         struct mdt_body  *body;
2061         struct lov_mds_md *lmm = NULL;
2062         struct ptlrpc_request *req = NULL;
2063         struct obd_capa *oc;
2064         int rc, lmmsize;
2065
2066         rc = ll_get_max_mdsize(sbi, &lmmsize);
2067         if (rc)
2068                 RETURN(rc);
2069
2070         oc = ll_mdscapa_get(inode);
2071         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2072                              oc, filename, strlen(filename) + 1,
2073                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2074                              ll_i2suppgid(inode), &req);
2075         capa_put(oc);
2076         if (rc < 0) {
2077                 CDEBUG(D_INFO, "md_getattr_name failed "
2078                        "on %s: rc %d\n", filename, rc);
2079                 GOTO(out, rc);
2080         }
2081
2082         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2083         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2084
2085         lmmsize = body->eadatasize;
2086
2087         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2088                         lmmsize == 0) {
2089                 GOTO(out, rc = -ENODATA);
2090         }
2091
2092         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2093         LASSERT(lmm != NULL);
2094
2095         /*
2096          * This is coming from the MDS, so is probably in
2097          * little endian.  We convert it to host endian before
2098          * passing it to userspace.
2099          */
2100         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
2101                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
2102                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
2103         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
2104                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2105         }
2106
2107         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2108                 struct lov_stripe_md *lsm;
2109                 struct lov_user_md_join *lmj;
2110                 int lmj_size, i, aindex = 0;
2111
2112                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2113                 if (rc < 0)
2114                         GOTO(out, rc = -ENOMEM);
2115                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2116                 if (rc)
2117                         GOTO(out_free_memmd, rc);
2118
2119                 lmj_size = sizeof(struct lov_user_md_join) +
2120                            lsm->lsm_stripe_count *
2121                            sizeof(struct lov_user_ost_data_join);
2122                 OBD_ALLOC(lmj, lmj_size);
2123                 if (!lmj)
2124                         GOTO(out_free_memmd, rc = -ENOMEM);
2125
2126                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2127                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2128                         struct lov_extent *lex =
2129                                 &lsm->lsm_array->lai_ext_array[aindex];
2130
2131                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2132                                 aindex ++;
2133                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2134                                         LPU64" len %d\n", aindex, i,
2135                                         lex->le_start, (int)lex->le_len);
2136                         lmj->lmm_objects[i].l_extent_start =
2137                                 lex->le_start;
2138
2139                         if ((int)lex->le_len == -1)
2140                                 lmj->lmm_objects[i].l_extent_end = -1;
2141                         else
2142                                 lmj->lmm_objects[i].l_extent_end =
2143                                         lex->le_start + lex->le_len;
2144                         lmj->lmm_objects[i].l_object_id =
2145                                 lsm->lsm_oinfo[i]->loi_id;
2146                         lmj->lmm_objects[i].l_object_gr =
2147                                 lsm->lsm_oinfo[i]->loi_gr;
2148                         lmj->lmm_objects[i].l_ost_gen =
2149                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2150                         lmj->lmm_objects[i].l_ost_idx =
2151                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2152                 }
2153                 lmm = (struct lov_mds_md *)lmj;
2154                 lmmsize = lmj_size;
2155 out_free_memmd:
2156                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2157         }
2158 out:
2159         *lmmp = lmm;
2160         *lmm_size = lmmsize;
2161         *request = req;
2162         return rc;
2163 }
2164
2165 static int ll_lov_setea(struct inode *inode, struct file *file,
2166                             unsigned long arg)
2167 {
2168         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2169         struct lov_user_md  *lump;
2170         int lum_size = sizeof(struct lov_user_md) +
2171                        sizeof(struct lov_user_ost_data);
2172         int rc;
2173         ENTRY;
2174
2175         if (!capable (CAP_SYS_ADMIN))
2176                 RETURN(-EPERM);
2177
2178         OBD_ALLOC(lump, lum_size);
2179         if (lump == NULL) {
2180                 RETURN(-ENOMEM);
2181         }
2182         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2183         if (rc) {
2184                 OBD_FREE(lump, lum_size);
2185                 RETURN(-EFAULT);
2186         }
2187
2188         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2189
2190         OBD_FREE(lump, lum_size);
2191         RETURN(rc);
2192 }
2193
2194 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2195                             unsigned long arg)
2196 {
2197         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2198         int rc;
2199         int flags = FMODE_WRITE;
2200         ENTRY;
2201
2202         /* Bug 1152: copy properly when this is no longer true */
2203         LASSERT(sizeof(lum) == sizeof(*lump));
2204         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2205         rc = copy_from_user(&lum, lump, sizeof(lum));
2206         if (rc)
2207                 RETURN(-EFAULT);
2208
2209         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2210         if (rc == 0) {
2211                  put_user(0, &lump->lmm_stripe_count);
2212                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2213                                     0, ll_i2info(inode)->lli_smd, lump);
2214         }
2215         RETURN(rc);
2216 }
2217
2218 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2219 {
2220         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2221
2222         if (!lsm)
2223                 RETURN(-ENODATA);
2224
2225         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2226                             (void *)arg);
2227 }
2228
2229 static int ll_get_grouplock(struct inode *inode, struct file *file,
2230                             unsigned long arg)
2231 {
2232         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2233         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2234                                                     .end = OBD_OBJECT_EOF}};
2235         struct lustre_handle lockh = { 0 };
2236         struct ll_inode_info *lli = ll_i2info(inode);
2237         struct lov_stripe_md *lsm = lli->lli_smd;
2238         int flags = 0, rc;
2239         ENTRY;
2240
2241         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2242                 RETURN(-EINVAL);
2243         }
2244
2245         policy.l_extent.gid = arg;
2246         if (file->f_flags & O_NONBLOCK)
2247                 flags = LDLM_FL_BLOCK_NOWAIT;
2248
2249         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2250         if (rc)
2251                 RETURN(rc);
2252
2253         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2254         fd->fd_gid = arg;
2255         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2256
2257         RETURN(0);
2258 }
2259
2260 static int ll_put_grouplock(struct inode *inode, struct file *file,
2261                             unsigned long arg)
2262 {
2263         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2264         struct ll_inode_info *lli = ll_i2info(inode);
2265         struct lov_stripe_md *lsm = lli->lli_smd;
2266         int rc;
2267         ENTRY;
2268
2269         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2270                 /* Ugh, it's already unlocked. */
2271                 RETURN(-EINVAL);
2272         }
2273
2274         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2275                 RETURN(-EINVAL);
2276
2277         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2278
2279         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2280         if (rc)
2281                 RETURN(rc);
2282
2283         fd->fd_gid = 0;
2284         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2285
2286         RETURN(0);
2287 }
2288
2289 static int join_sanity_check(struct inode *head, struct inode *tail)
2290 {
2291         ENTRY;
2292         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2293                 CERROR("server do not support join \n");
2294                 RETURN(-EINVAL);
2295         }
2296         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2297                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2298                        head->i_ino, tail->i_ino);
2299                 RETURN(-EINVAL);
2300         }
2301         if (head->i_ino == tail->i_ino) {
2302                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2303                 RETURN(-EINVAL);
2304         }
2305         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2306                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2307                 RETURN(-EINVAL);
2308         }
2309         RETURN(0);
2310 }
2311
2312 static int join_file(struct inode *head_inode, struct file *head_filp,
2313                      struct file *tail_filp)
2314 {
2315         struct dentry *tail_dentry = tail_filp->f_dentry;
2316         struct lookup_intent oit = {.it_op = IT_OPEN,
2317                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2318         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2319                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2320
2321         struct lustre_handle lockh;
2322         struct md_op_data *op_data;
2323         int    rc;
2324         loff_t data;
2325         ENTRY;
2326
2327         tail_dentry = tail_filp->f_dentry;
2328
2329         data = i_size_read(head_inode);
2330         op_data = ll_prep_md_op_data(NULL, head_inode,
2331                                      tail_dentry->d_parent->d_inode,
2332                                      tail_dentry->d_name.name,
2333                                      tail_dentry->d_name.len, 0,
2334                                      LUSTRE_OPC_ANY, &data);
2335         if (IS_ERR(op_data))
2336                 RETURN(PTR_ERR(op_data));
2337
2338         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2339                          op_data, &lockh, NULL, 0, 0);
2340
2341         ll_finish_md_op_data(op_data);
2342         if (rc < 0)
2343                 GOTO(out, rc);
2344
2345         rc = oit.d.lustre.it_status;
2346
2347         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2348                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2349                 ptlrpc_req_finished((struct ptlrpc_request *)
2350                                     oit.d.lustre.it_data);
2351                 GOTO(out, rc);
2352         }
2353
2354         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2355                                            * away */
2356                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2357                 oit.d.lustre.it_lock_mode = 0;
2358         }
2359         ll_release_openhandle(head_filp->f_dentry, &oit);
2360 out:
2361         ll_intent_release(&oit);
2362         RETURN(rc);
2363 }
2364
2365 static int ll_file_join(struct inode *head, struct file *filp,
2366                         char *filename_tail)
2367 {
2368         struct inode *tail = NULL, *first = NULL, *second = NULL;
2369         struct dentry *tail_dentry;
2370         struct file *tail_filp, *first_filp, *second_filp;
2371         struct ll_lock_tree first_tree, second_tree;
2372         struct ll_lock_tree_node *first_node, *second_node;
2373         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2374         int rc = 0, cleanup_phase = 0;
2375         ENTRY;
2376
2377         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2378                head->i_ino, head->i_generation, head, filename_tail);
2379
2380         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2381         if (IS_ERR(tail_filp)) {
2382                 CERROR("Can not open tail file %s", filename_tail);
2383                 rc = PTR_ERR(tail_filp);
2384                 GOTO(cleanup, rc);
2385         }
2386         tail = igrab(tail_filp->f_dentry->d_inode);
2387
2388         tlli = ll_i2info(tail);
2389         tail_dentry = tail_filp->f_dentry;
2390         LASSERT(tail_dentry);
2391         cleanup_phase = 1;
2392
2393         /*reorder the inode for lock sequence*/
2394         first = head->i_ino > tail->i_ino ? head : tail;
2395         second = head->i_ino > tail->i_ino ? tail : head;
2396         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2397         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2398
2399         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2400                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2401         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2402         if (IS_ERR(first_node)){
2403                 rc = PTR_ERR(first_node);
2404                 GOTO(cleanup, rc);
2405         }
2406         first_tree.lt_fd = first_filp->private_data;
2407         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2408         if (rc != 0)
2409                 GOTO(cleanup, rc);
2410         cleanup_phase = 2;
2411
2412         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2413         if (IS_ERR(second_node)){
2414                 rc = PTR_ERR(second_node);
2415                 GOTO(cleanup, rc);
2416         }
2417         second_tree.lt_fd = second_filp->private_data;
2418         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2419         if (rc != 0)
2420                 GOTO(cleanup, rc);
2421         cleanup_phase = 3;
2422
2423         rc = join_sanity_check(head, tail);
2424         if (rc)
2425                 GOTO(cleanup, rc);
2426
2427         rc = join_file(head, filp, tail_filp);
2428         if (rc)
2429                 GOTO(cleanup, rc);
2430 cleanup:
2431         switch (cleanup_phase) {
2432         case 3:
2433                 ll_tree_unlock(&second_tree);
2434                 obd_cancel_unused(ll_i2dtexp(second),
2435                                   ll_i2info(second)->lli_smd, 0, NULL);
2436         case 2:
2437                 ll_tree_unlock(&first_tree);
2438                 obd_cancel_unused(ll_i2dtexp(first),
2439                                   ll_i2info(first)->lli_smd, 0, NULL);
2440         case 1:
2441                 filp_close(tail_filp, 0);
2442                 if (tail)
2443                         iput(tail);
2444                 if (head && rc == 0) {
2445                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2446                                        &hlli->lli_smd);
2447                         hlli->lli_smd = NULL;
2448                 }
2449         case 0:
2450                 break;
2451         default:
2452                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2453                 LBUG();
2454         }
2455         RETURN(rc);
2456 }
2457
2458 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2459 {
2460         struct inode *inode = dentry->d_inode;
2461         struct obd_client_handle *och;
2462         int rc;
2463         ENTRY;
2464
2465         LASSERT(inode);
2466
2467         /* Root ? Do nothing. */
2468         if (dentry->d_inode->i_sb->s_root == dentry)
2469                 RETURN(0);
2470
2471         /* No open handle to close? Move away */
2472         if (!it_disposition(it, DISP_OPEN_OPEN))
2473                 RETURN(0);
2474
2475         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2476
2477         OBD_ALLOC(och, sizeof(*och));
2478         if (!och)
2479                 GOTO(out, rc = -ENOMEM);
2480
2481         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2482                     ll_i2info(inode), it, och);
2483
2484         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2485                                        inode, och);
2486  out:
2487         /* this one is in place of ll_file_open */
2488         ptlrpc_req_finished(it->d.lustre.it_data);
2489         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2490         RETURN(rc);
2491 }
2492
2493 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2494                   unsigned long arg)
2495 {
2496         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2497         int flags;
2498         ENTRY;
2499
2500         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2501                inode->i_generation, inode, cmd);
2502         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2503
2504         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2505         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2506                 RETURN(-ENOTTY);
2507
2508         switch(cmd) {
2509         case LL_IOC_GETFLAGS:
2510                 /* Get the current value of the file flags */
2511                 return put_user(fd->fd_flags, (int *)arg);
2512         case LL_IOC_SETFLAGS:
2513         case LL_IOC_CLRFLAGS:
2514                 /* Set or clear specific file flags */
2515                 /* XXX This probably needs checks to ensure the flags are
2516                  *     not abused, and to handle any flag side effects.
2517                  */
2518                 if (get_user(flags, (int *) arg))
2519                         RETURN(-EFAULT);
2520
2521                 if (cmd == LL_IOC_SETFLAGS) {
2522                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2523                             !(file->f_flags & O_DIRECT)) {
2524                                 CERROR("%s: unable to disable locking on "
2525                                        "non-O_DIRECT file\n", current->comm);
2526                                 RETURN(-EINVAL);
2527                         }
2528
2529                         fd->fd_flags |= flags;
2530                 } else {
2531                         fd->fd_flags &= ~flags;
2532                 }
2533                 RETURN(0);
2534         case LL_IOC_LOV_SETSTRIPE:
2535                 RETURN(ll_lov_setstripe(inode, file, arg));
2536         case LL_IOC_LOV_SETEA:
2537                 RETURN(ll_lov_setea(inode, file, arg));
2538         case LL_IOC_LOV_GETSTRIPE:
2539                 RETURN(ll_lov_getstripe(inode, arg));
2540         case LL_IOC_RECREATE_OBJ:
2541                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2542         case EXT3_IOC_GETFLAGS:
2543         case EXT3_IOC_SETFLAGS:
2544                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2545         case EXT3_IOC_GETVERSION_OLD:
2546         case EXT3_IOC_GETVERSION:
2547                 RETURN(put_user(inode->i_generation, (int *)arg));
2548         case LL_IOC_JOIN: {
2549                 char *ftail;
2550                 int rc;
2551
2552                 ftail = getname((const char *)arg);
2553                 if (IS_ERR(ftail))
2554                         RETURN(PTR_ERR(ftail));
2555                 rc = ll_file_join(inode, file, ftail);
2556                 putname(ftail);
2557                 RETURN(rc);
2558         }
2559         case LL_IOC_GROUP_LOCK:
2560                 RETURN(ll_get_grouplock(inode, file, arg));
2561         case LL_IOC_GROUP_UNLOCK:
2562                 RETURN(ll_put_grouplock(inode, file, arg));
2563         case IOC_OBD_STATFS:
2564                 RETURN(ll_obd_statfs(inode, (void *)arg));
2565
2566         /* We need to special case any other ioctls we want to handle,
2567          * to send them to the MDS/OST as appropriate and to properly
2568          * network encode the arg field.
2569         case EXT3_IOC_SETVERSION_OLD:
2570         case EXT3_IOC_SETVERSION:
2571         */
2572         case LL_IOC_FLUSHCTX:
2573                 RETURN(ll_flush_ctx(inode));
2574         default: {
2575                 int err;
2576
2577                 if (LLIOC_STOP == 
2578                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2579                         RETURN(err);
2580
2581                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2582                                      (void *)arg));
2583         }
2584         }
2585 }
2586
2587 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2588 {
2589         struct inode *inode = file->f_dentry->d_inode;
2590         struct ll_inode_info *lli = ll_i2info(inode);
2591         struct lov_stripe_md *lsm = lli->lli_smd;
2592         loff_t retval;
2593         ENTRY;
2594         retval = offset + ((origin == 2) ? i_size_read(inode) :
2595                            (origin == 1) ? file->f_pos : 0);
2596         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2597                inode->i_ino, inode->i_generation, inode, retval, retval,
2598                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2599         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2600
2601         if (origin == 2) { /* SEEK_END */
2602                 int nonblock = 0, rc;
2603
2604                 if (file->f_flags & O_NONBLOCK)
2605                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2606
2607                 if (lsm != NULL) {
2608                         rc = ll_glimpse_size(inode, nonblock);
2609                         if (rc != 0)
2610                                 RETURN(rc);
2611                 }
2612
2613                 ll_inode_size_lock(inode, 0);
2614                 offset += i_size_read(inode);
2615                 ll_inode_size_unlock(inode, 0);
2616         } else if (origin == 1) { /* SEEK_CUR */
2617                 offset += file->f_pos;
2618         }
2619
2620         retval = -EINVAL;
2621         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2622                 if (offset != file->f_pos) {
2623                         file->f_pos = offset;
2624 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2625                         file->f_reada = 0;
2626                         file->f_version = ++event;
2627 #endif
2628                 }
2629                 retval = offset;
2630         }
2631         
2632         RETURN(retval);
2633 }
2634
2635 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2636 {
2637         struct inode *inode = dentry->d_inode;
2638         struct ll_inode_info *lli = ll_i2info(inode);
2639         struct lov_stripe_md *lsm = lli->lli_smd;
2640         struct ptlrpc_request *req;
2641         struct obd_capa *oc;
2642         int rc, err;
2643         ENTRY;
2644         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2645                inode->i_generation, inode);
2646         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2647
2648         /* fsync's caller has already called _fdata{sync,write}, we want
2649          * that IO to finish before calling the osc and mdc sync methods */
2650         rc = filemap_fdatawait(inode->i_mapping);
2651
2652         /* catch async errors that were recorded back when async writeback
2653          * failed for pages in this mapping. */
2654         err = lli->lli_async_rc;
2655         lli->lli_async_rc = 0;
2656         if (rc == 0)
2657                 rc = err;
2658         if (lsm) {
2659                 err = lov_test_and_clear_async_rc(lsm);
2660                 if (rc == 0)
2661                         rc = err;
2662         }
2663
2664         oc = ll_mdscapa_get(inode);
2665         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2666                       &req);
2667         capa_put(oc);
2668         if (!rc)
2669                 rc = err;
2670         if (!err)
2671                 ptlrpc_req_finished(req);
2672
2673         if (data && lsm) {
2674                 struct obdo *oa;
2675                 
2676                 OBDO_ALLOC(oa);
2677                 if (!oa)
2678                         RETURN(rc ? rc : -ENOMEM);
2679
2680                 oa->o_id = lsm->lsm_object_id;
2681                 oa->o_gr = lsm->lsm_object_gr;
2682                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2683                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2684                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2685                                            OBD_MD_FLGROUP);
2686
2687                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2688                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2689                                0, OBD_OBJECT_EOF, oc);
2690                 capa_put(oc);
2691                 if (!rc)
2692                         rc = err;
2693                 OBDO_FREE(oa);
2694         }
2695
2696         RETURN(rc);
2697 }
2698
2699 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2700 {
2701         struct inode *inode = file->f_dentry->d_inode;
2702         struct ll_sb_info *sbi = ll_i2sbi(inode);
2703         struct ldlm_res_id res_id =
2704                 { .name = { fid_seq(ll_inode2fid(inode)),
2705                             fid_oid(ll_inode2fid(inode)),
2706                             fid_ver(ll_inode2fid(inode)),
2707                             LDLM_FLOCK} };
2708         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2709                 ldlm_flock_completion_ast, NULL, file_lock };
2710         struct lustre_handle lockh = {0};
2711         ldlm_policy_data_t flock;
2712         int flags = 0;
2713         int rc;
2714         ENTRY;
2715
2716         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2717                inode->i_ino, file_lock);
2718
2719         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2720  
2721         if (file_lock->fl_flags & FL_FLOCK) {
2722                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2723                 /* set missing params for flock() calls */
2724                 file_lock->fl_end = OFFSET_MAX;
2725                 file_lock->fl_pid = current->tgid;
2726         }
2727         flock.l_flock.pid = file_lock->fl_pid;
2728         flock.l_flock.start = file_lock->fl_start;
2729         flock.l_flock.end = file_lock->fl_end;
2730
2731         switch (file_lock->fl_type) {
2732         case F_RDLCK:
2733                 einfo.ei_mode = LCK_PR;
2734                 break;
2735         case F_UNLCK:
2736                 /* An unlock request may or may not have any relation to
2737                  * existing locks so we may not be able to pass a lock handle
2738                  * via a normal ldlm_lock_cancel() request. The request may even
2739                  * unlock a byte range in the middle of an existing lock. In
2740                  * order to process an unlock request we need all of the same
2741                  * information that is given with a normal read or write record
2742                  * lock request. To avoid creating another ldlm unlock (cancel)
2743                  * message we'll treat a LCK_NL flock request as an unlock. */
2744                 einfo.ei_mode = LCK_NL;
2745                 break;
2746         case F_WRLCK:
2747                 einfo.ei_mode = LCK_PW;
2748                 break;
2749         default:
2750                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2751                 LBUG();
2752         }
2753
2754         switch (cmd) {
2755         case F_SETLKW:
2756 #ifdef F_SETLKW64
2757         case F_SETLKW64:
2758 #endif
2759                 flags = 0;
2760                 break;
2761         case F_SETLK:
2762 #ifdef F_SETLK64
2763         case F_SETLK64:
2764 #endif
2765                 flags = LDLM_FL_BLOCK_NOWAIT;
2766                 break;
2767         case F_GETLK:
2768 #ifdef F_GETLK64
2769         case F_GETLK64:
2770 #endif
2771                 flags = LDLM_FL_TEST_LOCK;
2772                 /* Save the old mode so that if the mode in the lock changes we
2773                  * can decrement the appropriate reader or writer refcount. */
2774                 file_lock->fl_type = einfo.ei_mode;
2775                 break;
2776         default:
2777                 CERROR("unknown fcntl lock command: %d\n", cmd);
2778                 LBUG();
2779         }
2780
2781         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2782                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2783                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2784
2785         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2786                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2787         if ((file_lock->fl_flags & FL_FLOCK) &&
2788             (rc == 0 || file_lock->fl_type == F_UNLCK))
2789                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2790 #ifdef HAVE_F_OP_FLOCK
2791         if ((file_lock->fl_flags & FL_POSIX) &&
2792             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2793             !(flags & LDLM_FL_TEST_LOCK))
2794                 posix_lock_file_wait(file, file_lock);
2795 #endif
2796
2797         RETURN(rc);
2798 }
2799
2800 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2801 {
2802         ENTRY;
2803
2804         RETURN(-ENOSYS);
2805 }
2806
2807 int ll_have_md_lock(struct inode *inode, __u64 bits)
2808 {
2809         struct lustre_handle lockh;
2810         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2811         struct lu_fid *fid;
2812         int flags;
2813         ENTRY;
2814
2815         if (!inode)
2816                RETURN(0);
2817
2818         fid = &ll_i2info(inode)->lli_fid;
2819         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2820
2821         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2822         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2823                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2824                 RETURN(1);
2825         }
2826         RETURN(0);
2827 }
2828
2829 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2830                             struct lustre_handle *lockh)
2831 {
2832         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2833         struct lu_fid *fid;
2834         ldlm_mode_t rc;
2835         int flags;
2836         ENTRY;
2837
2838         fid = &ll_i2info(inode)->lli_fid;
2839         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2840
2841         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2842         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2843                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2844         RETURN(rc);
2845 }
2846
2847 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2848         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2849                               * and return success */
2850                 inode->i_nlink = 0;
2851                 /* This path cannot be hit for regular files unless in
2852                  * case of obscure races, so no need to to validate
2853                  * size. */
2854                 if (!S_ISREG(inode->i_mode) &&
2855                     !S_ISDIR(inode->i_mode))
2856                         return 0;
2857         }
2858
2859         if (rc) {
2860                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2861                 return -abs(rc);
2862
2863         }
2864
2865         return 0;
2866 }
2867
2868 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2869 {
2870         struct inode *inode = dentry->d_inode;
2871         struct ptlrpc_request *req = NULL;
2872         struct ll_sb_info *sbi;
2873         struct obd_export *exp;
2874         int rc;
2875         ENTRY;
2876
2877         if (!inode) {
2878                 CERROR("REPORT THIS LINE TO PETER\n");
2879                 RETURN(0);
2880         }
2881         sbi = ll_i2sbi(inode);
2882
2883         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2884                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2885
2886         exp = ll_i2mdexp(inode);
2887
2888         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2889                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2890                 struct md_op_data *op_data;
2891
2892                 /* Call getattr by fid, so do not provide name at all. */
2893                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2894                                              dentry->d_inode, NULL, 0, 0,
2895                                              LUSTRE_OPC_ANY, NULL);
2896                 if (IS_ERR(op_data))
2897                         RETURN(PTR_ERR(op_data));
2898
2899                 oit.it_flags |= O_CHECK_STALE;
2900                 rc = md_intent_lock(exp, op_data, NULL, 0,
2901                                     /* we are not interested in name
2902                                        based lookup */
2903                                     &oit, 0, &req,
2904                                     ll_md_blocking_ast, 0);
2905                 ll_finish_md_op_data(op_data);
2906                 oit.it_flags &= ~O_CHECK_STALE;
2907                 if (rc < 0) {
2908                         rc = ll_inode_revalidate_fini(inode, rc);
2909                         GOTO (out, rc);
2910                 }
2911
2912                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2913                 if (rc != 0) {
2914                         ll_intent_release(&oit);
2915                         GOTO(out, rc);
2916                 }
2917
2918                 /* Unlinked? Unhash dentry, so it is not picked up later by
2919                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2920                    here to preserve get_cwd functionality on 2.6.
2921                    Bug 10503 */
2922                 if (!dentry->d_inode->i_nlink) {
2923                         spin_lock(&dcache_lock);
2924                         ll_drop_dentry(dentry);
2925                         spin_unlock(&dcache_lock);
2926                 }
2927
2928                 ll_lookup_finish_locks(&oit, dentry);
2929         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
2930                                                      MDS_INODELOCK_LOOKUP)) {
2931                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2932                 obd_valid valid = OBD_MD_FLGETATTR;
2933                 struct obd_capa *oc;
2934                 int ealen = 0;
2935
2936                 if (S_ISREG(inode->i_mode)) {
2937                         rc = ll_get_max_mdsize(sbi, &ealen);
2938                         if (rc)
2939                                 RETURN(rc);
2940                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2941                 }
2942                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2943                  * capa for this inode. Because we only keep capas of dirs
2944                  * fresh. */
2945                 oc = ll_mdscapa_get(inode);
2946                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2947                                 ealen, &req);
2948                 capa_put(oc);
2949                 if (rc) {
2950                         rc = ll_inode_revalidate_fini(inode, rc);
2951                         RETURN(rc);
2952                 }
2953
2954                 rc = ll_prep_inode(&inode, req, NULL);
2955                 if (rc)
2956                         GOTO(out, rc);
2957         }
2958
2959         /* if object not yet allocated, don't validate size */
2960         if (ll_i2info(inode)->lli_smd == NULL)
2961                 GOTO(out, rc = 0);
2962
2963         /* ll_glimpse_size will prefer locally cached writes if they extend
2964          * the file */
2965         rc = ll_glimpse_size(inode, 0);
2966         EXIT;
2967 out:
2968         ptlrpc_req_finished(req);
2969         return rc;
2970 }
2971
2972 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2973                   struct lookup_intent *it, struct kstat *stat)
2974 {
2975         struct inode *inode = de->d_inode;
2976         int res = 0;
2977
2978         res = ll_inode_revalidate_it(de, it);
2979         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2980
2981         if (res)
2982                 return res;
2983
2984         stat->dev = inode->i_sb->s_dev;
2985         stat->ino = inode->i_ino;
2986         stat->mode = inode->i_mode;
2987         stat->nlink = inode->i_nlink;
2988         stat->uid = inode->i_uid;
2989         stat->gid = inode->i_gid;
2990         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2991         stat->atime = inode->i_atime;
2992         stat->mtime = inode->i_mtime;
2993         stat->ctime = inode->i_ctime;
2994 #ifdef HAVE_INODE_BLKSIZE
2995         stat->blksize = inode->i_blksize;
2996 #else
2997         stat->blksize = 1 << inode->i_blkbits;
2998 #endif
2999
3000         ll_inode_size_lock(inode, 0);
3001         stat->size = i_size_read(inode);
3002         stat->blocks = inode->i_blocks;
3003         ll_inode_size_unlock(inode, 0);
3004
3005         return 0;
3006 }
3007 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3008 {
3009         struct lookup_intent it = { .it_op = IT_GETATTR };
3010
3011         return ll_getattr_it(mnt, de, &it, stat);
3012 }
3013
3014 static
3015 int lustre_check_acl(struct inode *inode, int mask)
3016 {
3017 #ifdef CONFIG_FS_POSIX_ACL
3018         struct ll_inode_info *lli = ll_i2info(inode);
3019         struct posix_acl *acl;
3020         int rc;
3021         ENTRY;
3022
3023         spin_lock(&lli->lli_lock);
3024         acl = posix_acl_dup(lli->lli_posix_acl);
3025         spin_unlock(&lli->lli_lock);
3026
3027         if (!acl)
3028                 RETURN(-EAGAIN);
3029
3030         rc = posix_acl_permission(inode, acl, mask);
3031         posix_acl_release(acl);
3032
3033         RETURN(rc);
3034 #else
3035         return -EAGAIN;
3036 #endif
3037 }
3038
3039 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3040 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3041 {
3042         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3043                inode->i_ino, inode->i_generation, inode, mask);
3044         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3045                 return lustre_check_remote_perm(inode, mask);
3046         
3047         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3048         return generic_permission(inode, mask, lustre_check_acl);
3049 }
3050 #else
3051 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3052 {
3053         int mode = inode->i_mode;
3054         int rc;
3055
3056         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3057                inode->i_ino, inode->i_generation, inode, mask);
3058
3059         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3060                 return lustre_check_remote_perm(inode, mask);
3061
3062         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3063
3064         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3065             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3066                 return -EROFS;
3067         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3068                 return -EACCES;
3069         if (current->fsuid == inode->i_uid) {
3070                 mode >>= 6;
3071         } else if (1) {
3072                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3073                         goto check_groups;
3074                 rc = lustre_check_acl(inode, mask);
3075                 if (rc == -EAGAIN)
3076                         goto check_groups;
3077                 if (rc == -EACCES)
3078                         goto check_capabilities;
3079                 return rc;
3080         } else {
3081 check_groups:
3082                 if (in_group_p(inode->i_gid))
3083                         mode >>= 3;
3084         }
3085         if ((mode & mask & S_IRWXO) == mask)
3086                 return 0;
3087
3088 check_capabilities:
3089         if (!(mask & MAY_EXEC) ||
3090             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3091                 if (capable(CAP_DAC_OVERRIDE))
3092                         return 0;
3093
3094         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3095             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3096                 return 0;
3097         
3098         return -EACCES;
3099 }
3100 #endif
3101
3102 /* -o localflock - only provides locally consistent flock locks */
3103 struct file_operations ll_file_operations = {
3104         .read           = ll_file_read,
3105         .write          = ll_file_write,
3106         .ioctl          = ll_file_ioctl,
3107         .open           = ll_file_open,
3108         .release        = ll_file_release,
3109         .mmap           = ll_file_mmap,
3110         .llseek         = ll_file_seek,
3111         .sendfile       = ll_file_sendfile,
3112         .fsync          = ll_fsync,
3113 };
3114
3115 struct file_operations ll_file_operations_flock = {
3116         .read           = ll_file_read,
3117         .write          = ll_file_write,
3118         .ioctl          = ll_file_ioctl,
3119         .open           = ll_file_open,
3120         .release        = ll_file_release,
3121         .mmap           = ll_file_mmap,
3122         .llseek         = ll_file_seek,
3123         .sendfile       = ll_file_sendfile,
3124         .fsync          = ll_fsync,
3125 #ifdef HAVE_F_OP_FLOCK
3126         .flock          = ll_file_flock,
3127 #endif
3128         .lock           = ll_file_flock
3129 };
3130
3131 /* These are for -o noflock - to return ENOSYS on flock calls */
3132 struct file_operations ll_file_operations_noflock = {
3133         .read           = ll_file_read,
3134         .write          = ll_file_write,
3135         .ioctl          = ll_file_ioctl,
3136         .open           = ll_file_open,
3137         .release        = ll_file_release,
3138         .mmap           = ll_file_mmap,
3139         .llseek         = ll_file_seek,
3140         .sendfile       = ll_file_sendfile,
3141         .fsync          = ll_fsync,
3142 #ifdef HAVE_F_OP_FLOCK
3143         .flock          = ll_file_noflock,
3144 #endif
3145         .lock           = ll_file_noflock
3146 };
3147
3148 struct inode_operations ll_file_inode_operations = {
3149 #ifdef HAVE_VFS_INTENT_PATCHES
3150         .setattr_raw    = ll_setattr_raw,
3151 #endif
3152         .setattr        = ll_setattr,
3153         .truncate       = ll_truncate,
3154         .getattr        = ll_getattr,
3155         .permission     = ll_inode_permission,
3156         .setxattr       = ll_setxattr,
3157         .getxattr       = ll_getxattr,
3158         .listxattr      = ll_listxattr,
3159         .removexattr    = ll_removexattr,
3160 };
3161
3162 /* dynamic ioctl number support routins */
3163 static struct llioc_ctl_data {
3164         struct rw_semaphore ioc_sem;
3165         struct list_head    ioc_head;
3166 } llioc = { 
3167         __RWSEM_INITIALIZER(llioc.ioc_sem), 
3168         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3169 };
3170
3171
3172 struct llioc_data {
3173         struct list_head        iocd_list;
3174         unsigned int            iocd_size;
3175         llioc_callback_t        iocd_cb;
3176         unsigned int            iocd_count;
3177         unsigned int            iocd_cmd[0];
3178 };
3179
3180 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3181 {
3182         unsigned int size;
3183         struct llioc_data *in_data = NULL;
3184         ENTRY;
3185
3186         if (cb == NULL || cmd == NULL ||
3187             count > LLIOC_MAX_CMD || count < 0)
3188                 RETURN(NULL);
3189
3190         size = sizeof(*in_data) + count * sizeof(unsigned int);
3191         OBD_ALLOC(in_data, size);
3192         if (in_data == NULL)
3193                 RETURN(NULL);
3194
3195         memset(in_data, 0, sizeof(*in_data));
3196         in_data->iocd_size = size;
3197         in_data->iocd_cb = cb;
3198         in_data->iocd_count = count;
3199         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3200
3201         down_write(&llioc.ioc_sem);
3202         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3203         up_write(&llioc.ioc_sem);
3204
3205         RETURN(in_data);
3206 }
3207
3208 void ll_iocontrol_unregister(void *magic)
3209 {
3210         struct llioc_data *tmp;
3211
3212         if (magic == NULL)
3213                 return;
3214
3215         down_write(&llioc.ioc_sem);
3216         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3217                 if (tmp == magic) {
3218                         unsigned int size = tmp->iocd_size;
3219
3220                         list_del(&tmp->iocd_list);
3221                         up_write(&llioc.ioc_sem);
3222
3223                         OBD_FREE(tmp, size);
3224                         return;
3225                 }
3226         }
3227         up_write(&llioc.ioc_sem);
3228
3229         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3230 }
3231
3232 EXPORT_SYMBOL(ll_iocontrol_register);
3233 EXPORT_SYMBOL(ll_iocontrol_unregister);
3234
3235 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
3236                         unsigned int cmd, unsigned long arg, int *rcp)
3237 {
3238         enum llioc_iter ret = LLIOC_CONT;
3239         struct llioc_data *data;
3240         int rc = -EINVAL, i;
3241
3242         down_read(&llioc.ioc_sem);
3243         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3244                 for (i = 0; i < data->iocd_count; i++) {
3245                         if (cmd != data->iocd_cmd[i]) 
3246                                 continue;
3247
3248                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3249                         break;
3250                 }
3251
3252                 if (ret == LLIOC_STOP)
3253                         break;
3254         }
3255         up_read(&llioc.ioc_sem);
3256
3257         if (rcp)
3258                 *rcp = rc;
3259         return ret;
3260 }