/* Source: Whamcloud gitweb, branch HEAD — fs/lustre-release.git: lustre/llite/file.c */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <lustre_mdc.h>
47 #include <linux/pagemap.h>
48 #include <linux/file.h>
49 #include "llite_internal.h"
50 #include <lustre/ll_fiemap.h>
51
/* also used by llite/special.c:ll_special_open() */
/* Allocate the per-open-file private data from its dedicated slab cache.
 * Returns NULL on allocation failure; freed with ll_file_data_put(). */
struct ll_file_data *ll_file_data_get(void)
{
        struct ll_file_data *fd;

        OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
        return fd;
}
60
61 static void ll_file_data_put(struct ll_file_data *fd)
62 {
63         if (fd != NULL)
64                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
65 }
66
/* Copy the client's current view of @inode (mode, timestamps, size,
 * blocks, flags, IO epoch) plus the open file handle @fh into @op_data
 * for an MDS request.  Also takes an MDS capability reference into
 * op_capa1; the caller/request path is responsible for releasing it. */
void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
                          struct lustre_handle *fh)
{
        op_data->op_fid1 = ll_i2info(inode)->lli_fid;
        op_data->op_attr.ia_mode = inode->i_mode;
        op_data->op_attr.ia_atime = inode->i_atime;
        op_data->op_attr.ia_mtime = inode->i_mtime;
        op_data->op_attr.ia_ctime = inode->i_ctime;
        op_data->op_attr.ia_size = i_size_read(inode);
        op_data->op_attr_blocks = inode->i_blocks;
        /* ia_attr_flags lives in the Lustre-extended iattr, hence the cast. */
        ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
        op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
        memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
        op_data->op_capa1 = ll_mdscapa_get(inode);
}
82
/* Prepare @op_data for the MDS close of open handle @och.
 *
 * Read-only opens send only mode/timestamps.  Write opens additionally
 * send size/blocks, unless Size-on-MDS is negotiated for a regular file,
 * in which case ll_epoch_close() handles the IO-epoch side instead
 * (note &och is passed down — presumably it may update the handle
 * pointer; confirm against ll_epoch_close()). */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
                             struct obd_client_handle *och)
{
        ENTRY;

        op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
                                 ATTR_MTIME_SET | ATTR_CTIME_SET;

        if (!(och->och_flags & FMODE_WRITE))
                goto out;

        /* Without SOM support (or for non-regular files) the client is
         * authoritative for size/blocks at close time. */
        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
            !S_ISREG(inode->i_mode))
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
        else
                ll_epoch_close(inode, op_data, &och, 0);

out:
        ll_pack_inode2opdata(inode, op_data, &och->och_fh);
        EXIT;
}
104
/* Send the actual close RPC for open handle @och on @inode.
 *
 * The RPC is skipped entirely on forced umount (import already
 * deactivated) or when the export has no obd device.  If md_close()
 * returns -EAGAIN, the MDS wants Size-on-MDS attributes first, so they
 * are gathered from the OSTs via ll_sizeonmds_update().
 *
 * Ownership: @och is freed here (cookie poisoned with
 * DEAD_HANDLE_MAGIC) unless a SOM write handle still has its epoch
 * open, in which case a DONE_WRITING is queued and @och stays alive
 * for that path.
 *
 * NOTE(review): on OBD_ALLOC_PTR failure we jump to "out" with
 * epoch_close still 1 and free @och, but per the XXX below the MDS-side
 * open handle and request are leaked — confirm. */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                     struct inode *inode,
                                     struct obd_client_handle *och)
{
        struct obd_export *exp = ll_i2mdexp(inode);
        struct md_op_data *op_data;
        struct ptlrpc_request *req = NULL;
        struct obd_device *obd = class_exp2obd(exp);
        int epoch_close = 1;
        int seq_end = 0, rc;
        ENTRY;

        if (obd == NULL) {
                /*
                 * XXX: in case of LMV, is this correct to access
                 * ->exp_handle?
                 */
                CERROR("Invalid MDC connection handle "LPX64"\n",
                       ll_i2mdexp(inode)->exp_handle.h_cookie);
                GOTO(out, rc = 0);
        }

        /*
         * here we check if this is forced umount. If so this is called on
         * canceling "open lock" and we do not call md_close() in this case, as
         * it will not be successful, as import is already deactivated.
         */
        if (obd->obd_force)
                GOTO(out, rc = 0);

        OBD_ALLOC_PTR(op_data);
        if (op_data == NULL)
                GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

        ll_prepare_close(inode, op_data, och);
        epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
        rc = md_close(md_exp, op_data, och->och_mod, &req);
        /* Remember whether the close went through so the replay
         * sequence can be closed below. */
        if (rc != -EAGAIN)
                seq_end = 1;

        if (rc == -EAGAIN) {
                /* This close must have the epoch closed. */
                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
                LASSERT(epoch_close);
                /* MDS has instructed us to obtain Size-on-MDS attribute from
                 * OSTs and send setattr to back to MDS. */
                rc = ll_sizeonmds_update(inode, och->och_mod,
                                         &och->och_fh, op_data->op_ioepoch);
                if (rc) {
                        CERROR("inode %lu mdc Size-on-MDS update failed: "
                               "rc = %d\n", inode->i_ino, rc);
                        rc = 0;
                }
        } else if (rc) {
                CERROR("inode %lu mdc close failed: rc = %d\n",
                       inode->i_ino, rc);
        }
        ll_finish_md_op_data(op_data);

        if (rc == 0) {
                rc = ll_objects_destroy(req, inode);
                if (rc)
                        CERROR("inode %lu ll_objects destroy: rc = %d\n",
                               inode->i_ino, rc);
        }

        EXIT;
out:

        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
            S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                /* Epoch still open: hand @och over to the DONE_WRITING
                 * machinery instead of freeing it here. */
                ll_queue_done_writing(inode, LLIF_DONE_WRITING);
        } else {
                if (seq_end)
                        ptlrpc_close_replay_seq(req);
                md_clear_open_replay_data(md_exp, och);
                /* Free @och if it is not waiting for DONE_WRITING. */
                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
                OBD_FREE_PTR(och);
        }
        if (req) /* This is close request */
                ptlrpc_req_finished(req);
        return rc;
}
189
/* Really close the cached MDS open handle of the kind selected by
 * @flags (FMODE_WRITE / FMODE_EXEC / FMODE_READ) for @inode, but only
 * if no local open file descriptor of that kind still uses it.
 *
 * Returns 0 when the handle is still in use or was already freed by a
 * racing closer; otherwise the result of ll_close_inode_openhandle(). */
int ll_md_real_close(struct inode *inode, int flags)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_client_handle **och_p;
        struct obd_client_handle *och;
        __u64 *och_usecount;
        int rc = 0;
        ENTRY;

        /* Select the handle slot and use-count matching the open mode. */
        if (flags & FMODE_WRITE) {
                och_p = &lli->lli_mds_write_och;
                och_usecount = &lli->lli_open_fd_write_count;
        } else if (flags & FMODE_EXEC) {
                och_p = &lli->lli_mds_exec_och;
                och_usecount = &lli->lli_open_fd_exec_count;
        } else {
                LASSERT(flags & FMODE_READ);
                och_p = &lli->lli_mds_read_och;
                och_usecount = &lli->lli_open_fd_read_count;
        }

        down(&lli->lli_och_sem);
        if (*och_usecount) { /* There are still users of this handle, so
                                skip freeing it. */
                up(&lli->lli_och_sem);
                RETURN(0);
        }
        /* Steal the handle under the semaphore; send the close RPC
         * outside it. */
        och=*och_p;
        *och_p = NULL;
        up(&lli->lli_och_sem);

        if (och) { /* There might be a race and somebody have freed this och
                      already */
                rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
                                               inode, och);
        }

        RETURN(rc);
}
229
230 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
231                 struct file *file)
232 {
233         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
234         struct ll_inode_info *lli = ll_i2info(inode);
235         int rc = 0;
236         ENTRY;
237
238         /* clear group lock, if present */
239         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
240                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
241                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
242                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
243                                       &fd->fd_cwlockh);
244         }
245
246         /* Let's see if we have good enough OPEN lock on the file and if
247            we can skip talking to MDS */
248         if (file->f_dentry->d_inode) { /* Can this ever be false? */
249                 int lockmode;
250                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
251                 struct lustre_handle lockh;
252                 struct inode *inode = file->f_dentry->d_inode;
253                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
254
255                 down(&lli->lli_och_sem);
256                 if (fd->fd_omode & FMODE_WRITE) {
257                         lockmode = LCK_CW;
258                         LASSERT(lli->lli_open_fd_write_count);
259                         lli->lli_open_fd_write_count--;
260                 } else if (fd->fd_omode & FMODE_EXEC) {
261                         lockmode = LCK_PR;
262                         LASSERT(lli->lli_open_fd_exec_count);
263                         lli->lli_open_fd_exec_count--;
264                 } else {
265                         lockmode = LCK_CR;
266                         LASSERT(lli->lli_open_fd_read_count);
267                         lli->lli_open_fd_read_count--;
268                 }
269                 up(&lli->lli_och_sem);
270
271                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
272                                    LDLM_IBITS, &policy, lockmode,
273                                    &lockh)) {
274                         rc = ll_md_real_close(file->f_dentry->d_inode,
275                                               fd->fd_omode);
276                 }
277         } else {
278                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
279                        file, file->f_dentry, file->f_dentry->d_name.name);
280         }
281
282         LUSTRE_FPRIVATE(file) = NULL;
283         ll_file_data_put(fd);
284         ll_capa_close(inode);
285
286         RETURN(rc);
287 }
288
/* Implemented in the LOV layer; presumably tests and clears any async
 * write error recorded on the stripes — NOTE(review): prototype kept
 * here rather than in a shared header; confirm no header declares it. */
int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);

/* While this returns an error code, fput() the caller does not, so we need
 * to make every effort to clean up all of our state here.  Also, applications
 * rarely check close errors and even if an error is returned they will not
 * re-try the close call.
 */
int ll_file_release(struct inode *inode, struct file *file)
{
        struct ll_file_data *fd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        int rc;

        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
        /* Remote-client ACL state is keyed on the current pid; tear it
         * down when the root-directory handle that owned it is closed. */
        if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
            inode == inode->i_sb->s_root->d_inode) {
                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

                LASSERT(fd != NULL);
                if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
                        fd->fd_flags &= ~LL_FILE_RMTACL;
                        rct_del(&sbi->ll_rct, cfs_curproc_pid());
                        et_search_free(&sbi->ll_et, cfs_curproc_pid());
                }
        }
#endif

        /* Don't count releases of the filesystem root in the stats. */
        if (inode->i_sb->s_root != file->f_dentry)
                ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
        fd = LUSTRE_FPRIVATE(file);
        LASSERT(fd != NULL);

        /* The last ref on @file, maybe not the owner pid of statahead.
         * Different processes can open the same dir, "ll_opendir_key" means:
         * it is me that should stop the statahead thread. */
        if (lli->lli_opendir_key == fd)
                ll_stop_statahead(inode, fd);

        /* The root was never opened on the MDS through this path; just
         * drop the private data and return. */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = NULL;
                ll_file_data_put(fd);
                RETURN(0);
        }

        /* Pick up any async write error recorded on the stripes so it is
         * not reported again later. */
        if (lsm)
                lov_test_and_clear_async_rc(lsm);
        lli->lli_async_rc = 0;

        rc = ll_md_close(sbi->ll_md_exp, inode, file);
        RETURN(rc);
}
346
/* Issue an MDS open intent for @file, optionally carrying striping
 * parameters in @lmm/@lmmsize (when the caller is setting stripe info).
 *
 * On success the dentry's inode is refreshed from the reply via
 * ll_prep_inode().  -ESTALE takes a separate, quiet exit path so the
 * log is not flooded; any open handle obtained despite the error is
 * released first.
 *
 * Returns 0 or a negative errno. */
static int ll_intent_file_open(struct file *file, void *lmm,
                               int lmmsize, struct lookup_intent *itp)
{
        struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
        struct dentry *parent = file->f_dentry->d_parent;
        const char *name = file->f_dentry->d_name.name;
        const int len = file->f_dentry->d_name.len;
        struct md_op_data *op_data;
        struct ptlrpc_request *req;
        int rc;
        ENTRY;

        if (!parent)
                RETURN(-ENOENT);

        /* Usually we come here only for NFSD, and we want open lock.
           But we can also get here with pre 2.6.15 patchless kernels, and in
           that case that lock is also ok */
        /* We can also get here if there was cached open handle in revalidate_it
         * but it disappeared while we were getting from there to ll_file_open.
         * But this means this file was closed and immediatelly opened which
         * makes a good candidate for using OPEN lock */
        /* If lmmsize & lmm are not 0, we are just setting stripe info
         * parameters. No need for the open lock */
        if (!lmm && !lmmsize)
                itp->it_flags |= MDS_OPEN_LOCK;

        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
                                      file->f_dentry->d_inode, name, len,
                                      O_RDWR, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
                            0 /*unused */, &req, ll_md_blocking_ast, 0);
        ll_finish_md_op_data(op_data);
        if (rc == -ESTALE) {
                /* reason for keep own exit path - don`t flood log
                * with messages with -ESTALE errors.
                */
                if (!it_disposition(itp, DISP_OPEN_OPEN) ||
                     it_open_error(DISP_OPEN_OPEN, itp))
                        GOTO(out, rc);
                /* The open itself succeeded despite -ESTALE; give the
                 * handle back before taking the quiet exit path. */
                ll_release_openhandle(file->f_dentry, itp);
                GOTO(out_stale, rc);
        }

        if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
                GOTO(out, rc);
        }

        /* Associate the granted lock with the inode so later lock
         * matches can find it. */
        if (itp->d.lustre.it_lock_mode)
                md_set_lock_data(sbi->ll_md_exp,
                                 &itp->d.lustre.it_lock_handle,
                                 file->f_dentry->d_inode);

        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
out:
        ptlrpc_req_finished(itp->d.lustre.it_data);

out_stale:
        it_clear_disposition(itp, DISP_ENQ_COMPLETE);
        ll_intent_drop_lock(itp);

        RETURN(rc);
}
415
416 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
417                        struct lookup_intent *it, struct obd_client_handle *och)
418 {
419         struct ptlrpc_request *req = it->d.lustre.it_data;
420         struct mdt_body *body;
421
422         LASSERT(och);
423
424         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
425         LASSERT(body != NULL);                      /* reply already checked out */
426
427         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
428         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
429         och->och_fid = lli->lli_fid;
430         och->och_flags = it->it_flags;
431         lli->lli_ioepoch = body->ioepoch;
432
433         return md_set_open_replay_data(md_exp, och, req);
434 }
435
/* Finish an open locally: when an MDS open handle @och was granted,
 * fill it from the intent reply; then attach the per-open data @fd to
 * the file, initialize readahead state and record the open mode.
 *
 * Returns 0 on success or the error from ll_och_fill(); on failure @fd
 * is NOT attached, so the caller must free it. */
int ll_local_open(struct file *file, struct lookup_intent *it,
                  struct ll_file_data *fd, struct obd_client_handle *och)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        ENTRY;

        LASSERT(!LUSTRE_FPRIVATE(file));

        LASSERT(fd != NULL);

        if (och) {
                struct ptlrpc_request *req = it->d.lustre.it_data;
                struct mdt_body *body;
                int rc;

                rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
                if (rc)
                        RETURN(rc);

                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                if ((it->it_flags & FMODE_WRITE) &&
                    (body->valid & OBD_MD_FLSIZE))
                        CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
                               lli->lli_ioepoch, PFID(&lli->lli_fid));
        }

        LUSTRE_FPRIVATE(file) = fd;
        ll_readahead_init(inode, &fd->fd_ras);
        /* Remember the mode this descriptor was opened with; used at
         * close time to pick the matching open handle. */
        fd->fd_omode = it->it_flags;
        RETURN(0);
}
468
/* Open a file, and (for the very first open) create objects on the OSTs at
 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
 * creation or open until ll_lov_setstripe() ioctl is called.  We grab
 * lli_open_sem to ensure no other process will create objects, send the
 * stripe MD to the MDS, or try to destroy the objects if that fails.
 *
 * If we already have the stripe MD locally then we don't request it in
 * md_open(), by passing a lmm_size = 0.
 *
 * It is up to the application to ensure no other processes open this file
 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
 * used.  We might be able to avoid races of that sort by getting lli_open_sem
 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
 */
int ll_file_open(struct inode *inode, struct file *file)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lookup_intent *it, oit = { .it_op = IT_OPEN,
                                          .it_flags = file->f_flags };
        struct lov_stripe_md *lsm;
        struct ptlrpc_request *req = NULL;
        struct obd_client_handle **och_p;
        __u64 *och_usecount;
        struct ll_file_data *fd;
        int rc = 0, opendir_set = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
               inode->i_generation, inode, file->f_flags);

#ifdef HAVE_VFS_INTENT_PATCHES
        it = file->f_it;
#else
        it = file->private_data; /* XXX: compat macro */
        file->private_data = NULL; /* prevent ll_local_open assertion */
#endif

        fd = ll_file_data_get();
        if (fd == NULL)
                RETURN(-ENOMEM);

        /* For directories, possibly take ownership of the statahead
         * machinery for this open (opendir_set records which cleanup is
         * needed on error). */
        if (S_ISDIR(inode->i_mode)) {
                spin_lock(&lli->lli_lock);
                /* "lli->lli_opendir_pid != 0" means someone has set it.
                 * "lli->lli_sai != NULL" means the previous statahead has not
                 *                        been cleanup. */
                if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
                        opendir_set = 1;
                        lli->lli_opendir_pid = cfs_curproc_pid();
                        lli->lli_opendir_key = fd;
                } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
                        /* Two cases for this:
                         * (1) The same process open such directory many times.
                         * (2) The old process opened the directory, and exited
                         *     before its children processes. Then new process
                         *     with the same pid opens such directory before the
                         *     old process's children processes exit.
                         * Change the owner to the latest one. */
                        opendir_set = 2;
                        lli->lli_opendir_key = fd;
                }
                spin_unlock(&lli->lli_lock);
        }

        /* The filesystem root needs no MDS open; just attach @fd. */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = fd;
                RETURN(0);
        }

        if (!it || !it->d.lustre.it_disposition) {
                /* Convert f_flags into access mode. We cannot use file->f_mode,
                 * because everything but O_ACCMODE mask was stripped from
                 * there */
                /* O_RDONLY(0)/O_WRONLY(1)/O_RDWR(2) + 1 yields the
                 * FMODE_READ/FMODE_WRITE bit pattern (standard kernel
                 * idiom). */
                if ((oit.it_flags + 1) & O_ACCMODE)
                        oit.it_flags++;
                if (file->f_flags & O_TRUNC)
                        oit.it_flags |= FMODE_WRITE;

                /* kernel only call f_op->open in dentry_open.  filp_open calls
                 * dentry_open after call to open_namei that checks permissions.
                 * Only nfsd_open call dentry_open directly without checking
                 * permissions and because of that this code below is safe. */
                if (oit.it_flags & FMODE_WRITE)
                        oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

                /* We do not want O_EXCL here, presumably we opened the file
                 * already? XXX - NFS implications? */
                oit.it_flags &= ~O_EXCL;

                it = &oit;
        }

restart:
        /* Let's see if we have file open on MDS already. */
        if (it->it_flags & FMODE_WRITE) {
                och_p = &lli->lli_mds_write_och;
                och_usecount = &lli->lli_open_fd_write_count;
        } else if (it->it_flags & FMODE_EXEC) {
                och_p = &lli->lli_mds_exec_och;
                och_usecount = &lli->lli_open_fd_exec_count;
         } else {
                och_p = &lli->lli_mds_read_och;
                och_usecount = &lli->lli_open_fd_read_count;
        }

        down(&lli->lli_och_sem);
        if (*och_p) { /* Open handle is present */
                if (it_disposition(it, DISP_OPEN_OPEN)) {
                        /* Well, there's extra open request that we do not need,
                           let's close it somehow. This will decref request. */
                        rc = it_open_error(DISP_OPEN_OPEN, it);
                        if (rc) {
                                up(&lli->lli_och_sem);
                                ll_file_data_put(fd);
                                GOTO(out_openerr, rc);
                        }
                        ll_release_openhandle(file->f_dentry, it);
                        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                             LPROC_LL_OPEN);
                }
                (*och_usecount)++;

                rc = ll_local_open(file, it, fd, NULL);
                if (rc) {
                        (*och_usecount)--;
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        GOTO(out_openerr, rc);
                }
        } else {
                LASSERT(*och_usecount == 0);
                if (!it->d.lustre.it_disposition) {
                        /* We cannot just request lock handle now, new ELC code
                           means that one of other OPEN locks for this file
                           could be cancelled, and since blocking ast handler
                           would attempt to grab och_sem as well, that would
                           result in a deadlock */
                        up(&lli->lli_och_sem);
                        it->it_flags |= O_CHECK_STALE;
                        rc = ll_intent_file_open(file, NULL, 0, it);
                        it->it_flags &= ~O_CHECK_STALE;
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_openerr, rc);
                        }

                        /* Got some error? Release the request */
                        if (it->d.lustre.it_status < 0) {
                                req = it->d.lustre.it_data;
                                ptlrpc_req_finished(req);
                        }
                        md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
                                         &it->d.lustre.it_lock_handle,
                                         file->f_dentry->d_inode);
                        /* Retry with the freshly obtained disposition;
                         * the handle slot is re-selected under the sem. */
                        goto restart;
                }
                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
                if (!*och_p) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc = -ENOMEM);
                }
                (*och_usecount)++;
                req = it->d.lustre.it_data;

                /* md_intent_lock() didn't get a request ref if there was an
                 * open error, so don't do cleanup on the request here
                 * (bug 3430) */
                /* XXX (green): Should not we bail out on any error here, not
                 * just open error? */
                rc = it_open_error(DISP_OPEN_OPEN, it);
                if (rc) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }

                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                rc = ll_local_open(file, it, fd, *och_p);
                if (rc) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }
        }
        up(&lli->lli_och_sem);

        /* Must do this outside lli_och_sem lock to prevent deadlock where
           different kind of OPEN lock for this same inode gets cancelled
           by ldlm_cancel_lru */
        if (!S_ISREG(inode->i_mode))
                GOTO(out, rc);

        ll_capa_open(inode);

        lsm = lli->lli_smd;
        if (lsm == NULL) {
                if (file->f_flags & O_LOV_DELAY_CREATE ||
                    !(file->f_mode & FMODE_WRITE)) {
                        CDEBUG(D_INODE, "object creation was delayed\n");
                        GOTO(out, rc);
                }
        }
        file->f_flags &= ~O_LOV_DELAY_CREATE;
        GOTO(out, rc);
out:
        ptlrpc_req_finished(req);
        if (req)
                it_clear_disposition(it, DISP_ENQ_OPEN_REF);
/* NOTE: out_openerr is nested inside the if (rc) of out_och_free on
 * purpose — error-only cleanup runs under both labels. */
out_och_free:
        if (rc) {
                if (*och_p) {
                        OBD_FREE(*och_p, sizeof (struct obd_client_handle));
                        *och_p = NULL; /* OBD_FREE writes some magic there */
                        (*och_usecount)--;
                }
                up(&lli->lli_och_sem);
out_openerr:
                if (opendir_set == 1) {
                        lli->lli_opendir_key = NULL;
                        lli->lli_opendir_pid = 0;
                } else if (unlikely(opendir_set == 2)) {
                        ll_stop_statahead(inode, fd);
                }
        }

        return rc;
}
695
/* Fills the obdo with the attributes for the inode defined by lsm.
 *
 * Sends an asynchronous getattr to the data target(s) for the file's
 * objects, waits for completion, then refreshes the inode's cached
 * size/blocks/timestamps from the result.  Requires lli_smd to be set.
 *
 * Returns 0 on success or a negative errno. */
int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
{
        struct ptlrpc_request_set *set;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;

        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(lsm != NULL);

        oinfo.oi_md = lsm;
        oinfo.oi_oa = obdo;
        oinfo.oi_oa->o_id = lsm->lsm_object_id;
        oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
        oinfo.oi_oa->o_mode = S_IFREG;
        oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
                               OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                               OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
                               OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                               OBD_MD_FLGROUP;
        oinfo.oi_capa = ll_mdscapa_get(inode);

        set = ptlrpc_prep_set();
        if (set == NULL) {
                CERROR("can't allocate ptlrpc set\n");
                rc = -ENOMEM;
        } else {
                rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
                if (rc == 0)
                        rc = ptlrpc_set_wait(set);
                ptlrpc_set_destroy(set);
        }
        /* Drop the capability reference taken above, on every path. */
        capa_put(oinfo.oi_capa);
        if (rc)
                RETURN(rc);

        /* Restrict to the size/blocks/time fields before refreshing the
         * inode from the reply. */
        oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
                                 OBD_MD_FLATIME | OBD_MD_FLMTIME |
                                 OBD_MD_FLCTIME | OBD_MD_FLSIZE);

        obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
               lli->lli_smd->lsm_object_id, i_size_read(inode),
               (unsigned long long)inode->i_blocks,
               (unsigned long)ll_inode_blksize(inode));
        RETURN(0);
}
746
/* Map an OSC DLM extent lock back to the index of the stripe it covers
 * within the file's stripe MD.
 *
 * Returns the stripe index (>= 0) on success,
 * -ELDLM_NO_LOCK_DATA when the lock's resource does not match the
 * stripe object, or a negative errno from obd_get_info(). */
static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = ll_i2dtexp(inode);
        struct {
                char name[16];
                struct ldlm_lock *lock;
        } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
        __u32 stripe, vallen = sizeof(stripe);
        struct lov_oinfo *loinfo;
        int rc;
        ENTRY;

        /* Single-striped file: the answer is trivially stripe 0, but
         * still verify the resource below. */
        if (lsm->lsm_stripe_count == 1)
                GOTO(check, stripe = 0);

        /* get our offset in the lov */
        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
        if (rc != 0) {
                CERROR("obd_get_info: rc = %d\n", rc);
                RETURN(rc);
        }
        LASSERT(stripe < lsm->lsm_stripe_count);

check:
        /* Sanity check: the lock's resource name must match the stripe
         * object's id/group. */
        loinfo = lsm->lsm_oinfo[stripe];
        if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
                            &lock->l_resource->lr_name)){
                LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
                           loinfo->loi_id, loinfo->loi_gr);
                RETURN(-ELDLM_NO_LOCK_DATA);
        }

        RETURN(stripe);
}
783
/* Extent-lock pin callback: take an extra reference on the page passed in
 * @data so it cannot be freed while lock cancellation works on it; the
 * reference is dropped by ll_page_removal_cb(). */
void ll_pin_extent_cb(void *data)
{
        struct page *page = data;

        page_cache_get(page);
}
793
794 /* Flush the page from page cache for an extent as its canceled.
795  * Page to remove is delivered as @data.
796  *
797  * No one can dirty the extent until we've finished our work and they cannot
798  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
799  * but other kernel actors could have pages locked.
800  *
801  * If @discard is set, there is no need to write the page if it is dirty.
802  *
803  * Called with the DLM lock held. */
804 int ll_page_removal_cb(void *data, int discard)
805 {
806         int rc;
807         struct page *page = data;
808         struct address_space *mapping;
809  
810         ENTRY;
811
812         /* We have page reference already from ll_pin_page */
813         lock_page(page);
814
815         /* Already truncated by somebody */
816         if (!page->mapping)
817                 GOTO(out, rc = 0);
818         mapping = page->mapping;
819
820         ll_teardown_mmaps(mapping,
821                           (__u64)page->index << PAGE_CACHE_SHIFT,
822                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
823                                                               ~PAGE_CACHE_MASK);        
824         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
825
826         if (!discard && clear_page_dirty_for_io(page)) {
827                 LASSERT(page->mapping);
828                 rc = ll_call_writepage(page->mapping->host, page);
829                 /* either waiting for io to complete or reacquiring
830                  * the lock that the failed writepage released */
831                 lock_page(page);
832                 wait_on_page_writeback(page);
833                 if (rc != 0) {
834                         CERROR("writepage inode %lu(%p) of page %p "
835                                "failed: %d\n", mapping->host->i_ino,
836                                mapping->host, page, rc);
837                         if (rc == -ENOSPC)
838                                 set_bit(AS_ENOSPC, &mapping->flags);
839                         else
840                                 set_bit(AS_EIO, &mapping->flags);
841                 }
842                 set_bit(AS_EIO, &mapping->flags);
843         }
844         if (page->mapping != NULL) {
845                 struct ll_async_page *llap = llap_cast_private(page);
846                 /* checking again to account for writeback's lock_page() */
847                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
848                 if (llap)
849                         ll_ra_accounting(llap, page->mapping);
850                 ll_truncate_complete_page(page);
851         }
852         EXIT;
853 out:
854         LASSERT(!PageWriteback(page));
855         unlock_page(page);
856         page_cache_release(page);
857
858         return 0;
859 }
860
/* Cancel callback for OSC extent locks cached on behalf of a file.
 *
 * When @lock goes away, the known minimum size (kms) of the stripe it
 * covered must be recomputed so it no longer relies on the cancelled
 * extent.  Always returns 0; the inode reference taken by
 * ll_inode_from_lock() is dropped before returning. */
int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                             void *data, int flag)
{
        struct inode *inode;
        struct ll_inode_info *lli;
        struct lov_stripe_md *lsm;
        int stripe;
        __u64 kms;

        ENTRY;

        /* A small non-zero "pointer" cannot be valid callback data --
         * catch corrupted cbdata early */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        inode = ll_inode_from_lock(lock);
        if (inode == NULL)
                RETURN(0);
        lli = ll_i2info(inode);
        if (lli == NULL)
                GOTO(iput, 0);
        if (lli->lli_smd == NULL)
                GOTO(iput, 0);
        lsm = lli->lli_smd;

        /* Which stripe does this lock cover? */
        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                GOTO(iput, 0);

        /* Shift the stripe's kms under both the stripe lock and the lock's
         * resource lock; lov_stripe_lock() is taken first */
        lov_stripe_lock(lsm);
        lock_res_and_lock(lock);
        kms = ldlm_extent_shift_kms(lock,
                                    lsm->lsm_oinfo[stripe]->loi_kms);

        if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
                LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                           lsm->lsm_oinfo[stripe]->loi_kms, kms);
        lsm->lsm_oinfo[stripe]->loi_kms = kms;
        unlock_res_and_lock(lock);
        lov_stripe_unlock(lsm);
        ll_queue_done_writing(inode, 0);
        EXIT;
iput:
        iput(inode);

        return 0;
}
909
#if 0
/* NOTE(review): dead code -- compiled out, and it still uses the old
 * embedded lsm_oinfo[] layout (lsm->lsm_oinfo[stripe].loi_kms) while the
 * live code in this file dereferences per-stripe pointers
 * (lsm->lsm_oinfo[stripe]->loi_kms).  Kept for reference only; it would
 * need porting before re-enabling. */
int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lustre_handle lockh = { 0 };
        struct ost_lvb *lvb;
        int stripe;
        ENTRY;

        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                     LDLM_FL_BLOCK_CONV)) {
                LBUG(); /* not expecting any blocked async locks yet */
                LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
                           "lock, returning");
                ldlm_lock_dump(D_OTHER, lock, 0);
                ldlm_reprocess_all(lock->l_resource);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");

        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                goto iput;

        if (lock->l_lvb_len) {
                struct lov_stripe_md *lsm = lli->lli_smd;
                __u64 kms;
                lvb = lock->l_lvb_data;
                lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;

                lock_res_and_lock(lock);
                ll_inode_size_lock(inode, 1);
                kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                kms = ldlm_extent_shift_kms(NULL, kms);
                if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe].loi_kms, kms);
                lsm->lsm_oinfo[stripe].loi_kms = kms;
                ll_inode_size_unlock(inode, 1);
                unlock_res_and_lock(lock);
        }

iput:
        iput(inode);
        wake_up(&lock->l_waitq);

        ldlm_lock2handle(lock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        RETURN(0);
}
#endif
964
/* Glimpse AST: runs on a client holding a DLM extent lock when another node
 * wants the file size without enqueueing a conflicting lock.  Packs this
 * client's view of the object (stripe kms and inode times) into the
 * reply's LVB.
 *
 * Returns 0 on success or a negative error; -ELDLM_NO_LOCK_DATA means the
 * lock no longer maps to usable file data and is treated as a normal race
 * (an empty reply is still packed so the peer gets an answer). */
static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
        struct ptlrpc_request *req = reqp;
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli;
        struct lov_stripe_md *lsm;
        struct ost_lvb *lvb;
        int rc, stripe;
        ENTRY;

        if (inode == NULL)
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
        lli = ll_i2info(inode);
        if (lli == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* First, find out which stripe index this lock corresponds to. */
        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* Switch the request to the glimpse-callback format and reserve
         * room for the LVB in the reply */
        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             sizeof(*lvb));
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc) {
                CERROR("lustre_pack_reply: %d\n", rc);
                GOTO(iput, rc);
        }

        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
        lvb->lvb_mtime = LTIME_S(inode->i_mtime);
        lvb->lvb_atime = LTIME_S(inode->i_atime);
        lvb->lvb_ctime = LTIME_S(inode->i_ctime);

        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                   " atime "LPU64", mtime "LPU64", ctime "LPU64,
                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                   lvb->lvb_atime, lvb->lvb_ctime);
 iput:
        iput(inode);

 out:
        /* These errors are normal races, so we don't want to fill the console
         * with messages by calling ptlrpc_error() */
        if (rc == -ELDLM_NO_LOCK_DATA)
                lustre_pack_reply(req, 1, NULL, NULL);

        req->rq_status = rc;
        return rc;
}
1020
1021 static int ll_merge_lvb(struct inode *inode)
1022 {
1023         struct ll_inode_info *lli = ll_i2info(inode);
1024         struct ll_sb_info *sbi = ll_i2sbi(inode);
1025         struct ost_lvb lvb;
1026         int rc;
1027
1028         ENTRY;
1029
1030         ll_inode_size_lock(inode, 1);
1031         inode_init_lvb(inode, &lvb);
1032         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1033         i_size_write(inode, lvb.lvb_size);
1034         inode->i_blocks = lvb.lvb_blocks;
1035
1036         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1037         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1038         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1039         ll_inode_size_unlock(inode, 1);
1040
1041         RETURN(rc);
1042 }
1043
1044 int ll_local_size(struct inode *inode)
1045 {
1046         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1047         struct ll_inode_info *lli = ll_i2info(inode);
1048         struct ll_sb_info *sbi = ll_i2sbi(inode);
1049         struct lustre_handle lockh = { 0 };
1050         int flags = 0;
1051         int rc;
1052         ENTRY;
1053
1054         if (lli->lli_smd->lsm_stripe_count == 0)
1055                 RETURN(0);
1056
1057         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1058                        &policy, LCK_PR, &flags, inode, &lockh);
1059         if (rc < 0)
1060                 RETURN(rc);
1061         else if (rc == 0)
1062                 RETURN(-ENODATA);
1063
1064         rc = ll_merge_lvb(inode);
1065         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1066         RETURN(rc);
1067 }
1068
1069 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1070                      lstat_t *st)
1071 {
1072         struct lustre_handle lockh = { 0 };
1073         struct ldlm_enqueue_info einfo = { 0 };
1074         struct obd_info oinfo = { { { 0 } } };
1075         struct ost_lvb lvb;
1076         int rc;
1077
1078         ENTRY;
1079
1080         einfo.ei_type = LDLM_EXTENT;
1081         einfo.ei_mode = LCK_PR;
1082         einfo.ei_cb_bl = osc_extent_blocking_cb;
1083         einfo.ei_cb_cp = ldlm_completion_ast;
1084         einfo.ei_cb_gl = ll_glimpse_callback;
1085         einfo.ei_cbdata = NULL;
1086
1087         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1088         oinfo.oi_lockh = &lockh;
1089         oinfo.oi_md = lsm;
1090         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1091
1092         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1093         if (rc == -ENOENT)
1094                 RETURN(rc);
1095         if (rc != 0) {
1096                 CERROR("obd_enqueue returned rc %d, "
1097                        "returning -EIO\n", rc);
1098                 RETURN(rc > 0 ? -EIO : rc);
1099         }
1100
1101         lov_stripe_lock(lsm);
1102         memset(&lvb, 0, sizeof(lvb));
1103         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1104         st->st_size = lvb.lvb_size;
1105         st->st_blocks = lvb.lvb_blocks;
1106         st->st_mtime = lvb.lvb_mtime;
1107         st->st_atime = lvb.lvb_atime;
1108         st->st_ctime = lvb.lvb_ctime;
1109         lov_stripe_unlock(lsm);
1110
1111         RETURN(rc);
1112 }
1113
/* NB: obd_merge_lvb will prefer locally cached writes if they extend the
 * file (because it prefers KMS over RSS when larger) */
/* Ask clients holding conflicting extent locks for their view of the file
 * size (a "glimpse"), then merge the result into the inode via
 * ll_merge_lvb().  @ast_flags is OR'ed into the enqueue flags.
 *
 * Returns 0 on success -- including when the inode has no objects or when
 * LLIF_MDS_SIZE_LOCK is set and no glimpse is needed -- -ENOENT if the
 * object is gone, -EIO for a positive enqueue status, or a negative error
 * code. */
int ll_glimpse_size(struct inode *inode, int ast_flags)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lustre_handle lockh = { 0 };
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        /* MDS holds an authoritative size for us; no glimpse required */
        if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);

        if (!lli->lli_smd) {
                CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
                RETURN(0);
        }

        /* NOTE: this looks like DLM lock request, but it may not be one. Due
         *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
         *       won't revoke any conflicting DLM locks held. Instead,
         *       ll_glimpse_callback() will be called on each client
         *       holding a DLM lock against this file, and resulting size
         *       will be returned for each stripe. DLM lock on [0, EOF] is
         *       acquired only if there were no conflicting locks. */
        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = LCK_PR;
        einfo.ei_cb_bl = osc_extent_blocking_cb;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
        oinfo.oi_lockh = &lockh;
        oinfo.oi_md = lli->lli_smd;
        oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;

        rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
        if (rc == -ENOENT)
                RETURN(rc);
        if (rc != 0) {
                CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
                RETURN(rc > 0 ? -EIO : rc);
        }

        rc = ll_merge_lvb(inode);

        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
               i_size_read(inode), (unsigned long long)inode->i_blocks);

        RETURN(rc);
}
1170
/* Acquire a client-side DLM extent lock of @mode on @lsm for the extent in
 * @policy, returning the handle in @lockh.
 *
 * Skipped entirely (returns 0 with @lockh unused) when the file descriptor
 * or mount is configured for lockless operation.  After the enqueue the
 * cached LVB is merged into the inode: i_size is updated only for a
 * full-file [0, EOF] lock (see the comment below), times only when the
 * enqueue succeeded.
 *
 * Returns 0 on success, -EIO for a positive enqueue status, or a negative
 * error code. */
int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                   struct lov_stripe_md *lsm, int mode,
                   ldlm_policy_data_t *policy, struct lustre_handle *lockh,
                   int ast_flags)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ost_lvb lvb;
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(!lustre_handle_is_used(lockh));
        LASSERT(lsm != NULL);

        /* don't drop the mmapped file to LRU */
        if (mapping_mapped(inode->i_mapping))
                ast_flags |= LDLM_FL_NO_LRU;

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               inode->i_ino, policy->l_extent.start, policy->l_extent.end);

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = mode;
        einfo.ei_cb_bl = osc_extent_blocking_cb;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy = *policy;
        oinfo.oi_lockh = lockh;
        oinfo.oi_md = lsm;
        oinfo.oi_flags = ast_flags;

        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
        /* the policy may have been adjusted (e.g. extended) by the enqueue */
        *policy = oinfo.oi_policy;
        if (rc > 0)
                rc = -EIO;

        ll_inode_size_lock(inode, 1);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);

        if (policy->l_extent.start == 0 &&
            policy->l_extent.end == OBD_OBJECT_EOF) {
                /* vmtruncate()->ll_truncate() first sets the i_size and then
                 * the kms under both a DLM lock and the
                 * ll_inode_size_lock().  If we don't get the
                 * ll_inode_size_lock() here we can match the DLM lock and
                 * reset i_size from the kms before the truncating path has
                 * updated the kms.  generic_file_write can then trust the
                 * stale i_size when doing appending writes and effectively
                 * cancel the result of the truncate.  Getting the
                 * ll_inode_size_lock() after the enqueue maintains the DLM
                 * -> ll_inode_size_lock() acquiring order. */
                i_size_write(inode, lvb.lvb_size);
                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
                       inode->i_ino, i_size_read(inode));
        }

        /* only trust the merged times if the enqueue itself succeeded */
        if (rc == 0) {
                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
                LTIME_S(inode->i_atime) = lvb.lvb_atime;
                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        }
        ll_inode_size_unlock(inode, 1);

        RETURN(rc);
}
1245
1246 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1247                      struct lov_stripe_md *lsm, int mode,
1248                      struct lustre_handle *lockh)
1249 {
1250         struct ll_sb_info *sbi = ll_i2sbi(inode);
1251         int rc;
1252         ENTRY;
1253
1254         /* XXX phil: can we do this?  won't it screw the file size up? */
1255         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1256             (sbi->ll_flags & LL_SBI_NOLCK))
1257                 RETURN(0);
1258
1259         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1260
1261         RETURN(rc);
1262 }
1263
1264 static void ll_set_file_contended(struct inode *inode)
1265 {
1266         struct ll_inode_info *lli = ll_i2info(inode);
1267         cfs_time_t now = cfs_time_current();
1268
1269         spin_lock(&lli->lli_lock);
1270         lli->lli_contention_time = now;
1271         lli->lli_flags |= LLIF_CONTENDED;
1272         spin_unlock(&lli->lli_lock);
1273 }
1274
1275 void ll_clear_file_contended(struct inode *inode)
1276 {
1277         struct ll_inode_info *lli = ll_i2info(inode);
1278
1279         spin_lock(&lli->lli_lock);
1280         lli->lli_flags &= ~LLIF_CONTENDED;
1281         spin_unlock(&lli->lli_lock);
1282 }
1283
1284 static int ll_is_file_contended(struct file *file)
1285 {
1286         struct inode *inode = file->f_dentry->d_inode;
1287         struct ll_inode_info *lli = ll_i2info(inode);
1288         struct ll_sb_info *sbi = ll_i2sbi(inode);
1289         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1290         ENTRY;
1291
1292         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1293                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1294                        " osc connect flags = 0x"LPX64"\n",
1295                        sbi->ll_lco.lco_flags);
1296                 RETURN(0);
1297         }
1298         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1299                 RETURN(1);
1300         if (lli->lli_flags & LLIF_CONTENDED) {
1301                 cfs_time_t cur_time = cfs_time_current();
1302                 cfs_time_t retry_time;
1303
1304                 retry_time = cfs_time_add(
1305                         lli->lli_contention_time,
1306                         cfs_time_seconds(sbi->ll_contention_time));
1307                 if (cfs_time_after(cur_time, retry_time)) {
1308                         ll_clear_file_contended(inode);
1309                         RETURN(0);
1310                 }
1311                 RETURN(1);
1312         }
1313         RETURN(0);
1314 }
1315
/* Take a DLM tree lock covering [start, end] for a read or write (@rw) of
 * @count bytes to/from userspace buffer @buf.
 *
 * Appending writes always lock; otherwise a file currently marked
 * contended skips client-side locking entirely.
 *
 * Returns 1 if the tree lock was taken, 0 if locking was skipped, or a
 * negative error code. */
static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
                                 const char *buf, size_t count,
                                 loff_t start, loff_t end, int rw)
{
        int append;
        int tree_locked = 0;
        int rc;
        struct inode * inode = file->f_dentry->d_inode;
        ENTRY;

        append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);

        if (append || !ll_is_file_contended(file)) {
                struct ll_lock_tree_node *node;
                int ast_flags;

                /* append is never denied on contention (presumably because
                 * it must see the authoritative size -- confirm);
                 * O_NONBLOCK must not wait for a blocked lock */
                ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
                if (file->f_flags & O_NONBLOCK)
                        ast_flags |= LDLM_FL_BLOCK_NOWAIT;
                node = ll_node_from_inode(inode, start, end,
                                          (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
                if (IS_ERR(node)) {
                        rc = PTR_ERR(node);
                        GOTO(out, rc);
                }
                tree->lt_fd = LUSTRE_FPRIVATE(file);
                rc = ll_tree_lock(tree, node, buf, count, ast_flags);
                if (rc == 0)
                        tree_locked = 1;
                else if (rc == -EUSERS)
                        /* too contended: remember it and fall back to
                         * lockless I/O for subsequent requests */
                        ll_set_file_contended(inode);
                else
                        GOTO(out, rc);
        }
        RETURN(tree_locked);
out:
        return rc;
}
1354
1355 /**
1356  * Checks if requested extent lock is compatible with a lock under a page.
1357  *
1358  * Checks if the lock under \a page is compatible with a read or write lock
1359  * (specified by \a rw) for an extent [\a start , \a end].
1360  *
1361  * \param page the page under which lock is considered
1362  * \param rw OBD_BRW_READ if requested for reading,
1363  *           OBD_BRW_WRITE if requested for writing
1364  * \param start start of the requested extent
1365  * \param end end of the requested extent
1366  * \param cookie transparent parameter for passing locking context
1367  *
1368  * \post result == 1, *cookie == context, appropriate lock is referenced or
1369  * \post result == 0
1370  *
1371  * \retval 1 owned lock is reused for the request
1372  * \retval 0 no lock reused for the request
1373  *
1374  * \see ll_release_short_lock
1375  */
1376 static int ll_reget_short_lock(struct page *page, int rw,
1377                                obd_off start, obd_off end,
1378                                void **cookie)
1379 {
1380         struct ll_async_page *llap;
1381         struct obd_export *exp;
1382         struct inode *inode = page->mapping->host;
1383
1384         ENTRY;
1385
1386         exp = ll_i2dtexp(inode);
1387         if (exp == NULL)
1388                 RETURN(0);
1389
1390         llap = llap_cast_private(page);
1391         if (llap == NULL)
1392                 RETURN(0);
1393
1394         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1395                                     &llap->llap_cookie, rw, start, end,
1396                                     cookie));
1397 }
1398
1399 /**
1400  * Releases a reference to a lock taken in a "fast" way.
1401  *
1402  * Releases a read or a write (specified by \a rw) lock
1403  * referenced by \a cookie.
1404  *
1405  * \param inode inode to which data belong
1406  * \param end end of the locked extent
1407  * \param rw OBD_BRW_READ if requested for reading,
1408  *           OBD_BRW_WRITE if requested for writing
1409  * \param cookie transparent parameter for passing locking context
1410  *
1411  * \post appropriate lock is dereferenced
1412  *
1413  * \see ll_reget_short_lock
1414  */
1415 static void ll_release_short_lock(struct inode *inode, obd_off end,
1416                                   void *cookie, int rw)
1417 {
1418         struct obd_export *exp;
1419         int rc;
1420
1421         exp = ll_i2dtexp(inode);
1422         if (exp == NULL)
1423                 return;
1424
1425         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1426                                     cookie, rw);
1427         if (rc < 0)
1428                 CERROR("unlock failed (%d)\n", rc);
1429 }
1430
1431 /**
1432  * Checks if requested extent lock is compatible
1433  * with a lock under a page in page cache.
1434  *
1435  * Checks if a lock under some \a page is compatible with a read or write lock
1436  * (specified by \a rw) for an extent [\a start , \a end].
1437  *
1438  * \param file the file under which lock is considered
1439  * \param rw OBD_BRW_READ if requested for reading,
1440  *           OBD_BRW_WRITE if requested for writing
1441  * \param ppos start of the requested extent
1442  * \param end end of the requested extent
1443  * \param cookie transparent parameter for passing locking context
1444  * \param buf userspace buffer for the data
1445  *
1446  * \post result == 1, *cookie == context, appropriate lock is referenced
 * \post result == 0
1448  *
1449  * \retval 1 owned lock is reused for the request
1450  * \retval 0 no lock reused for the request
1451  *
1452  * \see ll_file_put_fast_lock
1453  */
1454 static inline int ll_file_get_fast_lock(struct file *file,
1455                                         obd_off ppos, obd_off end,
1456                                         char *buf, void **cookie, int rw)
1457 {
1458         int rc = 0;
1459         struct page *page;
1460
1461         ENTRY;
1462
1463         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1464                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1465                                       ppos >> CFS_PAGE_SHIFT);
1466                 if (page) {
1467                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1468                                 rc = 1;
1469
1470                         unlock_page(page);
1471                         page_cache_release(page);
1472                 }
1473         }
1474
1475         RETURN(rc);
1476 }
1477
1478 /**
1479  * Releases a reference to a lock taken in a "fast" way.
1480  *
1481  * Releases a read or a write (specified by \a rw) lock
1482  * referenced by \a cookie.
1483  *
1484  * \param inode inode to which data belong
1485  * \param end end of the locked extent
1486  * \param rw OBD_BRW_READ if requested for reading,
1487  *           OBD_BRW_WRITE if requested for writing
1488  * \param cookie transparent parameter for passing locking context
1489  *
1490  * \post appropriate lock is dereferenced
1491  *
1492  * \see ll_file_get_fast_lock
1493  */
/* Thin wrapper pairing with ll_file_get_fast_lock(): drops the short-lock
 * reference taken there. */
static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
                                         void *cookie, int rw)
{
        ll_release_short_lock(inode, end, cookie, rw);
}
1499
/* How the extent covering an I/O got locked; see ll_file_get_lock() and
 * ll_file_put_lock(). */
enum ll_lock_style {
        LL_LOCK_STYLE_NOLOCK   = 0, /* no client-side lock taken */
        LL_LOCK_STYLE_FASTLOCK = 1, /* reused a short lock cached under a page */
        LL_LOCK_STYLE_TREELOCK = 2  /* took a lock-tree lock */
};
1505
1506 /**
1507  * Checks if requested extent lock is compatible with a lock 
1508  * under a page cache page.
1509  *
1510  * Checks if the lock under \a page is compatible with a read or write lock
1511  * (specified by \a rw) for an extent [\a start , \a end].
1512  *
1513  * \param file file under which I/O is processed
1514  * \param rw OBD_BRW_READ if requested for reading,
1515  *           OBD_BRW_WRITE if requested for writing
1516  * \param ppos start of the requested extent
1517  * \param end end of the requested extent
1518  * \param cookie transparent parameter for passing locking context
1519  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1520  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1521  * \param buf userspace buffer for the data
1522  *
1523  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1524  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1525  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1526  *
1527  * \see ll_file_put_lock
1528  */
static inline int ll_file_get_lock(struct file *file, obd_off ppos,
                                   obd_off end, char *buf, void **cookie,
                                   struct ll_lock_tree *tree, int rw)
{
        int rc;

        ENTRY;

        /* first try to reuse a lock already referenced under a cached page */
        if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
                RETURN(LL_LOCK_STYLE_FASTLOCK);

        /* NOTE(review): the count argument below is "ppos - end", which
         * underflows (count is size_t) whenever ppos < end; presumably
         * "end - ppos + 1" was intended.  Verify against what
         * ll_tree_lock() does with count before changing. */
        rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
        /* rc: 1 for tree lock, 0 for no lock, <0 for error */
        switch (rc) {
        case 1:
                RETURN(LL_LOCK_STYLE_TREELOCK);
        case 0:
                RETURN(LL_LOCK_STYLE_NOLOCK);
        }

        /* an error happened if we reached this point, rc = -errno here */
        RETURN(rc);
}
1552
1553 /**
1554  * Drops the lock taken by ll_file_get_lock.
1555  *
1556  * Releases a read or a write (specified by \a rw) lock
1557  * referenced by \a tree or \a cookie.
1558  *
1559  * \param inode inode to which data belong
1560  * \param end end of the locked extent
1561  * \param lockstyle facility through which the lock was taken
1562  * \param rw OBD_BRW_READ if requested for reading,
1563  *           OBD_BRW_WRITE if requested for writing
1564  * \param cookie transparent parameter for passing locking context
1565  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1566  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1567  *
1568  * \post appropriate lock is dereferenced
1569  *
1570  * \see ll_file_get_lock
1571  */
1572 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1573                                     enum ll_lock_style lock_style,
1574                                     void *cookie, struct ll_lock_tree *tree,
1575                                     int rw)
1576
1577 {
1578         switch (lock_style) {
1579         case LL_LOCK_STYLE_TREELOCK:
1580                 ll_tree_unlock(tree);
1581                 break;
1582         case LL_LOCK_STYLE_FASTLOCK:
1583                 ll_file_put_fast_lock(inode, end, cookie, rw);
1584                 break;
1585         default:
1586                 CERROR("invalid locking style (%d)\n", lock_style);
1587         }
1588 }
1589
/*
 * Read from a file (through the page cache).
 *
 * The request is processed in chunks (bounded by ll_max_rw_chunk and by
 * stripe boundaries when set).  For each chunk an extent lock is taken
 * (fast lock, tree lock, or none for lockless i/o), the known minimum
 * size (kms) is consulted to decide whether a glimpse is needed, the data
 * is read, and the lock is dropped.  The loop repeats at "repeat:" until
 * the full count is transferred or a short read/error ends it.
 */
static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                            loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_lock_tree tree;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        int ra = 0;             /* nonzero once the read-ahead window is set */
        obd_off end;            /* last byte of the current chunk */
        ssize_t retval, chunk, sum = 0; /* sum = bytes moved so far */
        int lock_style;
        void *cookie;           /* fast-lock context, opaque to us */

        __u64 kms;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);
        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);

        if (!lsm) {
                /* Read on file with no objects should return zero-filled
                 * buffers up to file size (we can get non-zero sizes with
                 * mknod + truncate, then opening file for read. This is a
                 * common pattern in NFS case, it seems). Bug 6243 */
                int notzeroed;
                /* Since there are no objects on OSTs, we have nothing to get
                 * lock on and so we are forced to access inode->i_size
                 * unguarded */

                /* Read beyond end of file */
                if (*ppos >= i_size_read(inode))
                        RETURN(0);

                if (count > i_size_read(inode) - *ppos)
                        count = i_size_read(inode) - *ppos;
                /* Make sure to correctly adjust the file pos pointer for
                 * EFAULT case */
                notzeroed = clear_user(buf, count);
                count -= notzeroed;
                *ppos += count;
                if (!count)
                        RETURN(-EFAULT);
                RETURN(count);
        }
repeat:
        if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
        } else {
                /* no chunking: lock and read the whole remaining request */
                end = *ppos + count - 1;
        }

        lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
                                      buf, &cookie, &tree, OBD_BRW_READ);
        if (lock_style < 0)
                GOTO(out, retval = lock_style);

        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (retval) {
                        /* glimpse failed: drop the chunk lock before bailing */
                        if (lock_style != LL_LOCK_STYLE_NOLOCK)
                                ll_file_put_lock(inode, end, lock_style,
                                                 cookie, &tree, OBD_BRW_READ);
                        goto out;
                }
        } else {
                /* region is within kms and, hence, within real file size (A).
                 * We need to increase i_size to cover the read region so that
                 * generic_file_read() will do its job, but that doesn't mean
                 * the kms size is _correct_, it is only the _minimum_ size.
                 * If someone does a stat they will get the correct size which
                 * will always be >= the kms value here.  b=11081 */
                if (i_size_read(inode) < kms)
                        i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        chunk = end - *ppos + 1;
        CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, chunk, *ppos, i_size_read(inode));

        if (lock_style != LL_LOCK_STYLE_NOLOCK) {
                /* turn off the kernel's read-ahead */
                file->f_ra.ra_pages = 0;

                /* initialize read-ahead window once per syscall */
                if (ra == 0) {
                        ra = 1;
                        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
                        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
                        ll_ra_read_in(file, &bead);
                }

                /* BUG: 5972 */
                file_accessed(file);
                retval = generic_file_read(file, buf, chunk, ppos);
                ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
                                 OBD_BRW_READ);
        } else {
                /* lockless i/o: the OST serializes for us */
                retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
        }

        ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);

        if (retval > 0) {
                /* advance and loop if this chunk was read in full and
                 * more was requested; a short read ends the syscall */
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

 out:
        /* tear down the read-ahead window if we set one up */
        if (ra != 0)
                ll_ra_read_ex(file, &bead);
        /* report total bytes moved, or the error if nothing was read */
        retval = (sum > 0) ? sum : retval;
        RETURN(retval);
}
1751
1752 /*
1753  * Write to a file (through the page cache).
1754  */
static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
                             loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
        struct ll_lock_tree tree;
        loff_t maxbytes = ll_file_maxbytes(inode);
        loff_t lock_start, lock_end, end;
        ssize_t retval, chunk, sum = 0; /* sum = bytes written so far */
        int tree_locked;                /* 1: tree lock held, 0: lockless */
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        /* POSIX, but surprised the VFS doesn't check this already */
        if (count == 0)
                RETURN(0);

        /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
         * called on the file, don't fail the below assertion (bug 2388). */
        if (file->f_flags & O_LOV_DELAY_CREATE &&
            ll_i2info(inode)->lli_smd == NULL)
                RETURN(-EBADF);

        LASSERT(ll_i2info(inode)->lli_smd != NULL);

        /* serialize writers on this inode within this client */
        down(&ll_i2info(inode)->lli_write_sem);

repeat:
        chunk = 0; /* just to fix gcc's warning */
        end = *ppos + count - 1;

        if (file->f_flags & O_APPEND) {
                /* O_APPEND needs [0, EOF] so the extent lock pulls the
                 * authoritative file size onto this client */
                lock_start = 0;
                lock_end = OBD_OBJECT_EOF;
        } else if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
                                (obd_off *)&end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
                lock_start = *ppos;
                lock_end = end;
        } else {
                lock_start = *ppos;
                lock_end = *ppos + count - 1;
        }

        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
                                            lock_start, lock_end, OBD_BRW_WRITE);
        if (tree_locked < 0)
                GOTO(out, retval = tree_locked);

        /* This is ok, g_f_w will overwrite this under i_sem if it races
         * with a local truncate, it just makes our maxbyte checking easier.
         * The i_size value gets updated in ll_extent_lock() as a consequence
         * of the [0,EOF] extent lock we requested above. */
        if (file->f_flags & O_APPEND) {
                *ppos = i_size_read(inode);
                end = *ppos + count - 1;
        }

        if (*ppos >= maxbytes) {
                /* POSIX: writing at/after the limit raises SIGXFSZ */
                send_sig(SIGXFSZ, current, 0);
                GOTO(out_unlock, retval = -EFBIG);
        }
        if (end > maxbytes - 1)
                end = maxbytes - 1;

        /* generic_file_write handles O_APPEND after getting i_mutex */
        chunk = end - *ppos + 1;
        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
               inode->i_ino, chunk, *ppos);
        if (tree_locked)
                retval = generic_file_write(file, buf, chunk, ppos);
        else
                retval = ll_file_lockless_io(file, (char*)buf, chunk,
                                             ppos, WRITE);
        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);

out_unlock:
        if (tree_locked)
                ll_tree_unlock(&tree);

out:
        if (retval > 0) {
                /* advance and loop while full chunks keep landing */
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

        up(&ll_i2info(inode)->lli_write_sem);

        /* report total bytes written, or the error if nothing was written */
        retval = (sum > 0) ? sum : retval;
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
                           retval > 0 ? retval : 0);
        RETURN(retval);
}
1866
1867 /*
1868  * Send file content (through pagecache) somewhere with helper
1869  */
static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                                read_actor_t actor, void *target)
{
        struct inode *inode = in_file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        int rc;
        ssize_t retval;
        __u64 kms;      /* known minimum size merged from OST lock values */
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
        /* turn off the kernel's read-ahead */
        in_file->f_ra.ra_pages = 0;

        /* File with no objects, nothing to lock */
        if (!lsm)
                RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));

        /* take a PR extent lock over the whole region being sent; unlike
         * ll_file_read() this path is not chunked */
        node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
        if (IS_ERR(node))
                RETURN(PTR_ERR(node));

        tree.lt_fd = LUSTRE_FPRIVATE(in_file);
        rc = ll_tree_lock(&tree, node, NULL, count,
                          in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
        if (rc != 0)
                RETURN(rc);

        ll_clear_file_contended(inode);
        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (retval)
                        goto out;
        } else {
                /* region is within kms and, hence, within real file size (A) */
                i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, count, *ppos, i_size_read(inode));

        /* declare the Lustre read-ahead window covering the region */
        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
        ll_ra_read_in(in_file, &bead);
        /* BUG: 5972 */
        file_accessed(in_file);
        retval = generic_file_sendfile(in_file, ppos, count, actor, target);
        ll_ra_read_ex(in_file, &bead);

 out:
        ll_tree_unlock(&tree);
        RETURN(retval);
}
1962
/*
 * LL_IOC_RECREATE_OBJ ioctl handler: ask the OSTs to recreate a lost
 * object for this file (admin only).  Copies a ll_recreate_obj request
 * from userspace, clones the file's current striping descriptor, and
 * issues obd_create() with OBD_FL_RECREATE_OBJS set.
 */
static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                               unsigned long arg)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_export *exp = ll_i2dtexp(inode);
        struct ll_recreate_obj ucreatp;
        struct obd_trans_info oti = { 0 };
        struct obdo *oa = NULL;
        int lsm_size;
        int rc = 0;
        struct lov_stripe_md *lsm, *lsm2;
        ENTRY;

        if (!capable (CAP_SYS_ADMIN))
                RETURN(-EPERM);

        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
                            sizeof(struct ll_recreate_obj));
        if (rc) {
                RETURN(-EFAULT);
        }
        OBDO_ALLOC(oa);
        if (oa == NULL)
                RETURN(-ENOMEM);

        /* hold lli_size_sem so lli_smd stays stable while we size and
         * copy it */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(out, rc = -ENOENT);
        lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
                   (lsm->lsm_stripe_count));

        OBD_ALLOC(lsm2, lsm_size);
        if (lsm2 == NULL)
                GOTO(out, rc = -ENOMEM);

        /* identify the object to recreate: id/group name it, o_nlink
         * carries the target OST index for this ioctl */
        oa->o_id = ucreatp.lrc_id;
        oa->o_gr = ucreatp.lrc_group;
        oa->o_nlink = ucreatp.lrc_ost_idx;
        oa->o_flags |= OBD_FL_RECREATE_OBJS;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        /* obd_create() gets a scratch copy of the striping descriptor */
        memcpy(lsm2, lsm, lsm_size);
        rc = obd_create(exp, oa, &lsm2, &oti);

        OBD_FREE(lsm2, lsm_size);
        GOTO(out, rc);
out:
        up(&lli->lli_size_sem);
        OBDO_FREE(oa);
        return rc;
}
2017
/*
 * Set the striping EA on a file by re-opening it with an IT_OPEN intent
 * carrying the lov_user_md.  Fails with -EEXIST if striping has already
 * been created.  On success the open handle obtained for the intent is
 * released again; callers see only the EA side effect.
 *
 * \param flags    open flags for the intent
 * \param lum      user-supplied striping descriptor (already in kernel
 *                 memory)
 * \param lum_size size of \a lum in bytes
 */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                             int flags, struct lov_user_md *lum, int lum_size)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm;
        struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
        int rc = 0;
        ENTRY;

        /* lli_size_sem guards lli_smd; held across the intent open so a
         * racing setstripe cannot create striping underneath us */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm) {
                up(&lli->lli_size_sem);
                CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
                       inode->i_ino);
                RETURN(-EEXIST);
        }

        rc = ll_intent_file_open(file, lum, lum_size, &oit);
        if (rc)
                GOTO(out, rc);
        if (it_disposition(&oit, DISP_LOOKUP_NEG))
                GOTO(out_req_free, rc = -ENOENT);
        rc = oit.d.lustre.it_status;
        if (rc < 0)
                GOTO(out_req_free, rc);

        /* the open was only a vehicle for the EA; close it again */
        ll_release_openhandle(file->f_dentry, &oit);

 out:
        up(&lli->lli_size_sem);
        ll_intent_release(&oit);
        RETURN(rc);
out_req_free:
        /* error after the intent RPC: drop the request, then clean up */
        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
        goto out;
}
2055
/*
 * Fetch the LOV EA (striping information) of \a filename from the MDS.
 *
 * On success *lmmp/*lmm_size describe the striping md (pointing into, or
 * for the JOIN case allocated alongside, *request) and *request holds the
 * reply buffer; the caller owns the request and must release it with
 * ptlrpc_req_finished().  For LOV_MAGIC_JOIN files the compact on-MDS
 * form is expanded into a flat lov_user_md_join for userspace.
 *
 * NOTE(review): this function uses RETURN()/GOTO() without a matching
 * ENTRY — confirm whether the debug-trace imbalance is intentional.
 */
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
                             struct lov_mds_md **lmmp, int *lmm_size, 
                             struct ptlrpc_request **request)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct mdt_body  *body;
        struct lov_mds_md *lmm = NULL;
        struct ptlrpc_request *req = NULL;
        struct obd_capa *oc;
        int rc, lmmsize;

        /* start with the largest EA the MDS may return */
        rc = ll_get_max_mdsize(sbi, &lmmsize);
        if (rc)
                RETURN(rc);

        oc = ll_mdscapa_get(inode);
        rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
                             oc, filename, strlen(filename) + 1,
                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
                             ll_i2suppgid(inode), &req);
        capa_put(oc);
        if (rc < 0) {
                CDEBUG(D_INFO, "md_getattr_name failed "
                       "on %s: rc %d\n", filename, rc);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
        LASSERT(body != NULL); /* checked by mdc_getattr_name */

        /* actual EA size as reported by the MDS */
        lmmsize = body->eadatasize;

        if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
                        lmmsize == 0) {
                GOTO(out, rc = -ENODATA);
        }

        lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
        LASSERT(lmm != NULL);

        /*
         * This is coming from the MDS, so is probably in
         * little endian.  We convert it to host endian before
         * passing it to userspace.
         */
        if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
                lustre_swab_lov_user_md((struct lov_user_md *)lmm);
                lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
        } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
                lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
        }

        if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
                /* joined file: unpack the md and expand each stripe's
                 * extent into a flat lov_user_md_join for userspace */
                struct lov_stripe_md *lsm;
                struct lov_user_md_join *lmj;
                int lmj_size, i, aindex = 0;

                rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
                if (rc < 0)
                        GOTO(out, rc = -ENOMEM);
                rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
                if (rc)
                        GOTO(out_free_memmd, rc);

                lmj_size = sizeof(struct lov_user_md_join) +
                           lsm->lsm_stripe_count *
                           sizeof(struct lov_user_ost_data_join);
                OBD_ALLOC(lmj, lmj_size);
                if (!lmj)
                        GOTO(out_free_memmd, rc = -ENOMEM);

                memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
                for (i = 0; i < lsm->lsm_stripe_count; i++) {
                        /* aindex walks the extent array in step with the
                         * stripes it covers */
                        struct lov_extent *lex =
                                &lsm->lsm_array->lai_ext_array[aindex];

                        if (lex->le_loi_idx + lex->le_stripe_count <= i)
                                aindex ++;
                        CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
                                        LPU64" len %d\n", aindex, i,
                                        lex->le_start, (int)lex->le_len);
                        lmj->lmm_objects[i].l_extent_start =
                                lex->le_start;

                        /* le_len == -1 means "to EOF" */
                        if ((int)lex->le_len == -1)
                                lmj->lmm_objects[i].l_extent_end = -1;
                        else
                                lmj->lmm_objects[i].l_extent_end =
                                        lex->le_start + lex->le_len;
                        lmj->lmm_objects[i].l_object_id =
                                lsm->lsm_oinfo[i]->loi_id;
                        lmj->lmm_objects[i].l_object_gr =
                                lsm->lsm_oinfo[i]->loi_gr;
                        lmj->lmm_objects[i].l_ost_gen =
                                lsm->lsm_oinfo[i]->loi_ost_gen;
                        lmj->lmm_objects[i].l_ost_idx =
                                lsm->lsm_oinfo[i]->loi_ost_idx;
                }
                /* hand the expanded copy back instead of the reply buffer */
                lmm = (struct lov_mds_md *)lmj;
                lmmsize = lmj_size;
out_free_memmd:
                obd_free_memmd(sbi->ll_dt_exp, &lsm);
        }
out:
        *lmmp = lmm;
        *lmm_size = lmmsize;
        *request = req;
        return rc;
}
2165
2166 static int ll_lov_setea(struct inode *inode, struct file *file,
2167                             unsigned long arg)
2168 {
2169         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2170         struct lov_user_md  *lump;
2171         int lum_size = sizeof(struct lov_user_md) +
2172                        sizeof(struct lov_user_ost_data);
2173         int rc;
2174         ENTRY;
2175
2176         if (!capable (CAP_SYS_ADMIN))
2177                 RETURN(-EPERM);
2178
2179         OBD_ALLOC(lump, lum_size);
2180         if (lump == NULL) {
2181                 RETURN(-ENOMEM);
2182         }
2183         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2184         if (rc) {
2185                 OBD_FREE(lump, lum_size);
2186                 RETURN(-EFAULT);
2187         }
2188
2189         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2190
2191         OBD_FREE(lump, lum_size);
2192         RETURN(rc);
2193 }
2194
2195 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2196                             unsigned long arg)
2197 {
2198         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2199         int rc;
2200         int flags = FMODE_WRITE;
2201         ENTRY;
2202
2203         /* Bug 1152: copy properly when this is no longer true */
2204         LASSERT(sizeof(lum) == sizeof(*lump));
2205         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2206         rc = copy_from_user(&lum, lump, sizeof(lum));
2207         if (rc)
2208                 RETURN(-EFAULT);
2209
2210         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2211         if (rc == 0) {
2212                  put_user(0, &lump->lmm_stripe_count);
2213                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2214                                     0, ll_i2info(inode)->lli_smd, lump);
2215         }
2216         RETURN(rc);
2217 }
2218
2219 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2220 {
2221         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2222
2223         if (!lsm)
2224                 RETURN(-ENODATA);
2225
2226         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2227                             (void *)arg);
2228 }
2229
2230 static int ll_get_grouplock(struct inode *inode, struct file *file,
2231                             unsigned long arg)
2232 {
2233         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2234         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2235                                                     .end = OBD_OBJECT_EOF}};
2236         struct lustre_handle lockh = { 0 };
2237         struct ll_inode_info *lli = ll_i2info(inode);
2238         struct lov_stripe_md *lsm = lli->lli_smd;
2239         int flags = 0, rc;
2240         ENTRY;
2241
2242         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2243                 RETURN(-EINVAL);
2244         }
2245
2246         policy.l_extent.gid = arg;
2247         if (file->f_flags & O_NONBLOCK)
2248                 flags = LDLM_FL_BLOCK_NOWAIT;
2249
2250         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2251         if (rc)
2252                 RETURN(rc);
2253
2254         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2255         fd->fd_gid = arg;
2256         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2257
2258         RETURN(0);
2259 }
2260
2261 static int ll_put_grouplock(struct inode *inode, struct file *file,
2262                             unsigned long arg)
2263 {
2264         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2265         struct ll_inode_info *lli = ll_i2info(inode);
2266         struct lov_stripe_md *lsm = lli->lli_smd;
2267         int rc;
2268         ENTRY;
2269
2270         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2271                 /* Ugh, it's already unlocked. */
2272                 RETURN(-EINVAL);
2273         }
2274
2275         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2276                 RETURN(-EINVAL);
2277
2278         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2279
2280         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2281         if (rc)
2282                 RETURN(rc);
2283
2284         fd->fd_gid = 0;
2285         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2286
2287         RETURN(0);
2288 }
2289
2290 static int join_sanity_check(struct inode *head, struct inode *tail)
2291 {
2292         ENTRY;
2293         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2294                 CERROR("server do not support join \n");
2295                 RETURN(-EINVAL);
2296         }
2297         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2298                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2299                        head->i_ino, tail->i_ino);
2300                 RETURN(-EINVAL);
2301         }
2302         if (head->i_ino == tail->i_ino) {
2303                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2304                 RETURN(-EINVAL);
2305         }
2306         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2307                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2308                 RETURN(-EINVAL);
2309         }
2310         RETURN(0);
2311 }
2312
2313 static int join_file(struct inode *head_inode, struct file *head_filp,
2314                      struct file *tail_filp)
2315 {
2316         struct dentry *tail_dentry = tail_filp->f_dentry;
2317         struct lookup_intent oit = {.it_op = IT_OPEN,
2318                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2319         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2320                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2321
2322         struct lustre_handle lockh;
2323         struct md_op_data *op_data;
2324         int    rc;
2325         loff_t data;
2326         ENTRY;
2327
2328         tail_dentry = tail_filp->f_dentry;
2329
2330         data = i_size_read(head_inode);
2331         op_data = ll_prep_md_op_data(NULL, head_inode,
2332                                      tail_dentry->d_parent->d_inode,
2333                                      tail_dentry->d_name.name,
2334                                      tail_dentry->d_name.len, 0,
2335                                      LUSTRE_OPC_ANY, &data);
2336         if (IS_ERR(op_data))
2337                 RETURN(PTR_ERR(op_data));
2338
2339         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2340                          op_data, &lockh, NULL, 0, 0);
2341
2342         ll_finish_md_op_data(op_data);
2343         if (rc < 0)
2344                 GOTO(out, rc);
2345
2346         rc = oit.d.lustre.it_status;
2347
2348         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2349                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2350                 ptlrpc_req_finished((struct ptlrpc_request *)
2351                                     oit.d.lustre.it_data);
2352                 GOTO(out, rc);
2353         }
2354
2355         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2356                                            * away */
2357                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2358                 oit.d.lustre.it_lock_mode = 0;
2359         }
2360         ll_release_openhandle(head_filp->f_dentry, &oit);
2361 out:
2362         ll_intent_release(&oit);
2363         RETURN(rc);
2364 }
2365
2366 static int ll_file_join(struct inode *head, struct file *filp,
2367                         char *filename_tail)
2368 {
2369         struct inode *tail = NULL, *first = NULL, *second = NULL;
2370         struct dentry *tail_dentry;
2371         struct file *tail_filp, *first_filp, *second_filp;
2372         struct ll_lock_tree first_tree, second_tree;
2373         struct ll_lock_tree_node *first_node, *second_node;
2374         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2375         int rc = 0, cleanup_phase = 0;
2376         ENTRY;
2377
2378         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2379                head->i_ino, head->i_generation, head, filename_tail);
2380
2381         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2382         if (IS_ERR(tail_filp)) {
2383                 CERROR("Can not open tail file %s", filename_tail);
2384                 rc = PTR_ERR(tail_filp);
2385                 GOTO(cleanup, rc);
2386         }
2387         tail = igrab(tail_filp->f_dentry->d_inode);
2388
2389         tlli = ll_i2info(tail);
2390         tail_dentry = tail_filp->f_dentry;
2391         LASSERT(tail_dentry);
2392         cleanup_phase = 1;
2393
2394         /*reorder the inode for lock sequence*/
2395         first = head->i_ino > tail->i_ino ? head : tail;
2396         second = head->i_ino > tail->i_ino ? tail : head;
2397         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2398         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2399
2400         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2401                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2402         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2403         if (IS_ERR(first_node)){
2404                 rc = PTR_ERR(first_node);
2405                 GOTO(cleanup, rc);
2406         }
2407         first_tree.lt_fd = first_filp->private_data;
2408         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2409         if (rc != 0)
2410                 GOTO(cleanup, rc);
2411         cleanup_phase = 2;
2412
2413         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2414         if (IS_ERR(second_node)){
2415                 rc = PTR_ERR(second_node);
2416                 GOTO(cleanup, rc);
2417         }
2418         second_tree.lt_fd = second_filp->private_data;
2419         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2420         if (rc != 0)
2421                 GOTO(cleanup, rc);
2422         cleanup_phase = 3;
2423
2424         rc = join_sanity_check(head, tail);
2425         if (rc)
2426                 GOTO(cleanup, rc);
2427
2428         rc = join_file(head, filp, tail_filp);
2429         if (rc)
2430                 GOTO(cleanup, rc);
2431 cleanup:
2432         switch (cleanup_phase) {
2433         case 3:
2434                 ll_tree_unlock(&second_tree);
2435                 obd_cancel_unused(ll_i2dtexp(second),
2436                                   ll_i2info(second)->lli_smd, 0, NULL);
2437         case 2:
2438                 ll_tree_unlock(&first_tree);
2439                 obd_cancel_unused(ll_i2dtexp(first),
2440                                   ll_i2info(first)->lli_smd, 0, NULL);
2441         case 1:
2442                 filp_close(tail_filp, 0);
2443                 if (tail)
2444                         iput(tail);
2445                 if (head && rc == 0) {
2446                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2447                                        &hlli->lli_smd);
2448                         hlli->lli_smd = NULL;
2449                 }
2450         case 0:
2451                 break;
2452         default:
2453                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2454                 LBUG();
2455         }
2456         RETURN(rc);
2457 }
2458
/**
 * Close the MDS open handle that was obtained as a side effect of the
 * intent @it (e.g. an intent lookup that also opened the file), so the
 * server-side open reference is not leaked.
 *
 * \param dentry  dentry whose inode holds the open handle
 * \param it      lookup intent carrying the open disposition/handle
 *
 * \retval 0 on success or when there is nothing to close,
 *         negative errno on failure
 */
int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct obd_client_handle *och;
        int rc;
        ENTRY;

        LASSERT(inode);

        /* Root ? Do nothing. */
        if (dentry->d_inode->i_sb->s_root == dentry)
                RETURN(0);

        /* No open handle to close? Move away */
        if (!it_disposition(it, DISP_OPEN_OPEN))
                RETURN(0);

        LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);

        OBD_ALLOC(och, sizeof(*och));
        if (!och)
                GOTO(out, rc = -ENOMEM);

        /* copy the open handle out of the intent into och */
        ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
                    ll_i2info(inode), it, och);

        rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
                                       inode, och);
 out:
        /* this one is in place of ll_file_open */
        ptlrpc_req_finished(it->d.lustre.it_data);
        it_clear_disposition(it, DISP_ENQ_OPEN_REF);
        RETURN(rc);
}
2493
2494 /**
2495  * Get size for inode for which FIEMAP mapping is requested.
2496  * Make the FIEMAP get_info call and returns the result.
2497  */
2498 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2499               int num_bytes)
2500 {
2501         struct obd_export *exp = ll_i2dtexp(inode);
2502         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2503         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2504         int vallen = num_bytes;
2505         int rc;
2506         ENTRY;
2507
2508         /* If the stripe_count > 1 and the application does not understand
2509          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2510          */
2511         if (lsm->lsm_stripe_count > 1 &&
2512             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2513                 return -EOPNOTSUPP;
2514
2515         fm_key.oa.o_id = lsm->lsm_object_id;
2516         fm_key.oa.o_gr = lsm->lsm_object_gr;
2517         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2518
2519         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
2520                         OBD_MD_FLSIZE);
2521
2522         /* If filesize is 0, then there would be no objects for mapping */
2523         if (fm_key.oa.o_size == 0) {
2524                 fiemap->fm_mapped_extents = 0;
2525                 RETURN(0);
2526         }
2527
2528         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2529
2530         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2531         if (rc)
2532                 CERROR("obd_get_info failed: rc = %d\n", rc);
2533
2534         RETURN(rc);
2535 }
2536
2537 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2538                   unsigned long arg)
2539 {
2540         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2541         int flags;
2542         ENTRY;
2543
2544         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2545                inode->i_generation, inode, cmd);
2546         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2547
2548         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2549         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2550                 RETURN(-ENOTTY);
2551
2552         switch(cmd) {
2553         case LL_IOC_GETFLAGS:
2554                 /* Get the current value of the file flags */
2555                 return put_user(fd->fd_flags, (int *)arg);
2556         case LL_IOC_SETFLAGS:
2557         case LL_IOC_CLRFLAGS:
2558                 /* Set or clear specific file flags */
2559                 /* XXX This probably needs checks to ensure the flags are
2560                  *     not abused, and to handle any flag side effects.
2561                  */
2562                 if (get_user(flags, (int *) arg))
2563                         RETURN(-EFAULT);
2564
2565                 if (cmd == LL_IOC_SETFLAGS) {
2566                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2567                             !(file->f_flags & O_DIRECT)) {
2568                                 CERROR("%s: unable to disable locking on "
2569                                        "non-O_DIRECT file\n", current->comm);
2570                                 RETURN(-EINVAL);
2571                         }
2572
2573                         fd->fd_flags |= flags;
2574                 } else {
2575                         fd->fd_flags &= ~flags;
2576                 }
2577                 RETURN(0);
2578         case LL_IOC_LOV_SETSTRIPE:
2579                 RETURN(ll_lov_setstripe(inode, file, arg));
2580         case LL_IOC_LOV_SETEA:
2581                 RETURN(ll_lov_setea(inode, file, arg));
2582         case LL_IOC_LOV_GETSTRIPE:
2583                 RETURN(ll_lov_getstripe(inode, arg));
2584         case LL_IOC_RECREATE_OBJ:
2585                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2586         case EXT3_IOC_FIEMAP: {
2587                 struct ll_user_fiemap *fiemap_s;
2588                 size_t num_bytes, ret_bytes;
2589                 unsigned int extent_count;
2590                 int rc = 0;
2591
2592                 /* Get the extent count so we can calculate the size of
2593                  * required fiemap buffer */
2594                 if (get_user(extent_count,
2595                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2596                         RETURN(-EFAULT);
2597                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2598                                                  sizeof(struct ll_fiemap_extent));
2599                 OBD_VMALLOC(fiemap_s, num_bytes);
2600                 if (fiemap_s == NULL)
2601                         RETURN(-ENOMEM);
2602
2603                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2604                                    sizeof(*fiemap_s)))
2605                         GOTO(error, rc = -EFAULT);
2606
2607                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2608                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2609                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2610                         if (copy_to_user((char *)arg, fiemap_s,
2611                                          sizeof(*fiemap_s)))
2612                                 GOTO(error, rc = -EFAULT);
2613
2614                         GOTO(error, rc = -EBADR);
2615                 }
2616
2617                 /* If fm_extent_count is non-zero, read the first extent since
2618                  * it is used to calculate end_offset and device from previous
2619                  * fiemap call. */
2620                 if (extent_count) {
2621                         if (copy_from_user(&fiemap_s->fm_extents[0],
2622                             (char __user *)arg + sizeof(*fiemap_s),
2623                             sizeof(struct ll_fiemap_extent)))
2624                                 GOTO(error, rc = -EFAULT);
2625                 }
2626
2627                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2628                         int rc;
2629
2630                         rc = filemap_fdatawrite(inode->i_mapping);
2631                         if (rc)
2632                                 GOTO(error, rc);
2633                 }
2634
2635                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2636                 if (rc)
2637                         GOTO(error, rc);
2638
2639                 ret_bytes = sizeof(struct ll_user_fiemap);
2640
2641                 if (extent_count != 0)
2642                         ret_bytes += (fiemap_s->fm_mapped_extents *
2643                                          sizeof(struct ll_fiemap_extent));
2644
2645                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2646                         rc = -EFAULT;
2647
2648 error:
2649                 OBD_VFREE(fiemap_s, num_bytes);
2650                 RETURN(rc);
2651         }
2652         case EXT3_IOC_GETFLAGS:
2653         case EXT3_IOC_SETFLAGS:
2654                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2655         case EXT3_IOC_GETVERSION_OLD:
2656         case EXT3_IOC_GETVERSION:
2657                 RETURN(put_user(inode->i_generation, (int *)arg));
2658         case LL_IOC_JOIN: {
2659                 char *ftail;
2660                 int rc;
2661
2662                 ftail = getname((const char *)arg);
2663                 if (IS_ERR(ftail))
2664                         RETURN(PTR_ERR(ftail));
2665                 rc = ll_file_join(inode, file, ftail);
2666                 putname(ftail);
2667                 RETURN(rc);
2668         }
2669         case LL_IOC_GROUP_LOCK:
2670                 RETURN(ll_get_grouplock(inode, file, arg));
2671         case LL_IOC_GROUP_UNLOCK:
2672                 RETURN(ll_put_grouplock(inode, file, arg));
2673         case IOC_OBD_STATFS:
2674                 RETURN(ll_obd_statfs(inode, (void *)arg));
2675
2676         /* We need to special case any other ioctls we want to handle,
2677          * to send them to the MDS/OST as appropriate and to properly
2678          * network encode the arg field.
2679         case EXT3_IOC_SETVERSION_OLD:
2680         case EXT3_IOC_SETVERSION:
2681         */
2682         case LL_IOC_FLUSHCTX:
2683                 RETURN(ll_flush_ctx(inode));
2684         default: {
2685                 int err;
2686
2687                 if (LLIOC_STOP == 
2688                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2689                         RETURN(err);
2690
2691                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2692                                      (void *)arg));
2693         }
2694         }
2695 }
2696
/**
 * llseek implementation; origin 0/1/2 == SEEK_SET/SEEK_CUR/SEEK_END.
 * For SEEK_END the cached file size is refreshed from the OSTs via a
 * glimpse before the offset is applied.
 *
 * \retval the new file position, or negative errno on failure
 */
loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        loff_t retval;
        ENTRY;
        /* computed here only for the trace message below; the final
         * result is recomputed after the size is refreshed */
        retval = offset + ((origin == 2) ? i_size_read(inode) :
                           (origin == 1) ? file->f_pos : 0);
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
               inode->i_ino, inode->i_generation, inode, retval, retval,
               origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);

        if (origin == 2) { /* SEEK_END */
                int nonblock = 0, rc;

                if (file->f_flags & O_NONBLOCK)
                        nonblock = LDLM_FL_BLOCK_NOWAIT;

                /* refresh the size from the OSTs; skipped when the file
                 * has no objects allocated yet */
                if (lsm != NULL) {
                        rc = ll_glimpse_size(inode, nonblock);
                        if (rc != 0)
                                RETURN(rc);
                }

                /* sample the size under the size lock */
                ll_inode_size_lock(inode, 0);
                offset += i_size_read(inode);
                ll_inode_size_unlock(inode, 0);
        } else if (origin == 1) { /* SEEK_CUR */
                offset += file->f_pos;
        }

        retval = -EINVAL;
        if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
                if (offset != file->f_pos) {
                        file->f_pos = offset;
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                        file->f_reada = 0;
                        file->f_version = ++event;
#endif
                }
                retval = offset;
        }

        RETURN(retval);
}
2744
/**
 * fsync(2)/fdatasync(2) handler: wait for in-flight page writeback,
 * collect asynchronous write errors recorded earlier, sync metadata on
 * the MDS and, when @data is set, the object data on the OSTs.
 *
 * The first error encountered wins; later errors never overwrite it.
 *
 * \param data  non-zero to also flush object data on the OSTs
 *
 * \retval 0 on success, negative errno of the first failure otherwise
 */
int ll_fsync(struct file *file, struct dentry *dentry, int data)
{
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ptlrpc_request *req;
        struct obd_capa *oc;
        int rc, err;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);

        /* fsync's caller has already called _fdata{sync,write}, we want
         * that IO to finish before calling the osc and mdc sync methods */
        rc = filemap_fdatawait(inode->i_mapping);

        /* catch async errors that were recorded back when async writeback
         * failed for pages in this mapping. */
        err = lli->lli_async_rc;
        lli->lli_async_rc = 0;          /* consume the recorded error */
        if (rc == 0)
                rc = err;
        if (lsm) {
                err = lov_test_and_clear_async_rc(lsm);
                if (rc == 0)
                        rc = err;
        }

        /* sync metadata on the MDS */
        oc = ll_mdscapa_get(inode);
        err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
                      &req);
        capa_put(oc);
        if (!rc)
                rc = err;
        /* release the sync reply only when md_sync succeeded */
        if (!err)
                ptlrpc_req_finished(req);

        if (data && lsm) {
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (!oa)
                        RETURN(rc ? rc : -ENOMEM);

                /* identify the objects to sync on the OSTs */
                oa->o_id = lsm->lsm_object_id;
                oa->o_gr = lsm->lsm_object_gr;
                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                           OBD_MD_FLGROUP);

                oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
                err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
                               0, OBD_OBJECT_EOF, oc);
                capa_put(oc);
                if (!rc)
                        rc = err;
                OBDO_FREE(oa);
        }

        RETURN(rc);
}
2808
/**
 * fcntl(2)/flock(2) lock handler: translate the VFS file_lock into an
 * LDLM flock enqueue on the MDS and, on success, mirror the result into
 * the kernel's local BSD/POSIX lock tables.
 *
 * \retval 0 on success, negative errno on failure
 */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ldlm_res_id res_id =
                { .name = { fid_seq(ll_inode2fid(inode)),
                            fid_oid(ll_inode2fid(inode)),
                            fid_ver(ll_inode2fid(inode)),
                            LDLM_FLOCK} };
        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
                ldlm_flock_completion_ast, NULL, file_lock };
        struct lustre_handle lockh = {0};
        ldlm_policy_data_t flock;
        int flags = 0;
        int rc;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
               inode->i_ino, file_lock);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

        if (file_lock->fl_flags & FL_FLOCK) {
                LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                /* set missing params for flock() calls */
                file_lock->fl_end = OFFSET_MAX;
                file_lock->fl_pid = current->tgid;
        }
        flock.l_flock.pid = file_lock->fl_pid;
        flock.l_flock.start = file_lock->fl_start;
        flock.l_flock.end = file_lock->fl_end;

        /* map the fcntl lock type onto an LDLM lock mode */
        switch (file_lock->fl_type) {
        case F_RDLCK:
                einfo.ei_mode = LCK_PR;
                break;
        case F_UNLCK:
                /* An unlock request may or may not have any relation to
                 * existing locks so we may not be able to pass a lock handle
                 * via a normal ldlm_lock_cancel() request. The request may even
                 * unlock a byte range in the middle of an existing lock. In
                 * order to process an unlock request we need all of the same
                 * information that is given with a normal read or write record
                 * lock request. To avoid creating another ldlm unlock (cancel)
                 * message we'll treat a LCK_NL flock request as an unlock. */
                einfo.ei_mode = LCK_NL;
                break;
        case F_WRLCK:
                einfo.ei_mode = LCK_PW;
                break;
        default:
                CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
                LBUG();
        }

        /* map the fcntl command onto enqueue flags */
        switch (cmd) {
        case F_SETLKW:
#ifdef F_SETLKW64
        case F_SETLKW64:
#endif
                flags = 0;
                break;
        case F_SETLK:
#ifdef F_SETLK64
        case F_SETLK64:
#endif
                flags = LDLM_FL_BLOCK_NOWAIT;
                break;
        case F_GETLK:
#ifdef F_GETLK64
        case F_GETLK64:
#endif
                flags = LDLM_FL_TEST_LOCK;
                /* Save the old mode so that if the mode in the lock changes we
                 * can decrement the appropriate reader or writer refcount. */
                file_lock->fl_type = einfo.ei_mode;
                break;
        default:
                CERROR("unknown fcntl lock command: %d\n", cmd);
                LBUG();
        }

        CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
               "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
               flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
        /* mirror a successful enqueue (or any unlock) into the local tables */
        if ((file_lock->fl_flags & FL_FLOCK) &&
            (rc == 0 || file_lock->fl_type == F_UNLCK))
                ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
#ifdef HAVE_F_OP_FLOCK
        if ((file_lock->fl_flags & FL_POSIX) &&
            (rc == 0 || file_lock->fl_type == F_UNLCK) &&
            !(flags & LDLM_FL_TEST_LOCK))
                posix_lock_file_wait(file, file_lock);
#endif

        RETURN(rc);
}
2909
/*
 * Stub lock handler used when file locking is not supported on this
 * mount: unconditionally reports the operation as not implemented.
 */
int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
        ENTRY;

        RETURN(-ENOSYS);
}
2916
2917 int ll_have_md_lock(struct inode *inode, __u64 bits)
2918 {
2919         struct lustre_handle lockh;
2920         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2921         struct lu_fid *fid;
2922         int flags;
2923         ENTRY;
2924
2925         if (!inode)
2926                RETURN(0);
2927
2928         fid = &ll_i2info(inode)->lli_fid;
2929         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2930
2931         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2932         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2933                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2934                 RETURN(1);
2935         }
2936         RETURN(0);
2937 }
2938
2939 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2940                             struct lustre_handle *lockh)
2941 {
2942         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2943         struct lu_fid *fid;
2944         ldlm_mode_t rc;
2945         int flags;
2946         ENTRY;
2947
2948         fid = &ll_i2info(inode)->lli_fid;
2949         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2950
2951         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2952         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2953                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2954         RETURN(rc);
2955 }
2956
2957 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2958         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2959                               * and return success */
2960                 inode->i_nlink = 0;
2961                 /* This path cannot be hit for regular files unless in
2962                  * case of obscure races, so no need to to validate
2963                  * size. */
2964                 if (!S_ISREG(inode->i_mode) &&
2965                     !S_ISDIR(inode->i_mode))
2966                         return 0;
2967         }
2968
2969         if (rc) {
2970                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2971                 return -abs(rc);
2972
2973         }
2974
2975         return 0;
2976 }
2977
/**
 * Revalidate the attributes of @dentry's inode against the MDS.
 *
 * When the server supports getattr-by-FID (OBD_CONNECT_ATTRFID) this is
 * done through an IT_GETATTR intent lock; otherwise a plain md_getattr
 * is issued, but only when no covering UPDATE|LOOKUP inodebits lock is
 * already held.  Finally the file size is refreshed from the OSTs via a
 * glimpse, unless no objects are allocated yet.
 *
 * \retval 0 on success, negative errno on failure
 */
int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct ptlrpc_request *req = NULL;
        struct ll_sb_info *sbi;
        struct obd_export *exp;
        int rc;
        ENTRY;

        if (!inode) {
                CERROR("REPORT THIS LINE TO PETER\n");
                RETURN(0);
        }
        sbi = ll_i2sbi(inode);

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
               inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

        exp = ll_i2mdexp(inode);

        if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
                struct lookup_intent oit = { .it_op = IT_GETATTR };
                struct md_op_data *op_data;

                /* Call getattr by fid, so do not provide name at all. */
                op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
                                             dentry->d_inode, NULL, 0, 0,
                                             LUSTRE_OPC_ANY, NULL);
                if (IS_ERR(op_data))
                        RETURN(PTR_ERR(op_data));

                oit.it_flags |= O_CHECK_STALE;
                rc = md_intent_lock(exp, op_data, NULL, 0,
                                    /* we are not interested in name
                                       based lookup */
                                    &oit, 0, &req,
                                    ll_md_blocking_ast, 0);
                ll_finish_md_op_data(op_data);
                oit.it_flags &= ~O_CHECK_STALE;
                if (rc < 0) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        GOTO (out, rc);
                }

                /* update the local inode from the getattr reply */
                rc = ll_revalidate_it_finish(req, &oit, dentry);
                if (rc != 0) {
                        ll_intent_release(&oit);
                        GOTO(out, rc);
                }

                /* Unlinked? Unhash dentry, so it is not picked up later by
                   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
                   here to preserve get_cwd functionality on 2.6.
                   Bug 10503 */
                if (!dentry->d_inode->i_nlink) {
                        spin_lock(&dcache_lock);
                        ll_drop_dentry(dentry);
                        spin_unlock(&dcache_lock);
                }

                ll_lookup_finish_locks(&oit, dentry);
        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
                                                     MDS_INODELOCK_LOOKUP)) {
                struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                obd_valid valid = OBD_MD_FLGETATTR;
                struct obd_capa *oc;
                int ealen = 0;

                if (S_ISREG(inode->i_mode)) {
                        /* regular files also carry striping EAs: size the
                         * reply buffer for the largest EA the MDS may send */
                        rc = ll_get_max_mdsize(sbi, &ealen);
                        if (rc)
                                RETURN(rc);
                        valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
                }
                /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
                 * capa for this inode. Because we only keep capas of dirs
                 * fresh. */
                oc = ll_mdscapa_get(inode);
                rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
                                ealen, &req);
                capa_put(oc);
                if (rc) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        RETURN(rc);
                }

                rc = ll_prep_inode(&inode, req, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        /* if object not yet allocated, don't validate size */
        if (ll_i2info(inode)->lli_smd == NULL)
                GOTO(out, rc = 0);

        /* ll_glimpse_size will prefer locally cached writes if they extend
         * the file */
        rc = ll_glimpse_size(inode, 0);
        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}
3081
3082 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3083                   struct lookup_intent *it, struct kstat *stat)
3084 {
3085         struct inode *inode = de->d_inode;
3086         int res = 0;
3087
3088         res = ll_inode_revalidate_it(de, it);
3089         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3090
3091         if (res)
3092                 return res;
3093
3094         stat->dev = inode->i_sb->s_dev;
3095         stat->ino = inode->i_ino;
3096         stat->mode = inode->i_mode;
3097         stat->nlink = inode->i_nlink;
3098         stat->uid = inode->i_uid;
3099         stat->gid = inode->i_gid;
3100         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3101         stat->atime = inode->i_atime;
3102         stat->mtime = inode->i_mtime;
3103         stat->ctime = inode->i_ctime;
3104 #ifdef HAVE_INODE_BLKSIZE
3105         stat->blksize = inode->i_blksize;
3106 #else
3107         stat->blksize = 1 << inode->i_blkbits;
3108 #endif
3109
3110         ll_inode_size_lock(inode, 0);
3111         stat->size = i_size_read(inode);
3112         stat->blocks = inode->i_blocks;
3113         ll_inode_size_unlock(inode, 0);
3114
3115         return 0;
3116 }
3117 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3118 {
3119         struct lookup_intent it = { .it_op = IT_GETATTR };
3120
3121         return ll_getattr_it(mnt, de, &it, stat);
3122 }
3123
3124 static
3125 int lustre_check_acl(struct inode *inode, int mask)
3126 {
3127 #ifdef CONFIG_FS_POSIX_ACL
3128         struct ll_inode_info *lli = ll_i2info(inode);
3129         struct posix_acl *acl;
3130         int rc;
3131         ENTRY;
3132
3133         spin_lock(&lli->lli_lock);
3134         acl = posix_acl_dup(lli->lli_posix_acl);
3135         spin_unlock(&lli->lli_lock);
3136
3137         if (!acl)
3138                 RETURN(-EAGAIN);
3139
3140         rc = posix_acl_permission(inode, acl, mask);
3141         posix_acl_release(acl);
3142
3143         RETURN(rc);
3144 #else
3145         return -EAGAIN;
3146 #endif
3147 }
3148
/* inode_operations->permission entry point.
 *
 * Two variants are compiled depending on kernel version: on 2.6.10+ the
 * VFS helper generic_permission() is used with lustre_check_acl as the
 * ACL callback; on older kernels the equivalent mode/ACL/capability
 * checks are open-coded below.  In both variants, remote clients
 * (LL_SBI_RMT_CLIENT) delegate the decision to the server via
 * lustre_check_remote_perm(). */
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);
        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
        return generic_permission(inode, mask, lustre_check_acl);
}
#else
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        int mode = inode->i_mode;
        int rc;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);

        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);

        /* reject writes on read-only / immutable inodes up front */
        if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
            (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
                return -EROFS;
        if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
                return -EACCES;
        /* Select the relevant permission bit triplet.  The "else if (1)"
         * arm with the in-branch check_groups label is a deliberate
         * goto-based layout (mirroring the VFS generic_permission of this
         * era): the ACL is consulted only when the group-class bits would
         * grant the request — presumably because ACL entries are capped
         * by those bits, so the ACL cannot grant more (NOTE(review):
         * confirm against posix_acl_permission semantics).  -EAGAIN from
         * the ACL check means "no ACL", so fall back to the group bits. */
        if (current->fsuid == inode->i_uid) {
                mode >>= 6;
        } else if (1) {
                if (((mode >> 3) & mask & S_IRWXO) != mask)
                        goto check_groups;
                rc = lustre_check_acl(inode, mask);
                if (rc == -EAGAIN)
                        goto check_groups;
                if (rc == -EACCES)
                        goto check_capabilities;
                return rc;
        } else {
check_groups:
                if (in_group_p(inode->i_gid))
                        mode >>= 3;
        }
        if ((mode & mask & S_IRWXO) == mask)
                return 0;

check_capabilities:
        /* CAP_DAC_OVERRIDE bypasses the mode bits, except for exec on a
         * non-directory with no exec bit set anywhere */
        if (!(mask & MAY_EXEC) ||
            (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
                if (capable(CAP_DAC_OVERRIDE))
                        return 0;

        /* CAP_DAC_READ_SEARCH permits plain reads and directory lookups */
        if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
            (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
                return 0;

        return -EACCES;
}
#endif
3211
/* -o localflock - only provides locally consistent flock locks */
/* Default file operations: no ->flock/->lock methods are wired up, so
 * flock() falls back to the kernel's node-local semantics. */
struct file_operations ll_file_operations = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
};
3224
/* File operations for -o flock mounts: both flock() and POSIX locks are
 * routed through ll_file_flock for cluster-coherent locking. */
struct file_operations ll_file_operations_flock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_flock,
#endif
        .lock           = ll_file_flock
};
3240
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_noflock,
#endif
        .lock           = ll_file_noflock
};
3257
/* Inode operations for regular files, including the permission and
 * getattr entry points defined above and the xattr handlers. */
struct inode_operations ll_file_inode_operations = {
#ifdef HAVE_VFS_INTENT_PATCHES
        .setattr_raw    = ll_setattr_raw,
#endif
        .setattr        = ll_setattr,
        .truncate       = ll_truncate,
        .getattr        = ll_getattr,
        .permission     = ll_inode_permission,
        .setxattr       = ll_setxattr,
        .getxattr       = ll_getxattr,
        .listxattr      = ll_listxattr,
        .removexattr    = ll_removexattr,
};
3271
/* dynamic ioctl number support routines */
/* Registry of dynamically registered ioctl handlers.  ioc_sem presumably
 * serializes readers of ioc_head against register/unregister — confirm
 * with the down_read/down_write users elsewhere in this file. */
static struct llioc_ctl_data {
        struct rw_semaphore ioc_sem;
        struct list_head    ioc_head;   /* list of struct llioc_data */
} llioc = {
        __RWSEM_INITIALIZER(llioc.ioc_sem),
        CFS_LIST_HEAD_INIT(llioc.ioc_head)
};
3280
3281
3282 struct llioc_data {
3283         struct list_head        iocd_list;
3284         unsigned int            iocd_size;
3285         llioc_callback_t        iocd_cb;
3286         unsigned int            iocd_count;
3287         unsigned int            iocd_cmd[0];
3288 };
3289
3290 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3291 {
3292         unsigned int size;
3293         struct llioc_data *in_data = NULL;
3294         ENTRY;
3295
3296         if (cb == NULL || cmd == NULL ||
3297             count > LLIOC_MAX_CMD || count < 0)
3298                 RETURN(NULL);
3299
3300         size = sizeof(*in_data) + count * sizeof(unsigned int);
3301         OBD_ALLOC(in_data, size);
3302         if (in_data == NULL)
3303                 RETURN(NULL);
3304
3305         memset(in_data, 0, sizeof(*in_data));
3306         in_data->iocd_size = size;