Whamcloud - gitweb
b=10555
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <lustre_mdc.h>
47 #include <linux/pagemap.h>
48 #include <linux/file.h>
49 #include "llite_internal.h"
50 #include <lustre/ll_fiemap.h>
51
52 /* also used by llite/special.c:ll_special_open() */
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
58         return fd;
59 }
60
61 static void ll_file_data_put(struct ll_file_data *fd)
62 {
63         if (fd != NULL)
64                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
65 }
66
67 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
68                           struct lustre_handle *fh)
69 {
70         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
71         op_data->op_attr.ia_mode = inode->i_mode;
72         op_data->op_attr.ia_atime = inode->i_atime;
73         op_data->op_attr.ia_mtime = inode->i_mtime;
74         op_data->op_attr.ia_ctime = inode->i_ctime;
75         op_data->op_attr.ia_size = i_size_read(inode);
76         op_data->op_attr_blocks = inode->i_blocks;
77         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
78         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
79         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
80         op_data->op_capa1 = ll_mdscapa_get(inode);
81 }
82
83 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
84                              struct obd_client_handle *och)
85 {
86         ENTRY;
87
88         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
89                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
90
91         if (!(och->och_flags & FMODE_WRITE))
92                 goto out;
93
94         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
95             !S_ISREG(inode->i_mode))
96                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
97         else
98                 ll_epoch_close(inode, op_data, &och, 0);
99
100 out:
101         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
102         EXIT;
103 }
104
105 static int ll_close_inode_openhandle(struct obd_export *md_exp,
106                                      struct inode *inode,
107                                      struct obd_client_handle *och)
108 {
109         struct obd_export *exp = ll_i2mdexp(inode);
110         struct md_op_data *op_data;
111         struct ptlrpc_request *req = NULL;
112         struct obd_device *obd = class_exp2obd(exp);
113         int epoch_close = 1;
114         int seq_end = 0, rc;
115         ENTRY;
116
117         if (obd == NULL) {
118                 /*
119                  * XXX: in case of LMV, is this correct to access
120                  * ->exp_handle?
121                  */
122                 CERROR("Invalid MDC connection handle "LPX64"\n",
123                        ll_i2mdexp(inode)->exp_handle.h_cookie);
124                 GOTO(out, rc = 0);
125         }
126
127         /*
128          * here we check if this is forced umount. If so this is called on
129          * canceling "open lock" and we do not call md_close() in this case, as
130          * it will not be successful, as import is already deactivated.
131          */
132         if (obd->obd_force)
133                 GOTO(out, rc = 0);
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc != -EAGAIN)
143                 seq_end = 1;
144
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
148                 LASSERT(epoch_close);
149                 /* MDS has instructed us to obtain Size-on-MDS attribute from
150                  * OSTs and send setattr to back to MDS. */
151                 rc = ll_sizeonmds_update(inode, och->och_mod,
152                                          &och->och_fh, op_data->op_ioepoch);
153                 if (rc) {
154                         CERROR("inode %lu mdc Size-on-MDS update failed: "
155                                "rc = %d\n", inode->i_ino, rc);
156                         rc = 0;
157                 }
158         } else if (rc) {
159                 CERROR("inode %lu mdc close failed: rc = %d\n",
160                        inode->i_ino, rc);
161         }
162         ll_finish_md_op_data(op_data);
163
164         if (rc == 0) {
165                 rc = ll_objects_destroy(req, inode);
166                 if (rc)
167                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
168                                inode->i_ino, rc);
169         }
170
171         EXIT;
172 out:
173       
174         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
175             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
176                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
177         } else {
178                 if (seq_end)
179                         ptlrpc_close_replay_seq(req);
180                 md_clear_open_replay_data(md_exp, och);
181                 /* Free @och if it is not waiting for DONE_WRITING. */
182                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
183                 OBD_FREE_PTR(och);
184         }
185         if (req) /* This is close request */
186                 ptlrpc_req_finished(req);
187         return rc;
188 }
189
190 int ll_md_real_close(struct inode *inode, int flags)
191 {
192         struct ll_inode_info *lli = ll_i2info(inode);
193         struct obd_client_handle **och_p;
194         struct obd_client_handle *och;
195         __u64 *och_usecount;
196         int rc = 0;
197         ENTRY;
198
199         if (flags & FMODE_WRITE) {
200                 och_p = &lli->lli_mds_write_och;
201                 och_usecount = &lli->lli_open_fd_write_count;
202         } else if (flags & FMODE_EXEC) {
203                 och_p = &lli->lli_mds_exec_och;
204                 och_usecount = &lli->lli_open_fd_exec_count;
205         } else {
206                 LASSERT(flags & FMODE_READ);
207                 och_p = &lli->lli_mds_read_och;
208                 och_usecount = &lli->lli_open_fd_read_count;
209         }
210
211         down(&lli->lli_och_sem);
212         if (*och_usecount) { /* There are still users of this handle, so
213                                 skip freeing it. */
214                 up(&lli->lli_och_sem);
215                 RETURN(0);
216         }
217         och=*och_p;
218         *och_p = NULL;
219         up(&lli->lli_och_sem);
220
221         if (och) { /* There might be a race and somebody have freed this och
222                       already */
223                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
224                                                inode, och);
225         }
226
227         RETURN(rc);
228 }
229
230 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
231                 struct file *file)
232 {
233         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
234         struct ll_inode_info *lli = ll_i2info(inode);
235         int rc = 0;
236         ENTRY;
237
238         /* clear group lock, if present */
239         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
240                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
241                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
242                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
243                                       &fd->fd_cwlockh);
244         }
245
246         /* Let's see if we have good enough OPEN lock on the file and if
247            we can skip talking to MDS */
248         if (file->f_dentry->d_inode) { /* Can this ever be false? */
249                 int lockmode;
250                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
251                 struct lustre_handle lockh;
252                 struct inode *inode = file->f_dentry->d_inode;
253                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
254
255                 down(&lli->lli_och_sem);
256                 if (fd->fd_omode & FMODE_WRITE) {
257                         lockmode = LCK_CW;
258                         LASSERT(lli->lli_open_fd_write_count);
259                         lli->lli_open_fd_write_count--;
260                 } else if (fd->fd_omode & FMODE_EXEC) {
261                         lockmode = LCK_PR;
262                         LASSERT(lli->lli_open_fd_exec_count);
263                         lli->lli_open_fd_exec_count--;
264                 } else {
265                         lockmode = LCK_CR;
266                         LASSERT(lli->lli_open_fd_read_count);
267                         lli->lli_open_fd_read_count--;
268                 }
269                 up(&lli->lli_och_sem);
270
271                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
272                                    LDLM_IBITS, &policy, lockmode,
273                                    &lockh)) {
274                         rc = ll_md_real_close(file->f_dentry->d_inode,
275                                               fd->fd_omode);
276                 }
277         } else {
278                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
279                        file, file->f_dentry, file->f_dentry->d_name.name);
280         }
281
282         LUSTRE_FPRIVATE(file) = NULL;
283         ll_file_data_put(fd);
284         ll_capa_close(inode);
285
286         RETURN(rc);
287 }
288
289 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
290
291 /* While this returns an error code, fput() the caller does not, so we need
292  * to make every effort to clean up all of our state here.  Also, applications
293  * rarely check close errors and even if an error is returned they will not
294  * re-try the close call.
295  */
296 int ll_file_release(struct inode *inode, struct file *file)
297 {
298         struct ll_file_data *fd;
299         struct ll_sb_info *sbi = ll_i2sbi(inode);
300         struct ll_inode_info *lli = ll_i2info(inode);
301         struct lov_stripe_md *lsm = lli->lli_smd;
302         int rc;
303
304         ENTRY;
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
327         /* The last ref on @file, maybe not the the owner pid of statahead.
328          * Different processes can open the same dir, "ll_opendir_key" means:
329          * it is me that should stop the statahead thread. */
330         if (lli->lli_opendir_key == fd)
331                 ll_stop_statahead(inode, fd);
332
333         if (inode->i_sb->s_root == file->f_dentry) {
334                 LUSTRE_FPRIVATE(file) = NULL;
335                 ll_file_data_put(fd);
336                 RETURN(0);
337         }
338         
339         if (lsm)
340                 lov_test_and_clear_async_rc(lsm);
341         lli->lli_async_rc = 0;
342
343         rc = ll_md_close(sbi->ll_md_exp, inode, file);
344         RETURN(rc);
345 }
346
347 static int ll_intent_file_open(struct file *file, void *lmm,
348                                int lmmsize, struct lookup_intent *itp)
349 {
350         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
351         struct dentry *parent = file->f_dentry->d_parent;
352         const char *name = file->f_dentry->d_name.name;
353         const int len = file->f_dentry->d_name.len;
354         struct md_op_data *op_data;
355         struct ptlrpc_request *req;
356         int rc;
357         ENTRY;
358
359         if (!parent)
360                 RETURN(-ENOENT);
361
362         /* Usually we come here only for NFSD, and we want open lock.
363            But we can also get here with pre 2.6.15 patchless kernels, and in
364            that case that lock is also ok */
365         /* We can also get here if there was cached open handle in revalidate_it
366          * but it disappeared while we were getting from there to ll_file_open.
367          * But this means this file was closed and immediatelly opened which
368          * makes a good candidate for using OPEN lock */
369         /* If lmmsize & lmm are not 0, we are just setting stripe info
370          * parameters. No need for the open lock */
371         if (!lmm && !lmmsize)
372                 itp->it_flags |= MDS_OPEN_LOCK;
373
374         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
375                                       file->f_dentry->d_inode, name, len,
376                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
377         if (IS_ERR(op_data))
378                 RETURN(PTR_ERR(op_data));
379
380         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
381                             0 /*unused */, &req, ll_md_blocking_ast, 0);
382         ll_finish_md_op_data(op_data);
383         if (rc == -ESTALE) {
384                 /* reason for keep own exit path - don`t flood log
385                 * with messages with -ESTALE errors.
386                 */
387                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
388                      it_open_error(DISP_OPEN_OPEN, itp))
389                         GOTO(out, rc);
390                 ll_release_openhandle(file->f_dentry, itp);
391                 GOTO(out_stale, rc);
392         }
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         if (itp->d.lustre.it_lock_mode)
401                 md_set_lock_data(sbi->ll_md_exp,
402                                  &itp->d.lustre.it_lock_handle, 
403                                  file->f_dentry->d_inode);
404
405         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
406 out:
407         ptlrpc_req_finished(itp->d.lustre.it_data);
408
409 out_stale:
410         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
411         ll_intent_drop_lock(itp);
412
413         RETURN(rc);
414 }
415
416 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
417                        struct lookup_intent *it, struct obd_client_handle *och)
418 {
419         struct ptlrpc_request *req = it->d.lustre.it_data;
420         struct mdt_body *body;
421
422         LASSERT(och);
423
424         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
425         LASSERT(body != NULL);                      /* reply already checked out */
426
427         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
428         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
429         och->och_fid = lli->lli_fid;
430         och->och_flags = it->it_flags;
431         lli->lli_ioepoch = body->ioepoch;
432
433         return md_set_open_replay_data(md_exp, och, req);
434 }
435
436 int ll_local_open(struct file *file, struct lookup_intent *it,
437                   struct ll_file_data *fd, struct obd_client_handle *och)
438 {
439         struct inode *inode = file->f_dentry->d_inode;
440         struct ll_inode_info *lli = ll_i2info(inode);
441         ENTRY;
442
443         LASSERT(!LUSTRE_FPRIVATE(file));
444
445         LASSERT(fd != NULL);
446
447         if (och) {
448                 struct ptlrpc_request *req = it->d.lustre.it_data;
449                 struct mdt_body *body;
450                 int rc;
451
452                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
453                 if (rc)
454                         RETURN(rc);
455
456                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
457                 if ((it->it_flags & FMODE_WRITE) &&
458                     (body->valid & OBD_MD_FLSIZE))
459                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
460                                lli->lli_ioepoch, PFID(&lli->lli_fid));
461         }
462
463         LUSTRE_FPRIVATE(file) = fd;
464         ll_readahead_init(inode, &fd->fd_ras);
465         fd->fd_omode = it->it_flags;
466         RETURN(0);
467 }
468
469 /* Open a file, and (for the very first open) create objects on the OSTs at
470  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
471  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
472  * lli_open_sem to ensure no other process will create objects, send the
473  * stripe MD to the MDS, or try to destroy the objects if that fails.
474  *
475  * If we already have the stripe MD locally then we don't request it in
476  * md_open(), by passing a lmm_size = 0.
477  *
478  * It is up to the application to ensure no other processes open this file
479  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
480  * used.  We might be able to avoid races of that sort by getting lli_open_sem
481  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
482  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
483  */
484 int ll_file_open(struct inode *inode, struct file *file)
485 {
486         struct ll_inode_info *lli = ll_i2info(inode);
487         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
488                                           .it_flags = file->f_flags };
489         struct lov_stripe_md *lsm;
490         struct ptlrpc_request *req = NULL;
491         struct obd_client_handle **och_p;
492         __u64 *och_usecount;
493         struct ll_file_data *fd;
494         int rc = 0, opendir_set = 0;
495         ENTRY;
496
497         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
498                inode->i_generation, inode, file->f_flags);
499
500 #ifdef HAVE_VFS_INTENT_PATCHES
501         it = file->f_it;
502 #else
503         it = file->private_data; /* XXX: compat macro */
504         file->private_data = NULL; /* prevent ll_local_open assertion */
505 #endif
506
507         fd = ll_file_data_get();
508         if (fd == NULL)
509                 RETURN(-ENOMEM);
510
511         if (S_ISDIR(inode->i_mode)) {
512                 spin_lock(&lli->lli_lock);
513                 /* "lli->lli_opendir_pid != 0" means someone has set it.
514                  * "lli->lli_sai != NULL" means the previous statahead has not
515                  *                        been cleanup. */ 
516                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
517                         opendir_set = 1;
518                         lli->lli_opendir_pid = cfs_curproc_pid();
519                         lli->lli_opendir_key = fd;
520                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
521                         /* Two cases for this:
522                          * (1) The same process open such directory many times.
523                          * (2) The old process opened the directory, and exited
524                          *     before its children processes. Then new process
525                          *     with the same pid opens such directory before the
526                          *     old process's children processes exit.
527                          * Change the owner to the latest one. */
528                         opendir_set = 2;
529                         lli->lli_opendir_key = fd;
530                 }
531                 spin_unlock(&lli->lli_lock);
532         }
533
534         if (inode->i_sb->s_root == file->f_dentry) {
535                 LUSTRE_FPRIVATE(file) = fd;
536                 RETURN(0);
537         }
538
539         if (!it || !it->d.lustre.it_disposition) {
540                 /* Convert f_flags into access mode. We cannot use file->f_mode,
541                  * because everything but O_ACCMODE mask was stripped from
542                  * there */
543                 if ((oit.it_flags + 1) & O_ACCMODE)
544                         oit.it_flags++;
545                 if (file->f_flags & O_TRUNC)
546                         oit.it_flags |= FMODE_WRITE;
547
548                 /* kernel only call f_op->open in dentry_open.  filp_open calls
549                  * dentry_open after call to open_namei that checks permissions.
550                  * Only nfsd_open call dentry_open directly without checking
551                  * permissions and because of that this code below is safe. */
552                 if (oit.it_flags & FMODE_WRITE)
553                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
554
555                 /* We do not want O_EXCL here, presumably we opened the file
556                  * already? XXX - NFS implications? */
557                 oit.it_flags &= ~O_EXCL;
558
559                 it = &oit;
560         }
561
562 restart:
563         /* Let's see if we have file open on MDS already. */
564         if (it->it_flags & FMODE_WRITE) {
565                 och_p = &lli->lli_mds_write_och;
566                 och_usecount = &lli->lli_open_fd_write_count;
567         } else if (it->it_flags & FMODE_EXEC) {
568                 och_p = &lli->lli_mds_exec_och;
569                 och_usecount = &lli->lli_open_fd_exec_count;
570          } else {
571                 och_p = &lli->lli_mds_read_och;
572                 och_usecount = &lli->lli_open_fd_read_count;
573         }
574         
575         down(&lli->lli_och_sem);
576         if (*och_p) { /* Open handle is present */
577                 if (it_disposition(it, DISP_OPEN_OPEN)) {
578                         /* Well, there's extra open request that we do not need,
579                            let's close it somehow. This will decref request. */
580                         rc = it_open_error(DISP_OPEN_OPEN, it);
581                         if (rc) {
582                                 ll_file_data_put(fd);
583                                 GOTO(out_och_free, rc);
584                         }       
585                         ll_release_openhandle(file->f_dentry, it);
586                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
587                                              LPROC_LL_OPEN);
588                 }
589                 (*och_usecount)++;
590
591                 rc = ll_local_open(file, it, fd, NULL);
592                 if (rc) {
593                         up(&lli->lli_och_sem);
594                         ll_file_data_put(fd);
595                         RETURN(rc);
596                 }
597         } else {
598                 LASSERT(*och_usecount == 0);
599                 if (!it->d.lustre.it_disposition) {
600                         /* We cannot just request lock handle now, new ELC code
601                            means that one of other OPEN locks for this file
602                            could be cancelled, and since blocking ast handler
603                            would attempt to grab och_sem as well, that would
604                            result in a deadlock */
605                         up(&lli->lli_och_sem);
606                         it->it_flags |= O_CHECK_STALE;
607                         rc = ll_intent_file_open(file, NULL, 0, it);
608                         it->it_flags &= ~O_CHECK_STALE;
609                         if (rc) {
610                                 ll_file_data_put(fd);
611                                 GOTO(out_openerr, rc);
612                         }
613
614                         /* Got some error? Release the request */
615                         if (it->d.lustre.it_status < 0) {
616                                 req = it->d.lustre.it_data;
617                                 ptlrpc_req_finished(req);
618                         }
619                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
620                                          &it->d.lustre.it_lock_handle,
621                                          file->f_dentry->d_inode);
622                         goto restart;
623                 }
624                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
625                 if (!*och_p) {
626                         ll_file_data_put(fd);
627                         GOTO(out_och_free, rc = -ENOMEM);
628                 }
629                 (*och_usecount)++;
630                 req = it->d.lustre.it_data;
631
632                 /* md_intent_lock() didn't get a request ref if there was an
633                  * open error, so don't do cleanup on the request here
634                  * (bug 3430) */
635                 /* XXX (green): Should not we bail out on any error here, not
636                  * just open error? */
637                 rc = it_open_error(DISP_OPEN_OPEN, it);
638                 if (rc) {
639                         ll_file_data_put(fd);
640                         GOTO(out_och_free, rc);
641                 }
642
643                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
644                 rc = ll_local_open(file, it, fd, *och_p);
645                 if (rc) {
646                         up(&lli->lli_och_sem);
647                         ll_file_data_put(fd);
648                         GOTO(out_och_free, rc);
649                 }
650         }
651         up(&lli->lli_och_sem);
652
653         /* Must do this outside lli_och_sem lock to prevent deadlock where
654            different kind of OPEN lock for this same inode gets cancelled
655            by ldlm_cancel_lru */
656         if (!S_ISREG(inode->i_mode))
657                 GOTO(out, rc);
658
659         ll_capa_open(inode);
660
661         lsm = lli->lli_smd;
662         if (lsm == NULL) {
663                 if (file->f_flags & O_LOV_DELAY_CREATE ||
664                     !(file->f_mode & FMODE_WRITE)) {
665                         CDEBUG(D_INODE, "object creation was delayed\n");
666                         GOTO(out, rc);
667                 }
668         }
669         file->f_flags &= ~O_LOV_DELAY_CREATE;
670         GOTO(out, rc);
671 out:
672         ptlrpc_req_finished(req);
673         if (req)
674                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
675 out_och_free:
676         if (rc) {
677                 if (*och_p) {
678                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
679                         *och_p = NULL; /* OBD_FREE writes some magic there */
680                         (*och_usecount)--;
681                 }
682                 up(&lli->lli_och_sem);
683 out_openerr:
684                 if (opendir_set == 1) {
685                         lli->lli_opendir_key = NULL;
686                         lli->lli_opendir_pid = 0;
687                 } else if (unlikely(opendir_set == 2)) {
688                         ll_stop_statahead(inode, fd);
689                 }
690         }
691
692         return rc;
693 }
694
695 /* Fills the obdo with the attributes for the inode defined by lsm */
696 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
697 {
698         struct ptlrpc_request_set *set;
699         struct ll_inode_info *lli = ll_i2info(inode);
700         struct lov_stripe_md *lsm = lli->lli_smd;
701
702         struct obd_info oinfo = { { { 0 } } };
703         int rc;
704         ENTRY;
705
706         LASSERT(lsm != NULL);
707
708         oinfo.oi_md = lsm;
709         oinfo.oi_oa = obdo;
710         oinfo.oi_oa->o_id = lsm->lsm_object_id;
711         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
712         oinfo.oi_oa->o_mode = S_IFREG;
713         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
714                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
715                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
716                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
717                                OBD_MD_FLGROUP;
718         oinfo.oi_capa = ll_mdscapa_get(inode);
719
720         set = ptlrpc_prep_set();
721         if (set == NULL) {
722                 CERROR("can't allocate ptlrpc set\n");
723                 rc = -ENOMEM;
724         } else {
725                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
726                 if (rc == 0)
727                         rc = ptlrpc_set_wait(set);
728                 ptlrpc_set_destroy(set);
729         }
730         capa_put(oinfo.oi_capa);
731         if (rc)
732                 RETURN(rc);
733
734         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
735                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
736                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
737
738         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
739         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
740                lli->lli_smd->lsm_object_id, i_size_read(inode),
741                (unsigned long long)inode->i_blocks,
742                (unsigned long)ll_inode_blksize(inode));
743         RETURN(0);
744 }
745
746 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
747 {
748         struct ll_inode_info *lli = ll_i2info(inode);
749         struct lov_stripe_md *lsm = lli->lli_smd;
750         struct obd_export *exp = ll_i2dtexp(inode);
751         struct {
752                 char name[16];
753                 struct ldlm_lock *lock;
754         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
755         __u32 stripe, vallen = sizeof(stripe);
756         struct lov_oinfo *loinfo;
757         int rc;
758         ENTRY;
759
760         if (lsm->lsm_stripe_count == 1)
761                 GOTO(check, stripe = 0);
762
763         /* get our offset in the lov */
764         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
765         if (rc != 0) {
766                 CERROR("obd_get_info: rc = %d\n", rc);
767                 RETURN(rc);
768         }
769         LASSERT(stripe < lsm->lsm_stripe_count);
770
771 check:
772         loinfo = lsm->lsm_oinfo[stripe];
773         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
774                             &lock->l_resource->lr_name)){
775                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
776                            loinfo->loi_id, loinfo->loi_gr);
777                 RETURN(-ELDLM_NO_LOCK_DATA);
778         }
779
780         RETURN(stripe);
781 }
782
783 /* Get extra page reference to ensure it is not going away */
784 void ll_pin_extent_cb(void *data)
785 {
786         struct page *page = data;
787         
788         page_cache_get(page);
789
790         return;
791 }
792
793 /* Flush the page from page cache for an extent as its canceled.
794  * Page to remove is delivered as @data.
795  *
796  * No one can dirty the extent until we've finished our work and they cannot
797  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
798  * but other kernel actors could have pages locked.
799  *
800  * If @discard is set, there is no need to write the page if it is dirty.
801  *
802  * Called with the DLM lock held. */
803 int ll_page_removal_cb(void *data, int discard)
804 {
805         int rc;
806         struct page *page = data;
807         struct address_space *mapping;
808  
809         ENTRY;
810
811         /* We have page reference already from ll_pin_page */
812         lock_page(page);
813
814         /* Already truncated by somebody */
815         if (!page->mapping)
816                 GOTO(out, rc = 0);
817         mapping = page->mapping;
818
819         ll_teardown_mmaps(mapping,
820                           (__u64)page->index << PAGE_CACHE_SHIFT,
821                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
822                                                               ~PAGE_CACHE_MASK);        
823         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
824
825         if (!discard && clear_page_dirty_for_io(page)) {
826                 LASSERT(page->mapping);
827                 rc = ll_call_writepage(page->mapping->host, page);
828                 /* either waiting for io to complete or reacquiring
829                  * the lock that the failed writepage released */
830                 lock_page(page);
831                 wait_on_page_writeback(page);
832                 if (rc != 0) {
833                         CERROR("writepage inode %lu(%p) of page %p "
834                                "failed: %d\n", mapping->host->i_ino,
835                                mapping->host, page, rc);
836                         if (rc == -ENOSPC)
837                                 set_bit(AS_ENOSPC, &mapping->flags);
838                         else
839                                 set_bit(AS_EIO, &mapping->flags);
840                 }
841                 set_bit(AS_EIO, &mapping->flags);
842         }
843         if (page->mapping != NULL) {
844                 struct ll_async_page *llap = llap_cast_private(page);
845                 /* checking again to account for writeback's lock_page() */
846                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
847                 if (llap)
848                         ll_ra_accounting(llap, page->mapping);
849                 ll_truncate_complete_page(page);
850         }
851         EXIT;
852 out:
853         LASSERT(!PageWriteback(page));
854         unlock_page(page);
855         page_cache_release(page);
856
857         return 0;
858 }
859
860 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
861                              void *data, int flag)
862 {
863         struct inode *inode;
864         struct ll_inode_info *lli;
865         struct lov_stripe_md *lsm;
866         int stripe;
867         __u64 kms;
868
869         ENTRY;
870
871         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
872                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
873                 LBUG();
874         }
875
876         inode = ll_inode_from_lock(lock);
877         if (inode == NULL)
878                 RETURN(0);
879         lli = ll_i2info(inode);
880         if (lli == NULL)
881                 GOTO(iput, 0);
882         if (lli->lli_smd == NULL)
883                 GOTO(iput, 0);
884         lsm = lli->lli_smd;
885
886         stripe = ll_lock_to_stripe_offset(inode, lock);
887         if (stripe < 0)
888                 GOTO(iput, 0);
889
890         lov_stripe_lock(lsm);
891         lock_res_and_lock(lock);
892         kms = ldlm_extent_shift_kms(lock,
893                                     lsm->lsm_oinfo[stripe]->loi_kms);
894
895         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
896                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
897                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
898         lsm->lsm_oinfo[stripe]->loi_kms = kms;
899         unlock_res_and_lock(lock);
900         lov_stripe_unlock(lsm);
901         ll_queue_done_writing(inode, 0);
902         EXIT;
903 iput:
904         iput(inode);
905
906         return 0;
907 }
908
909 #if 0
910 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
911 {
912         /* XXX ALLOCATE - 160 bytes */
913         struct inode *inode = ll_inode_from_lock(lock);
914         struct ll_inode_info *lli = ll_i2info(inode);
915         struct lustre_handle lockh = { 0 };
916         struct ost_lvb *lvb;
917         int stripe;
918         ENTRY;
919
920         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
921                      LDLM_FL_BLOCK_CONV)) {
922                 LBUG(); /* not expecting any blocked async locks yet */
923                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
924                            "lock, returning");
925                 ldlm_lock_dump(D_OTHER, lock, 0);
926                 ldlm_reprocess_all(lock->l_resource);
927                 RETURN(0);
928         }
929
930         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
931
932         stripe = ll_lock_to_stripe_offset(inode, lock);
933         if (stripe < 0)
934                 goto iput;
935
936         if (lock->l_lvb_len) {
937                 struct lov_stripe_md *lsm = lli->lli_smd;
938                 __u64 kms;
939                 lvb = lock->l_lvb_data;
940                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
941
942                 lock_res_and_lock(lock);
943                 ll_inode_size_lock(inode, 1);
944                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
945                 kms = ldlm_extent_shift_kms(NULL, kms);
946                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
947                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
948                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
949                 lsm->lsm_oinfo[stripe].loi_kms = kms;
950                 ll_inode_size_unlock(inode, 1);
951                 unlock_res_and_lock(lock);
952         }
953
954 iput:
955         iput(inode);
956         wake_up(&lock->l_waitq);
957
958         ldlm_lock2handle(lock, &lockh);
959         ldlm_lock_decref(&lockh, LCK_PR);
960         RETURN(0);
961 }
962 #endif
963
964 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
965 {
966         struct ptlrpc_request *req = reqp;
967         struct inode *inode = ll_inode_from_lock(lock);
968         struct ll_inode_info *lli;
969         struct lov_stripe_md *lsm;
970         struct ost_lvb *lvb;
971         int rc, stripe;
972         ENTRY;
973
974         if (inode == NULL)
975                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
976         lli = ll_i2info(inode);
977         if (lli == NULL)
978                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
979         lsm = lli->lli_smd;
980         if (lsm == NULL)
981                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
982
983         /* First, find out which stripe index this lock corresponds to. */
984         stripe = ll_lock_to_stripe_offset(inode, lock);
985         if (stripe < 0)
986                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
987
988         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
989         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
990                              sizeof(*lvb));
991         rc = req_capsule_server_pack(&req->rq_pill);
992         if (rc) {
993                 CERROR("lustre_pack_reply: %d\n", rc);
994                 GOTO(iput, rc);
995         }
996
997         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
998         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
999         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1000         lvb->lvb_atime = LTIME_S(inode->i_atime);
1001         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1002
1003         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1004                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1005                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1006                    lvb->lvb_atime, lvb->lvb_ctime);
1007  iput:
1008         iput(inode);
1009
1010  out:
1011         /* These errors are normal races, so we don't want to fill the console
1012          * with messages by calling ptlrpc_error() */
1013         if (rc == -ELDLM_NO_LOCK_DATA)
1014                 lustre_pack_reply(req, 1, NULL, NULL);
1015
1016         req->rq_status = rc;
1017         return rc;
1018 }
1019
1020 static int ll_merge_lvb(struct inode *inode)
1021 {
1022         struct ll_inode_info *lli = ll_i2info(inode);
1023         struct ll_sb_info *sbi = ll_i2sbi(inode);
1024         struct ost_lvb lvb;
1025         int rc;
1026
1027         ENTRY;
1028
1029         ll_inode_size_lock(inode, 1);
1030         inode_init_lvb(inode, &lvb);
1031         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1032         i_size_write(inode, lvb.lvb_size);
1033         inode->i_blocks = lvb.lvb_blocks;
1034
1035         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1036         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1037         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1038         ll_inode_size_unlock(inode, 1);
1039
1040         RETURN(rc);
1041 }
1042
1043 int ll_local_size(struct inode *inode)
1044 {
1045         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1046         struct ll_inode_info *lli = ll_i2info(inode);
1047         struct ll_sb_info *sbi = ll_i2sbi(inode);
1048         struct lustre_handle lockh = { 0 };
1049         int flags = 0;
1050         int rc;
1051         ENTRY;
1052
1053         if (lli->lli_smd->lsm_stripe_count == 0)
1054                 RETURN(0);
1055
1056         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1057                        &policy, LCK_PR, &flags, inode, &lockh);
1058         if (rc < 0)
1059                 RETURN(rc);
1060         else if (rc == 0)
1061                 RETURN(-ENODATA);
1062
1063         rc = ll_merge_lvb(inode);
1064         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1065         RETURN(rc);
1066 }
1067
1068 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1069                      lstat_t *st)
1070 {
1071         struct lustre_handle lockh = { 0 };
1072         struct ldlm_enqueue_info einfo = { 0 };
1073         struct obd_info oinfo = { { { 0 } } };
1074         struct ost_lvb lvb;
1075         int rc;
1076
1077         ENTRY;
1078
1079         einfo.ei_type = LDLM_EXTENT;
1080         einfo.ei_mode = LCK_PR;
1081         einfo.ei_cb_bl = osc_extent_blocking_cb;
1082         einfo.ei_cb_cp = ldlm_completion_ast;
1083         einfo.ei_cb_gl = ll_glimpse_callback;
1084         einfo.ei_cbdata = NULL;
1085
1086         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1087         oinfo.oi_lockh = &lockh;
1088         oinfo.oi_md = lsm;
1089         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1090
1091         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1092         if (rc == -ENOENT)
1093                 RETURN(rc);
1094         if (rc != 0) {
1095                 CERROR("obd_enqueue returned rc %d, "
1096                        "returning -EIO\n", rc);
1097                 RETURN(rc > 0 ? -EIO : rc);
1098         }
1099
1100         lov_stripe_lock(lsm);
1101         memset(&lvb, 0, sizeof(lvb));
1102         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1103         st->st_size = lvb.lvb_size;
1104         st->st_blocks = lvb.lvb_blocks;
1105         st->st_mtime = lvb.lvb_mtime;
1106         st->st_atime = lvb.lvb_atime;
1107         st->st_ctime = lvb.lvb_ctime;
1108         lov_stripe_unlock(lsm);
1109
1110         RETURN(rc);
1111 }
1112
1113 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1114  * file (because it prefers KMS over RSS when larger) */
1115 int ll_glimpse_size(struct inode *inode, int ast_flags)
1116 {
1117         struct ll_inode_info *lli = ll_i2info(inode);
1118         struct ll_sb_info *sbi = ll_i2sbi(inode);
1119         struct lustre_handle lockh = { 0 };
1120         struct ldlm_enqueue_info einfo = { 0 };
1121         struct obd_info oinfo = { { { 0 } } };
1122         int rc;
1123         ENTRY;
1124
1125         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1126                 RETURN(0);
1127
1128         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1129
1130         if (!lli->lli_smd) {
1131                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1132                 RETURN(0);
1133         }
1134
1135         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1136          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1137          *       won't revoke any conflicting DLM locks held. Instead,
1138          *       ll_glimpse_callback() will be called on each client
1139          *       holding a DLM lock against this file, and resulting size
1140          *       will be returned for each stripe. DLM lock on [0, EOF] is
1141          *       acquired only if there were no conflicting locks. */
1142         einfo.ei_type = LDLM_EXTENT;
1143         einfo.ei_mode = LCK_PR;
1144         einfo.ei_cb_bl = osc_extent_blocking_cb;
1145         einfo.ei_cb_cp = ldlm_completion_ast;
1146         einfo.ei_cb_gl = ll_glimpse_callback;
1147         einfo.ei_cbdata = inode;
1148
1149         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1150         oinfo.oi_lockh = &lockh;
1151         oinfo.oi_md = lli->lli_smd;
1152         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1153
1154         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1155         if (rc == -ENOENT)
1156                 RETURN(rc);
1157         if (rc != 0) {
1158                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1159                 RETURN(rc > 0 ? -EIO : rc);
1160         }
1161
1162         rc = ll_merge_lvb(inode);
1163
1164         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1165                i_size_read(inode), (unsigned long long)inode->i_blocks);
1166
1167         RETURN(rc);
1168 }
1169
1170 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1171                    struct lov_stripe_md *lsm, int mode,
1172                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1173                    int ast_flags)
1174 {
1175         struct ll_sb_info *sbi = ll_i2sbi(inode);
1176         struct ost_lvb lvb;
1177         struct ldlm_enqueue_info einfo = { 0 };
1178         struct obd_info oinfo = { { { 0 } } };
1179         int rc;
1180         ENTRY;
1181
1182         LASSERT(!lustre_handle_is_used(lockh));
1183         LASSERT(lsm != NULL);
1184
1185         /* don't drop the mmapped file to LRU */
1186         if (mapping_mapped(inode->i_mapping))
1187                 ast_flags |= LDLM_FL_NO_LRU;
1188
1189         /* XXX phil: can we do this?  won't it screw the file size up? */
1190         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1191             (sbi->ll_flags & LL_SBI_NOLCK))
1192                 RETURN(0);
1193
1194         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1195                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1196
1197         einfo.ei_type = LDLM_EXTENT;
1198         einfo.ei_mode = mode;
1199         einfo.ei_cb_bl = osc_extent_blocking_cb;
1200         einfo.ei_cb_cp = ldlm_completion_ast;
1201         einfo.ei_cb_gl = ll_glimpse_callback;
1202         einfo.ei_cbdata = inode;
1203
1204         oinfo.oi_policy = *policy;
1205         oinfo.oi_lockh = lockh;
1206         oinfo.oi_md = lsm;
1207         oinfo.oi_flags = ast_flags;
1208
1209         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1210         *policy = oinfo.oi_policy;
1211         if (rc > 0)
1212                 rc = -EIO;
1213
1214         ll_inode_size_lock(inode, 1);
1215         inode_init_lvb(inode, &lvb);
1216         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1217
1218         if (policy->l_extent.start == 0 &&
1219             policy->l_extent.end == OBD_OBJECT_EOF) {
1220                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1221                  * the kms under both a DLM lock and the
1222                  * ll_inode_size_lock().  If we don't get the
1223                  * ll_inode_size_lock() here we can match the DLM lock and
1224                  * reset i_size from the kms before the truncating path has
1225                  * updated the kms.  generic_file_write can then trust the
1226                  * stale i_size when doing appending writes and effectively
1227                  * cancel the result of the truncate.  Getting the
1228                  * ll_inode_size_lock() after the enqueue maintains the DLM
1229                  * -> ll_inode_size_lock() acquiring order. */
1230                 i_size_write(inode, lvb.lvb_size);
1231                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1232                        inode->i_ino, i_size_read(inode));
1233         }
1234
1235         if (rc == 0) {
1236                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1237                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1238                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1239         }
1240         ll_inode_size_unlock(inode, 1);
1241
1242         RETURN(rc);
1243 }
1244
1245 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1246                      struct lov_stripe_md *lsm, int mode,
1247                      struct lustre_handle *lockh)
1248 {
1249         struct ll_sb_info *sbi = ll_i2sbi(inode);
1250         int rc;
1251         ENTRY;
1252
1253         /* XXX phil: can we do this?  won't it screw the file size up? */
1254         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1255             (sbi->ll_flags & LL_SBI_NOLCK))
1256                 RETURN(0);
1257
1258         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1259
1260         RETURN(rc);
1261 }
1262
1263 static void ll_set_file_contended(struct inode *inode)
1264 {
1265         struct ll_inode_info *lli = ll_i2info(inode);
1266         cfs_time_t now = cfs_time_current();
1267
1268         spin_lock(&lli->lli_lock);
1269         lli->lli_contention_time = now;
1270         lli->lli_flags |= LLIF_CONTENDED;
1271         spin_unlock(&lli->lli_lock);
1272 }
1273
1274 void ll_clear_file_contended(struct inode *inode)
1275 {
1276         struct ll_inode_info *lli = ll_i2info(inode);
1277
1278         spin_lock(&lli->lli_lock);
1279         lli->lli_flags &= ~LLIF_CONTENDED;
1280         spin_unlock(&lli->lli_lock);
1281 }
1282
1283 static int ll_is_file_contended(struct file *file)
1284 {
1285         struct inode *inode = file->f_dentry->d_inode;
1286         struct ll_inode_info *lli = ll_i2info(inode);
1287         struct ll_sb_info *sbi = ll_i2sbi(inode);
1288         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1289         ENTRY;
1290
1291         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1292                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1293                        " osc connect flags = 0x"LPX64"\n",
1294                        sbi->ll_lco.lco_flags);
1295                 RETURN(0);
1296         }
1297         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1298                 RETURN(1);
1299         if (lli->lli_flags & LLIF_CONTENDED) {
1300                 cfs_time_t cur_time = cfs_time_current();
1301                 cfs_time_t retry_time;
1302
1303                 retry_time = cfs_time_add(
1304                         lli->lli_contention_time,
1305                         cfs_time_seconds(sbi->ll_contention_time));
1306                 if (cfs_time_after(cur_time, retry_time)) {
1307                         ll_clear_file_contended(inode);
1308                         RETURN(0);
1309                 }
1310                 RETURN(1);
1311         }
1312         RETURN(0);
1313 }
1314
1315 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1316                                  const char *buf, size_t count,
1317                                  loff_t start, loff_t end, int rw)
1318 {
1319         int append;
1320         int tree_locked = 0;
1321         int rc;
1322         struct inode * inode = file->f_dentry->d_inode;
1323         ENTRY;
1324
1325         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1326
1327         if (append || !ll_is_file_contended(file)) {
1328                 struct ll_lock_tree_node *node;
1329                 int ast_flags;
1330
1331                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1332                 if (file->f_flags & O_NONBLOCK)
1333                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1334                 node = ll_node_from_inode(inode, start, end,
1335                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1336                 if (IS_ERR(node)) {
1337                         rc = PTR_ERR(node);
1338                         GOTO(out, rc);
1339                 }
1340                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1341                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1342                 if (rc == 0)
1343                         tree_locked = 1;
1344                 else if (rc == -EUSERS)
1345                         ll_set_file_contended(inode);
1346                 else
1347                         GOTO(out, rc);
1348         }
1349         RETURN(tree_locked);
1350 out:
1351         return rc;
1352 }
1353
1354 /**
1355  * Checks if requested extent lock is compatible with a lock under a page.
1356  *
1357  * Checks if the lock under \a page is compatible with a read or write lock
1358  * (specified by \a rw) for an extent [\a start , \a end].
1359  *
1360  * \param page the page under which lock is considered
1361  * \param rw OBD_BRW_READ if requested for reading,
1362  *           OBD_BRW_WRITE if requested for writing
1363  * \param start start of the requested extent
1364  * \param end end of the requested extent
1365  * \param cookie transparent parameter for passing locking context
1366  *
1367  * \post result == 1, *cookie == context, appropriate lock is referenced or
1368  * \post result == 0
1369  *
1370  * \retval 1 owned lock is reused for the request
1371  * \retval 0 no lock reused for the request
1372  *
1373  * \see ll_release_short_lock
1374  */
1375 static int ll_reget_short_lock(struct page *page, int rw,
1376                                obd_off start, obd_off end,
1377                                void **cookie)
1378 {
1379         struct ll_async_page *llap;
1380         struct obd_export *exp;
1381         struct inode *inode = page->mapping->host;
1382
1383         ENTRY;
1384
1385         exp = ll_i2dtexp(inode);
1386         if (exp == NULL)
1387                 RETURN(0);
1388
1389         llap = llap_cast_private(page);
1390         if (llap == NULL)
1391                 RETURN(0);
1392
1393         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1394                                     &llap->llap_cookie, rw, start, end,
1395                                     cookie));
1396 }
1397
1398 /**
1399  * Releases a reference to a lock taken in a "fast" way.
1400  *
1401  * Releases a read or a write (specified by \a rw) lock
1402  * referenced by \a cookie.
1403  *
1404  * \param inode inode to which data belong
1405  * \param end end of the locked extent
1406  * \param rw OBD_BRW_READ if requested for reading,
1407  *           OBD_BRW_WRITE if requested for writing
1408  * \param cookie transparent parameter for passing locking context
1409  *
1410  * \post appropriate lock is dereferenced
1411  *
1412  * \see ll_reget_short_lock
1413  */
1414 static void ll_release_short_lock(struct inode *inode, obd_off end,
1415                                   void *cookie, int rw)
1416 {
1417         struct obd_export *exp;
1418         int rc;
1419
1420         exp = ll_i2dtexp(inode);
1421         if (exp == NULL)
1422                 return;
1423
1424         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1425                                     cookie, rw);
1426         if (rc < 0)
1427                 CERROR("unlock failed (%d)\n", rc);
1428 }
1429
1430 /**
1431  * Checks if requested extent lock is compatible
1432  * with a lock under a page in page cache.
1433  *
1434  * Checks if a lock under some \a page is compatible with a read or write lock
1435  * (specified by \a rw) for an extent [\a start , \a end].
1436  *
1437  * \param file the file under which lock is considered
1438  * \param rw OBD_BRW_READ if requested for reading,
1439  *           OBD_BRW_WRITE if requested for writing
1440  * \param ppos start of the requested extent
1441  * \param end end of the requested extent
1442  * \param cookie transparent parameter for passing locking context
1443  * \param buf userspace buffer for the data
1444  *
1445  * \post result == 1, *cookie == context, appropriate lock is referenced
1446  * \post retuls == 0
1447  *
1448  * \retval 1 owned lock is reused for the request
1449  * \retval 0 no lock reused for the request
1450  *
1451  * \see ll_file_put_fast_lock
1452  */
1453 static inline int ll_file_get_fast_lock(struct file *file,
1454                                         obd_off ppos, obd_off end,
1455                                         char *buf, void **cookie, int rw)
1456 {
1457         int rc = 0;
1458         struct page *page;
1459
1460         ENTRY;
1461
1462         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1463                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1464                                       ppos >> CFS_PAGE_SHIFT);
1465                 if (page) {
1466                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1467                                 rc = 1;
1468
1469                         unlock_page(page);
1470                         page_cache_release(page);
1471                 }
1472         }
1473
1474         RETURN(rc);
1475 }
1476
1477 /**
1478  * Releases a reference to a lock taken in a "fast" way.
1479  *
1480  * Releases a read or a write (specified by \a rw) lock
1481  * referenced by \a cookie.
1482  *
1483  * \param inode inode to which data belong
1484  * \param end end of the locked extent
1485  * \param rw OBD_BRW_READ if requested for reading,
1486  *           OBD_BRW_WRITE if requested for writing
1487  * \param cookie transparent parameter for passing locking context
1488  *
1489  * \post appropriate lock is dereferenced
1490  *
1491  * \see ll_file_get_fast_lock
1492  */
1493 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1494                                          void *cookie, int rw)
1495 {
1496         ll_release_short_lock(inode, end, cookie, rw);
1497 }
1498
1499 enum ll_lock_style {
1500         LL_LOCK_STYLE_NOLOCK   = 0,
1501         LL_LOCK_STYLE_FASTLOCK = 1,
1502         LL_LOCK_STYLE_TREELOCK = 2
1503 };
1504
1505 /**
1506  * Checks if requested extent lock is compatible with a lock 
1507  * under a page cache page.
1508  *
1509  * Checks if the lock under \a page is compatible with a read or write lock
1510  * (specified by \a rw) for an extent [\a start , \a end].
1511  *
1512  * \param file file under which I/O is processed
1513  * \param rw OBD_BRW_READ if requested for reading,
1514  *           OBD_BRW_WRITE if requested for writing
1515  * \param ppos start of the requested extent
1516  * \param end end of the requested extent
1517  * \param cookie transparent parameter for passing locking context
1518  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1519  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1520  * \param buf userspace buffer for the data
1521  *
1522  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1523  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1524  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1525  *
1526  * \see ll_file_put_lock
1527  */
1528 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1529                                    obd_off end, char *buf, void **cookie,
1530                                    struct ll_lock_tree *tree, int rw)
1531 {
1532         int rc;
1533
1534         ENTRY;
1535
1536         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1537                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1538
1539         rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
1540         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1541         switch (rc) {
1542         case 1:
1543                 RETURN(LL_LOCK_STYLE_TREELOCK);
1544         case 0:
1545                 RETURN(LL_LOCK_STYLE_NOLOCK);
1546         }
1547
1548         /* an error happened if we reached this point, rc = -errno here */
1549         RETURN(rc);
1550 }
1551
1552 /**
1553  * Drops the lock taken by ll_file_get_lock.
1554  *
1555  * Releases a read or a write (specified by \a rw) lock
1556  * referenced by \a tree or \a cookie.
1557  *
1558  * \param inode inode to which data belong
1559  * \param end end of the locked extent
1560  * \param lockstyle facility through which the lock was taken
1561  * \param rw OBD_BRW_READ if requested for reading,
1562  *           OBD_BRW_WRITE if requested for writing
1563  * \param cookie transparent parameter for passing locking context
1564  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1565  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1566  *
1567  * \post appropriate lock is dereferenced
1568  *
1569  * \see ll_file_get_lock
1570  */
1571 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1572                                     enum ll_lock_style lock_style,
1573                                     void *cookie, struct ll_lock_tree *tree,
1574                                     int rw)
1575
1576 {
1577         switch (lock_style) {
1578         case LL_LOCK_STYLE_TREELOCK:
1579                 ll_tree_unlock(tree);
1580                 break;
1581         case LL_LOCK_STYLE_FASTLOCK:
1582                 ll_file_put_fast_lock(inode, end, cookie, rw);
1583                 break;
1584         default:
1585                 CERROR("invalid locking style (%d)\n", lock_style);
1586         }
1587 }
1588
1589 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1590                             loff_t *ppos)
1591 {
1592         struct inode *inode = file->f_dentry->d_inode;
1593         struct ll_inode_info *lli = ll_i2info(inode);
1594         struct lov_stripe_md *lsm = lli->lli_smd;
1595         struct ll_sb_info *sbi = ll_i2sbi(inode);
1596         struct ll_lock_tree tree;
1597         struct ost_lvb lvb;
1598         struct ll_ra_read bead;
1599         int ra = 0;
1600         obd_off end;
1601         ssize_t retval, chunk, sum = 0;
1602         int lock_style;
1603         void *cookie;
1604
1605         __u64 kms;
1606         ENTRY;
1607         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1608                inode->i_ino, inode->i_generation, inode, count, *ppos);
1609         /* "If nbyte is 0, read() will return 0 and have no other results."
1610          *                      -- Single Unix Spec */
1611         if (count == 0)
1612                 RETURN(0);
1613
1614         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1615
1616         if (!lsm) {
1617                 /* Read on file with no objects should return zero-filled
1618                  * buffers up to file size (we can get non-zero sizes with
1619                  * mknod + truncate, then opening file for read. This is a
1620                  * common pattern in NFS case, it seems). Bug 6243 */
1621                 int notzeroed;
1622                 /* Since there are no objects on OSTs, we have nothing to get
1623                  * lock on and so we are forced to access inode->i_size
1624                  * unguarded */
1625
1626                 /* Read beyond end of file */
1627                 if (*ppos >= i_size_read(inode))
1628                         RETURN(0);
1629
1630                 if (count > i_size_read(inode) - *ppos)
1631                         count = i_size_read(inode) - *ppos;
1632                 /* Make sure to correctly adjust the file pos pointer for
1633                  * EFAULT case */
1634                 notzeroed = clear_user(buf, count);
1635                 count -= notzeroed;
1636                 *ppos += count;
1637                 if (!count)
1638                         RETURN(-EFAULT);
1639                 RETURN(count);
1640         }
1641 repeat:
1642         if (sbi->ll_max_rw_chunk != 0) {
1643                 /* first, let's know the end of the current stripe */
1644                 end = *ppos;
1645                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1646
1647                 /* correct, the end is beyond the request */
1648                 if (end > *ppos + count - 1)
1649                         end = *ppos + count - 1;
1650
1651                 /* and chunk shouldn't be too large even if striping is wide */
1652                 if (end - *ppos > sbi->ll_max_rw_chunk)
1653                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1654         } else {
1655                 end = *ppos + count - 1;
1656         }
1657
1658         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1659                                       buf, &cookie, &tree, OBD_BRW_READ);
1660         if (lock_style < 0)
1661                 GOTO(out, retval = lock_style);
1662
1663         ll_inode_size_lock(inode, 1);
1664         /*
1665          * Consistency guarantees: following possibilities exist for the
1666          * relation between region being read and real file size at this
1667          * moment:
1668          *
1669          *  (A): the region is completely inside of the file;
1670          *
1671          *  (B-x): x bytes of region are inside of the file, the rest is
1672          *  outside;
1673          *
1674          *  (C): the region is completely outside of the file.
1675          *
1676          * This classification is stable under DLM lock acquired by
1677          * ll_tree_lock() above, because to change class, other client has to
1678          * take DLM lock conflicting with our lock. Also, any updates to
1679          * ->i_size by other threads on this client are serialized by
1680          * ll_inode_size_lock(). This guarantees that short reads are handled
1681          * correctly in the face of concurrent writes and truncates.
1682          */
1683         inode_init_lvb(inode, &lvb);
1684         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1685         kms = lvb.lvb_size;
1686         if (*ppos + count - 1 > kms) {
1687                 /* A glimpse is necessary to determine whether we return a
1688                  * short read (B) or some zeroes at the end of the buffer (C) */
1689                 ll_inode_size_unlock(inode, 1);
1690                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1691                 if (retval) {
1692                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1693                                 ll_file_put_lock(inode, end, lock_style,
1694                                                  cookie, &tree, OBD_BRW_READ);
1695                         goto out;
1696                 }
1697         } else {
1698                 /* region is within kms and, hence, within real file size (A).
1699                  * We need to increase i_size to cover the read region so that
1700                  * generic_file_read() will do its job, but that doesn't mean
1701                  * the kms size is _correct_, it is only the _minimum_ size.
1702                  * If someone does a stat they will get the correct size which
1703                  * will always be >= the kms value here.  b=11081 */
1704                 if (i_size_read(inode) < kms)
1705                         i_size_write(inode, kms);
1706                 ll_inode_size_unlock(inode, 1);
1707         }
1708
1709         chunk = end - *ppos + 1;
1710         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1711                inode->i_ino, chunk, *ppos, i_size_read(inode));
1712
1713         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1714                 /* turn off the kernel's read-ahead */
1715                 file->f_ra.ra_pages = 0;
1716
1717                 /* initialize read-ahead window once per syscall */
1718                 if (ra == 0) {
1719                         ra = 1;
1720                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1721                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1722                         ll_ra_read_in(file, &bead);
1723                 }
1724
1725                 /* BUG: 5972 */
1726                 file_accessed(file);
1727                 retval = generic_file_read(file, buf, chunk, ppos);
1728                 ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
1729                                  OBD_BRW_READ);
1730         } else {
1731                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1732         }
1733
1734         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1735
1736         if (retval > 0) {
1737                 buf += retval;
1738                 count -= retval;
1739                 sum += retval;
1740                 if (retval == chunk && count > 0)
1741                         goto repeat;
1742         }
1743
1744  out:
1745         if (ra != 0)
1746                 ll_ra_read_ex(file, &bead);
1747         retval = (sum > 0) ? sum : retval;
1748         RETURN(retval);
1749 }
1750
1751 /*
1752  * Write to a file (through the page cache).
1753  */
1754 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1755                              loff_t *ppos)
1756 {
1757         struct inode *inode = file->f_dentry->d_inode;
1758         struct ll_sb_info *sbi = ll_i2sbi(inode);
1759         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1760         struct ll_lock_tree tree;
1761         loff_t maxbytes = ll_file_maxbytes(inode);
1762         loff_t lock_start, lock_end, end;
1763         ssize_t retval, chunk, sum = 0;
1764         int tree_locked;
1765         ENTRY;
1766
1767         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1768                inode->i_ino, inode->i_generation, inode, count, *ppos);
1769
1770         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1771
1772         /* POSIX, but surprised the VFS doesn't check this already */
1773         if (count == 0)
1774                 RETURN(0);
1775
1776         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1777          * called on the file, don't fail the below assertion (bug 2388). */
1778         if (file->f_flags & O_LOV_DELAY_CREATE &&
1779             ll_i2info(inode)->lli_smd == NULL)
1780                 RETURN(-EBADF);
1781
1782         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1783
1784         down(&ll_i2info(inode)->lli_write_sem);
1785
1786 repeat:
1787         chunk = 0; /* just to fix gcc's warning */
1788         end = *ppos + count - 1;
1789
1790         if (file->f_flags & O_APPEND) {
1791                 lock_start = 0;
1792                 lock_end = OBD_OBJECT_EOF;
1793         } else if (sbi->ll_max_rw_chunk != 0) {
1794                 /* first, let's know the end of the current stripe */
1795                 end = *ppos;
1796                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1797                                 (obd_off *)&end);
1798
1799                 /* correct, the end is beyond the request */
1800                 if (end > *ppos + count - 1)
1801                         end = *ppos + count - 1;
1802
1803                 /* and chunk shouldn't be too large even if striping is wide */
1804                 if (end - *ppos > sbi->ll_max_rw_chunk)
1805                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1806                 lock_start = *ppos;
1807                 lock_end = end;
1808         } else {
1809                 lock_start = *ppos;
1810                 lock_end = *ppos + count - 1;
1811         }
1812
1813         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1814                                             lock_start, lock_end, OBD_BRW_WRITE);
1815         if (tree_locked < 0)
1816                 GOTO(out, retval = tree_locked);
1817
1818         /* This is ok, g_f_w will overwrite this under i_sem if it races
1819          * with a local truncate, it just makes our maxbyte checking easier.
1820          * The i_size value gets updated in ll_extent_lock() as a consequence
1821          * of the [0,EOF] extent lock we requested above. */
1822         if (file->f_flags & O_APPEND) {
1823                 *ppos = i_size_read(inode);
1824                 end = *ppos + count - 1;
1825         }
1826
1827         if (*ppos >= maxbytes) {
1828                 send_sig(SIGXFSZ, current, 0);
1829                 GOTO(out_unlock, retval = -EFBIG);
1830         }
1831         if (end > maxbytes - 1)
1832                 end = maxbytes - 1;
1833
1834         /* generic_file_write handles O_APPEND after getting i_mutex */
1835         chunk = end - *ppos + 1;
1836         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1837                inode->i_ino, chunk, *ppos);
1838         if (tree_locked)
1839                 retval = generic_file_write(file, buf, chunk, ppos);
1840         else
1841                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1842                                              ppos, WRITE);
1843         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1844
1845 out_unlock:
1846         if (tree_locked)
1847                 ll_tree_unlock(&tree);
1848
1849 out:
1850         if (retval > 0) {
1851                 buf += retval;
1852                 count -= retval;
1853                 sum += retval;
1854                 if (retval == chunk && count > 0)
1855                         goto repeat;
1856         }
1857
1858         up(&ll_i2info(inode)->lli_write_sem);
1859
1860         retval = (sum > 0) ? sum : retval;
1861         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1862                            retval > 0 ? retval : 0);
1863         RETURN(retval);
1864 }
1865
1866 /*
1867  * Send file content (through pagecache) somewhere with helper
1868  */
1869 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1870                                 read_actor_t actor, void *target)
1871 {
1872         struct inode *inode = in_file->f_dentry->d_inode;
1873         struct ll_inode_info *lli = ll_i2info(inode);
1874         struct lov_stripe_md *lsm = lli->lli_smd;
1875         struct ll_lock_tree tree;
1876         struct ll_lock_tree_node *node;
1877         struct ost_lvb lvb;
1878         struct ll_ra_read bead;
1879         int rc;
1880         ssize_t retval;
1881         __u64 kms;
1882         ENTRY;
1883         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1884                inode->i_ino, inode->i_generation, inode, count, *ppos);
1885
1886         /* "If nbyte is 0, read() will return 0 and have no other results."
1887          *                      -- Single Unix Spec */
1888         if (count == 0)
1889                 RETURN(0);
1890
1891         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1892         /* turn off the kernel's read-ahead */
1893         in_file->f_ra.ra_pages = 0;
1894
1895         /* File with no objects, nothing to lock */
1896         if (!lsm)
1897                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1898
1899         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1900         if (IS_ERR(node))
1901                 RETURN(PTR_ERR(node));
1902
1903         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1904         rc = ll_tree_lock(&tree, node, NULL, count,
1905                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1906         if (rc != 0)
1907                 RETURN(rc);
1908
1909         ll_clear_file_contended(inode);
1910         ll_inode_size_lock(inode, 1);
1911         /*
1912          * Consistency guarantees: following possibilities exist for the
1913          * relation between region being read and real file size at this
1914          * moment:
1915          *
1916          *  (A): the region is completely inside of the file;
1917          *
1918          *  (B-x): x bytes of region are inside of the file, the rest is
1919          *  outside;
1920          *
1921          *  (C): the region is completely outside of the file.
1922          *
1923          * This classification is stable under DLM lock acquired by
1924          * ll_tree_lock() above, because to change class, other client has to
1925          * take DLM lock conflicting with our lock. Also, any updates to
1926          * ->i_size by other threads on this client are serialized by
1927          * ll_inode_size_lock(). This guarantees that short reads are handled
1928          * correctly in the face of concurrent writes and truncates.
1929          */
1930         inode_init_lvb(inode, &lvb);
1931         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1932         kms = lvb.lvb_size;
1933         if (*ppos + count - 1 > kms) {
1934                 /* A glimpse is necessary to determine whether we return a
1935                  * short read (B) or some zeroes at the end of the buffer (C) */
1936                 ll_inode_size_unlock(inode, 1);
1937                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1938                 if (retval)
1939                         goto out;
1940         } else {
1941                 /* region is within kms and, hence, within real file size (A) */
1942                 i_size_write(inode, kms);
1943                 ll_inode_size_unlock(inode, 1);
1944         }
1945
1946         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1947                inode->i_ino, count, *ppos, i_size_read(inode));
1948
1949         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1950         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1951         ll_ra_read_in(in_file, &bead);
1952         /* BUG: 5972 */
1953         file_accessed(in_file);
1954         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1955         ll_ra_read_ex(in_file, &bead);
1956
1957  out:
1958         ll_tree_unlock(&tree);
1959         RETURN(retval);
1960 }
1961
1962 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1963                                unsigned long arg)
1964 {
1965         struct ll_inode_info *lli = ll_i2info(inode);
1966         struct obd_export *exp = ll_i2dtexp(inode);
1967         struct ll_recreate_obj ucreatp;
1968         struct obd_trans_info oti = { 0 };
1969         struct obdo *oa = NULL;
1970         int lsm_size;
1971         int rc = 0;
1972         struct lov_stripe_md *lsm, *lsm2;
1973         ENTRY;
1974
1975         if (!capable (CAP_SYS_ADMIN))
1976                 RETURN(-EPERM);
1977
1978         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1979                             sizeof(struct ll_recreate_obj));
1980         if (rc) {
1981                 RETURN(-EFAULT);
1982         }
1983         OBDO_ALLOC(oa);
1984         if (oa == NULL)
1985                 RETURN(-ENOMEM);
1986
1987         down(&lli->lli_size_sem);
1988         lsm = lli->lli_smd;
1989         if (lsm == NULL)
1990                 GOTO(out, rc = -ENOENT);
1991         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1992                    (lsm->lsm_stripe_count));
1993
1994         OBD_ALLOC(lsm2, lsm_size);
1995         if (lsm2 == NULL)
1996                 GOTO(out, rc = -ENOMEM);
1997
1998         oa->o_id = ucreatp.lrc_id;
1999         oa->o_gr = ucreatp.lrc_group;
2000         oa->o_nlink = ucreatp.lrc_ost_idx;
2001         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2002         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
2003         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2004                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2005
2006         memcpy(lsm2, lsm, lsm_size);
2007         rc = obd_create(exp, oa, &lsm2, &oti);
2008
2009         OBD_FREE(lsm2, lsm_size);
2010         GOTO(out, rc);
2011 out:
2012         up(&lli->lli_size_sem);
2013         OBDO_FREE(oa);
2014         return rc;
2015 }
2016
2017 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2018                              int flags, struct lov_user_md *lum, int lum_size)
2019 {
2020         struct ll_inode_info *lli = ll_i2info(inode);
2021         struct lov_stripe_md *lsm;
2022         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2023         int rc = 0;
2024         ENTRY;
2025
2026         down(&lli->lli_size_sem);
2027         lsm = lli->lli_smd;
2028         if (lsm) {
2029                 up(&lli->lli_size_sem);
2030                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2031                        inode->i_ino);
2032                 RETURN(-EEXIST);
2033         }
2034
2035         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2036         if (rc)
2037                 GOTO(out, rc);
2038         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2039                 GOTO(out_req_free, rc = -ENOENT);
2040         rc = oit.d.lustre.it_status;
2041         if (rc < 0)
2042                 GOTO(out_req_free, rc);
2043
2044         ll_release_openhandle(file->f_dentry, &oit);
2045
2046  out:
2047         up(&lli->lli_size_sem);
2048         ll_intent_release(&oit);
2049         RETURN(rc);
2050 out_req_free:
2051         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2052         goto out;
2053 }
2054
2055 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
2056                              struct lov_mds_md **lmmp, int *lmm_size, 
2057                              struct ptlrpc_request **request)
2058 {
2059         struct ll_sb_info *sbi = ll_i2sbi(inode);
2060         struct mdt_body  *body;
2061         struct lov_mds_md *lmm = NULL;
2062         struct ptlrpc_request *req = NULL;
2063         struct obd_capa *oc;
2064         int rc, lmmsize;
2065
2066         rc = ll_get_max_mdsize(sbi, &lmmsize);
2067         if (rc)
2068                 RETURN(rc);
2069
2070         oc = ll_mdscapa_get(inode);
2071         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2072                              oc, filename, strlen(filename) + 1,
2073                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2074                              ll_i2suppgid(inode), &req);
2075         capa_put(oc);
2076         if (rc < 0) {
2077                 CDEBUG(D_INFO, "md_getattr_name failed "
2078                        "on %s: rc %d\n", filename, rc);
2079                 GOTO(out, rc);
2080         }
2081
2082         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2083         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2084
2085         lmmsize = body->eadatasize;
2086
2087         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2088                         lmmsize == 0) {
2089                 GOTO(out, rc = -ENODATA);
2090         }
2091
2092         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2093         LASSERT(lmm != NULL);
2094
2095         /*
2096          * This is coming from the MDS, so is probably in
2097          * little endian.  We convert it to host endian before
2098          * passing it to userspace.
2099          */
2100         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
2101                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
2102                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
2103         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
2104                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2105         }
2106
2107         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2108                 struct lov_stripe_md *lsm;
2109                 struct lov_user_md_join *lmj;
2110                 int lmj_size, i, aindex = 0;
2111
2112                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2113                 if (rc < 0)
2114                         GOTO(out, rc = -ENOMEM);
2115                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2116                 if (rc)
2117                         GOTO(out_free_memmd, rc);
2118
2119                 lmj_size = sizeof(struct lov_user_md_join) +
2120                            lsm->lsm_stripe_count *
2121                            sizeof(struct lov_user_ost_data_join);
2122                 OBD_ALLOC(lmj, lmj_size);
2123                 if (!lmj)
2124                         GOTO(out_free_memmd, rc = -ENOMEM);
2125
2126                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2127                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2128                         struct lov_extent *lex =
2129                                 &lsm->lsm_array->lai_ext_array[aindex];
2130
2131                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2132                                 aindex ++;
2133                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2134                                         LPU64" len %d\n", aindex, i,
2135                                         lex->le_start, (int)lex->le_len);
2136                         lmj->lmm_objects[i].l_extent_start =
2137                                 lex->le_start;
2138
2139                         if ((int)lex->le_len == -1)
2140                                 lmj->lmm_objects[i].l_extent_end = -1;
2141                         else
2142                                 lmj->lmm_objects[i].l_extent_end =
2143                                         lex->le_start + lex->le_len;
2144                         lmj->lmm_objects[i].l_object_id =
2145                                 lsm->lsm_oinfo[i]->loi_id;
2146                         lmj->lmm_objects[i].l_object_gr =
2147                                 lsm->lsm_oinfo[i]->loi_gr;
2148                         lmj->lmm_objects[i].l_ost_gen =
2149                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2150                         lmj->lmm_objects[i].l_ost_idx =
2151                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2152                 }
2153                 lmm = (struct lov_mds_md *)lmj;
2154                 lmmsize = lmj_size;
2155 out_free_memmd:
2156                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2157         }
2158 out:
2159         *lmmp = lmm;
2160         *lmm_size = lmmsize;
2161         *request = req;
2162         return rc;
2163 }
2164
2165 static int ll_lov_setea(struct inode *inode, struct file *file,
2166                             unsigned long arg)
2167 {
2168         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2169         struct lov_user_md  *lump;
2170         int lum_size = sizeof(struct lov_user_md) +
2171                        sizeof(struct lov_user_ost_data);
2172         int rc;
2173         ENTRY;
2174
2175         if (!capable (CAP_SYS_ADMIN))
2176                 RETURN(-EPERM);
2177
2178         OBD_ALLOC(lump, lum_size);
2179         if (lump == NULL) {
2180                 RETURN(-ENOMEM);
2181         }
2182         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2183         if (rc) {
2184                 OBD_FREE(lump, lum_size);
2185                 RETURN(-EFAULT);
2186         }
2187
2188         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2189
2190         OBD_FREE(lump, lum_size);
2191         RETURN(rc);
2192 }
2193
2194 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2195                             unsigned long arg)
2196 {
2197         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2198         int rc;
2199         int flags = FMODE_WRITE;
2200         ENTRY;
2201
2202         /* Bug 1152: copy properly when this is no longer true */
2203         LASSERT(sizeof(lum) == sizeof(*lump));
2204         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2205         rc = copy_from_user(&lum, lump, sizeof(lum));
2206         if (rc)
2207                 RETURN(-EFAULT);
2208
2209         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2210         if (rc == 0) {
2211                  put_user(0, &lump->lmm_stripe_count);
2212                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2213                                     0, ll_i2info(inode)->lli_smd, lump);
2214         }
2215         RETURN(rc);
2216 }
2217
2218 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2219 {
2220         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2221
2222         if (!lsm)
2223                 RETURN(-ENODATA);
2224
2225         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2226                             (void *)arg);
2227 }
2228
2229 static int ll_get_grouplock(struct inode *inode, struct file *file,
2230                             unsigned long arg)
2231 {
2232         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2233         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2234                                                     .end = OBD_OBJECT_EOF}};
2235         struct lustre_handle lockh = { 0 };
2236         struct ll_inode_info *lli = ll_i2info(inode);
2237         struct lov_stripe_md *lsm = lli->lli_smd;
2238         int flags = 0, rc;
2239         ENTRY;
2240
2241         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2242                 RETURN(-EINVAL);
2243         }
2244
2245         policy.l_extent.gid = arg;
2246         if (file->f_flags & O_NONBLOCK)
2247                 flags = LDLM_FL_BLOCK_NOWAIT;
2248
2249         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2250         if (rc)
2251                 RETURN(rc);
2252
2253         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2254         fd->fd_gid = arg;
2255         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2256
2257         RETURN(0);
2258 }
2259
2260 static int ll_put_grouplock(struct inode *inode, struct file *file,
2261                             unsigned long arg)
2262 {
2263         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2264         struct ll_inode_info *lli = ll_i2info(inode);
2265         struct lov_stripe_md *lsm = lli->lli_smd;
2266         int rc;
2267         ENTRY;
2268
2269         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2270                 /* Ugh, it's already unlocked. */
2271                 RETURN(-EINVAL);
2272         }
2273
2274         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2275                 RETURN(-EINVAL);
2276
2277         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2278
2279         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2280         if (rc)
2281                 RETURN(rc);
2282
2283         fd->fd_gid = 0;
2284         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2285
2286         RETURN(0);
2287 }
2288
2289 static int join_sanity_check(struct inode *head, struct inode *tail)
2290 {
2291         ENTRY;
2292         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2293                 CERROR("server do not support join \n");
2294                 RETURN(-EINVAL);
2295         }
2296         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2297                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2298                        head->i_ino, tail->i_ino);
2299                 RETURN(-EINVAL);
2300         }
2301         if (head->i_ino == tail->i_ino) {
2302                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2303                 RETURN(-EINVAL);
2304         }
2305         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2306                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2307                 RETURN(-EINVAL);
2308         }
2309         RETURN(0);
2310 }
2311
2312 static int join_file(struct inode *head_inode, struct file *head_filp,
2313                      struct file *tail_filp)
2314 {
2315         struct dentry *tail_dentry = tail_filp->f_dentry;
2316         struct lookup_intent oit = {.it_op = IT_OPEN,
2317                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2318         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2319                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2320
2321         struct lustre_handle lockh;
2322         struct md_op_data *op_data;
2323         int    rc;
2324         loff_t data;
2325         ENTRY;
2326
2327         tail_dentry = tail_filp->f_dentry;
2328
2329         data = i_size_read(head_inode);
2330         op_data = ll_prep_md_op_data(NULL, head_inode,
2331                                      tail_dentry->d_parent->d_inode,
2332                                      tail_dentry->d_name.name,
2333                                      tail_dentry->d_name.len, 0,
2334                                      LUSTRE_OPC_ANY, &data);
2335         if (IS_ERR(op_data))
2336                 RETURN(PTR_ERR(op_data));
2337
2338         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2339                          op_data, &lockh, NULL, 0, 0);
2340
2341         ll_finish_md_op_data(op_data);
2342         if (rc < 0)
2343                 GOTO(out, rc);
2344
2345         rc = oit.d.lustre.it_status;
2346
2347         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2348                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2349                 ptlrpc_req_finished((struct ptlrpc_request *)
2350                                     oit.d.lustre.it_data);
2351                 GOTO(out, rc);
2352         }
2353
2354         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2355                                            * away */
2356                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2357                 oit.d.lustre.it_lock_mode = 0;
2358         }
2359         ll_release_openhandle(head_filp->f_dentry, &oit);
2360 out:
2361         ll_intent_release(&oit);
2362         RETURN(rc);
2363 }
2364
2365 static int ll_file_join(struct inode *head, struct file *filp,
2366                         char *filename_tail)
2367 {
2368         struct inode *tail = NULL, *first = NULL, *second = NULL;
2369         struct dentry *tail_dentry;
2370         struct file *tail_filp, *first_filp, *second_filp;
2371         struct ll_lock_tree first_tree, second_tree;
2372         struct ll_lock_tree_node *first_node, *second_node;
2373         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2374         int rc = 0, cleanup_phase = 0;
2375         ENTRY;
2376
2377         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2378                head->i_ino, head->i_generation, head, filename_tail);
2379
2380         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2381         if (IS_ERR(tail_filp)) {
2382                 CERROR("Can not open tail file %s", filename_tail);
2383                 rc = PTR_ERR(tail_filp);
2384                 GOTO(cleanup, rc);
2385         }
2386         tail = igrab(tail_filp->f_dentry->d_inode);
2387
2388         tlli = ll_i2info(tail);
2389         tail_dentry = tail_filp->f_dentry;
2390         LASSERT(tail_dentry);
2391         cleanup_phase = 1;
2392
2393         /*reorder the inode for lock sequence*/
2394         first = head->i_ino > tail->i_ino ? head : tail;
2395         second = head->i_ino > tail->i_ino ? tail : head;
2396         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2397         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2398
2399         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2400                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2401         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2402         if (IS_ERR(first_node)){
2403                 rc = PTR_ERR(first_node);
2404                 GOTO(cleanup, rc);
2405         }
2406         first_tree.lt_fd = first_filp->private_data;
2407         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2408         if (rc != 0)
2409                 GOTO(cleanup, rc);
2410         cleanup_phase = 2;
2411
2412         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2413         if (IS_ERR(second_node)){
2414                 rc = PTR_ERR(second_node);
2415                 GOTO(cleanup, rc);
2416         }
2417         second_tree.lt_fd = second_filp->private_data;
2418         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2419         if (rc != 0)
2420                 GOTO(cleanup, rc);
2421         cleanup_phase = 3;
2422
2423         rc = join_sanity_check(head, tail);
2424         if (rc)
2425                 GOTO(cleanup, rc);
2426
2427         rc = join_file(head, filp, tail_filp);
2428         if (rc)
2429                 GOTO(cleanup, rc);
2430 cleanup:
2431         switch (cleanup_phase) {
2432         case 3:
2433                 ll_tree_unlock(&second_tree);
2434                 obd_cancel_unused(ll_i2dtexp(second),
2435                                   ll_i2info(second)->lli_smd, 0, NULL);
2436         case 2:
2437                 ll_tree_unlock(&first_tree);
2438                 obd_cancel_unused(ll_i2dtexp(first),
2439                                   ll_i2info(first)->lli_smd, 0, NULL);
2440         case 1:
2441                 filp_close(tail_filp, 0);
2442                 if (tail)
2443                         iput(tail);
2444                 if (head && rc == 0) {
2445                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2446                                        &hlli->lli_smd);
2447                         hlli->lli_smd = NULL;
2448                 }
2449         case 0:
2450                 break;
2451         default:
2452                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2453                 LBUG();
2454         }
2455         RETURN(rc);
2456 }
2457
2458 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2459 {
2460         struct inode *inode = dentry->d_inode;
2461         struct obd_client_handle *och;
2462         int rc;
2463         ENTRY;
2464
2465         LASSERT(inode);
2466
2467         /* Root ? Do nothing. */
2468         if (dentry->d_inode->i_sb->s_root == dentry)
2469                 RETURN(0);
2470
2471         /* No open handle to close? Move away */
2472         if (!it_disposition(it, DISP_OPEN_OPEN))
2473                 RETURN(0);
2474
2475         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2476
2477         OBD_ALLOC(och, sizeof(*och));
2478         if (!och)
2479                 GOTO(out, rc = -ENOMEM);
2480
2481         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2482                     ll_i2info(inode), it, och);
2483
2484         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2485                                        inode, och);
2486  out:
2487         /* this one is in place of ll_file_open */
2488         ptlrpc_req_finished(it->d.lustre.it_data);
2489         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2490         RETURN(rc);
2491 }
2492
2493 /**
2494  * Get size for inode for which FIEMAP mapping is requested.
2495  * Make the FIEMAP get_info call and returns the result.
2496  */
2497 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2498               int num_bytes)
2499 {
2500         struct obd_export *exp = ll_i2dtexp(inode);
2501         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2502         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2503         int vallen = num_bytes;
2504         int rc;
2505         ENTRY;
2506
2507         /* If the stripe_count > 1 and the application does not understand
2508          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2509          */
2510         if (lsm->lsm_stripe_count > 1 &&
2511             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2512                 return -EOPNOTSUPP;
2513
2514         fm_key.oa.o_id = lsm->lsm_object_id;
2515         fm_key.oa.o_gr = lsm->lsm_object_gr;
2516         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2517
2518         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
2519                         OBD_MD_FLSIZE);
2520
2521         /* If filesize is 0, then there would be no objects for mapping */
2522         if (fm_key.oa.o_size == 0) {
2523                 fiemap->fm_mapped_extents = 0;
2524                 RETURN(0);
2525         }
2526
2527         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2528
2529         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2530         if (rc)
2531                 CERROR("obd_get_info failed: rc = %d\n", rc);
2532
2533         RETURN(rc);
2534 }
2535
2536 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2537                   unsigned long arg)
2538 {
2539         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2540         int flags;
2541         ENTRY;
2542
2543         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2544                inode->i_generation, inode, cmd);
2545         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2546
2547         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2548         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2549                 RETURN(-ENOTTY);
2550
2551         switch(cmd) {
2552         case LL_IOC_GETFLAGS:
2553                 /* Get the current value of the file flags */
2554                 return put_user(fd->fd_flags, (int *)arg);
2555         case LL_IOC_SETFLAGS:
2556         case LL_IOC_CLRFLAGS:
2557                 /* Set or clear specific file flags */
2558                 /* XXX This probably needs checks to ensure the flags are
2559                  *     not abused, and to handle any flag side effects.
2560                  */
2561                 if (get_user(flags, (int *) arg))
2562                         RETURN(-EFAULT);
2563
2564                 if (cmd == LL_IOC_SETFLAGS) {
2565                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2566                             !(file->f_flags & O_DIRECT)) {
2567                                 CERROR("%s: unable to disable locking on "
2568                                        "non-O_DIRECT file\n", current->comm);
2569                                 RETURN(-EINVAL);
2570                         }
2571
2572                         fd->fd_flags |= flags;
2573                 } else {
2574                         fd->fd_flags &= ~flags;
2575                 }
2576                 RETURN(0);
2577         case LL_IOC_LOV_SETSTRIPE:
2578                 RETURN(ll_lov_setstripe(inode, file, arg));
2579         case LL_IOC_LOV_SETEA:
2580                 RETURN(ll_lov_setea(inode, file, arg));
2581         case LL_IOC_LOV_GETSTRIPE:
2582                 RETURN(ll_lov_getstripe(inode, arg));
2583         case LL_IOC_RECREATE_OBJ:
2584                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2585         case EXT3_IOC_FIEMAP: {
2586                 struct ll_user_fiemap *fiemap_s;
2587                 size_t num_bytes, ret_bytes;
2588                 unsigned int extent_count;
2589                 int rc = 0;
2590
2591                 /* Get the extent count so we can calculate the size of
2592                  * required fiemap buffer */
2593                 if (get_user(extent_count,
2594                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2595                         RETURN(-EFAULT);
2596                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2597                                                  sizeof(struct ll_fiemap_extent));
2598                 OBD_VMALLOC(fiemap_s, num_bytes);
2599                 if (fiemap_s == NULL)
2600                         RETURN(-ENOMEM);
2601
2602                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2603                                    sizeof(*fiemap_s)))
2604                         GOTO(error, rc = -EFAULT);
2605
2606                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2607                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2608                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2609                         if (copy_to_user((char *)arg, fiemap_s,
2610                                          sizeof(*fiemap_s)))
2611                                 GOTO(error, rc = -EFAULT);
2612
2613                         GOTO(error, rc = -EBADR);
2614                 }
2615
2616                 /* If fm_extent_count is non-zero, read the first extent since
2617                  * it is used to calculate end_offset and device from previous
2618                  * fiemap call. */
2619                 if (extent_count) {
2620                         if (copy_from_user(&fiemap_s->fm_extents[0],
2621                             (char __user *)arg + sizeof(*fiemap_s),
2622                             sizeof(struct ll_fiemap_extent)))
2623                                 GOTO(error, rc = -EFAULT);
2624                 }
2625
2626                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2627                         int rc;
2628
2629                         rc = filemap_fdatawrite(inode->i_mapping);
2630                         if (rc)
2631                                 GOTO(error, rc);
2632                 }
2633
2634                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2635                 if (rc)
2636                         GOTO(error, rc);
2637
2638                 ret_bytes = sizeof(struct ll_user_fiemap);
2639
2640                 if (extent_count != 0)
2641                         ret_bytes += (fiemap_s->fm_mapped_extents *
2642                                          sizeof(struct ll_fiemap_extent));
2643
2644                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2645                         rc = -EFAULT;
2646
2647 error:
2648                 OBD_VFREE(fiemap_s, num_bytes);
2649                 RETURN(rc);
2650         }
2651         case EXT3_IOC_GETFLAGS:
2652         case EXT3_IOC_SETFLAGS:
2653                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2654         case EXT3_IOC_GETVERSION_OLD:
2655         case EXT3_IOC_GETVERSION:
2656                 RETURN(put_user(inode->i_generation, (int *)arg));
2657         case LL_IOC_JOIN: {
2658                 char *ftail;
2659                 int rc;
2660
2661                 ftail = getname((const char *)arg);
2662                 if (IS_ERR(ftail))
2663                         RETURN(PTR_ERR(ftail));
2664                 rc = ll_file_join(inode, file, ftail);
2665                 putname(ftail);
2666                 RETURN(rc);
2667         }
2668         case LL_IOC_GROUP_LOCK:
2669                 RETURN(ll_get_grouplock(inode, file, arg));
2670         case LL_IOC_GROUP_UNLOCK:
2671                 RETURN(ll_put_grouplock(inode, file, arg));
2672         case IOC_OBD_STATFS:
2673                 RETURN(ll_obd_statfs(inode, (void *)arg));
2674
2675         /* We need to special case any other ioctls we want to handle,
2676          * to send them to the MDS/OST as appropriate and to properly
2677          * network encode the arg field.
2678         case EXT3_IOC_SETVERSION_OLD:
2679         case EXT3_IOC_SETVERSION:
2680         */
2681         case LL_IOC_FLUSHCTX:
2682                 RETURN(ll_flush_ctx(inode));
2683         default: {
2684                 int err;
2685
2686                 if (LLIOC_STOP == 
2687                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2688                         RETURN(err);
2689
2690                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2691                                      (void *)arg));
2692         }
2693         }
2694 }
2695
2696 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2697 {
2698         struct inode *inode = file->f_dentry->d_inode;
2699         struct ll_inode_info *lli = ll_i2info(inode);
2700         struct lov_stripe_md *lsm = lli->lli_smd;
2701         loff_t retval;
2702         ENTRY;
2703         retval = offset + ((origin == 2) ? i_size_read(inode) :
2704                            (origin == 1) ? file->f_pos : 0);
2705         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2706                inode->i_ino, inode->i_generation, inode, retval, retval,
2707                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2708         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2709
2710         if (origin == 2) { /* SEEK_END */
2711                 int nonblock = 0, rc;
2712
2713                 if (file->f_flags & O_NONBLOCK)
2714                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2715
2716                 if (lsm != NULL) {
2717                         rc = ll_glimpse_size(inode, nonblock);
2718                         if (rc != 0)
2719                                 RETURN(rc);
2720                 }
2721
2722                 ll_inode_size_lock(inode, 0);
2723                 offset += i_size_read(inode);
2724                 ll_inode_size_unlock(inode, 0);
2725         } else if (origin == 1) { /* SEEK_CUR */
2726                 offset += file->f_pos;
2727         }
2728
2729         retval = -EINVAL;
2730         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2731                 if (offset != file->f_pos) {
2732                         file->f_pos = offset;
2733 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2734                         file->f_reada = 0;
2735                         file->f_version = ++event;
2736 #endif
2737                 }
2738                 retval = offset;
2739         }
2740         
2741         RETURN(retval);
2742 }
2743
2744 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2745 {
2746         struct inode *inode = dentry->d_inode;
2747         struct ll_inode_info *lli = ll_i2info(inode);
2748         struct lov_stripe_md *lsm = lli->lli_smd;
2749         struct ptlrpc_request *req;
2750         struct obd_capa *oc;
2751         int rc, err;
2752         ENTRY;
2753         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2754                inode->i_generation, inode);
2755         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2756
2757         /* fsync's caller has already called _fdata{sync,write}, we want
2758          * that IO to finish before calling the osc and mdc sync methods */
2759         rc = filemap_fdatawait(inode->i_mapping);
2760
2761         /* catch async errors that were recorded back when async writeback
2762          * failed for pages in this mapping. */
2763         err = lli->lli_async_rc;
2764         lli->lli_async_rc = 0;
2765         if (rc == 0)
2766                 rc = err;
2767         if (lsm) {
2768                 err = lov_test_and_clear_async_rc(lsm);
2769                 if (rc == 0)
2770                         rc = err;
2771         }
2772
2773         oc = ll_mdscapa_get(inode);
2774         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2775                       &req);
2776         capa_put(oc);
2777         if (!rc)
2778                 rc = err;
2779         if (!err)
2780                 ptlrpc_req_finished(req);
2781
2782         if (data && lsm) {
2783                 struct obdo *oa;
2784                 
2785                 OBDO_ALLOC(oa);
2786                 if (!oa)
2787                         RETURN(rc ? rc : -ENOMEM);
2788
2789                 oa->o_id = lsm->lsm_object_id;
2790                 oa->o_gr = lsm->lsm_object_gr;
2791                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2792                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2793                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2794                                            OBD_MD_FLGROUP);
2795
2796                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2797                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2798                                0, OBD_OBJECT_EOF, oc);
2799                 capa_put(oc);
2800                 if (!rc)
2801                         rc = err;
2802                 OBDO_FREE(oa);
2803         }
2804
2805         RETURN(rc);
2806 }
2807
2808 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2809 {
2810         struct inode *inode = file->f_dentry->d_inode;
2811         struct ll_sb_info *sbi = ll_i2sbi(inode);
2812         struct ldlm_res_id res_id =
2813                 { .name = { fid_seq(ll_inode2fid(inode)),
2814                             fid_oid(ll_inode2fid(inode)),
2815                             fid_ver(ll_inode2fid(inode)),
2816                             LDLM_FLOCK} };
2817         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2818                 ldlm_flock_completion_ast, NULL, file_lock };
2819         struct lustre_handle lockh = {0};
2820         ldlm_policy_data_t flock;
2821         int flags = 0;
2822         int rc;
2823         ENTRY;
2824
2825         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2826                inode->i_ino, file_lock);
2827
2828         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2829  
2830         if (file_lock->fl_flags & FL_FLOCK) {
2831                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2832                 /* set missing params for flock() calls */
2833                 file_lock->fl_end = OFFSET_MAX;
2834                 file_lock->fl_pid = current->tgid;
2835         }
2836         flock.l_flock.pid = file_lock->fl_pid;
2837         flock.l_flock.start = file_lock->fl_start;
2838         flock.l_flock.end = file_lock->fl_end;
2839
2840         switch (file_lock->fl_type) {
2841         case F_RDLCK:
2842                 einfo.ei_mode = LCK_PR;
2843                 break;
2844         case F_UNLCK:
2845                 /* An unlock request may or may not have any relation to
2846                  * existing locks so we may not be able to pass a lock handle
2847                  * via a normal ldlm_lock_cancel() request. The request may even
2848                  * unlock a byte range in the middle of an existing lock. In
2849                  * order to process an unlock request we need all of the same
2850                  * information that is given with a normal read or write record
2851                  * lock request. To avoid creating another ldlm unlock (cancel)
2852                  * message we'll treat a LCK_NL flock request as an unlock. */
2853                 einfo.ei_mode = LCK_NL;
2854                 break;
2855         case F_WRLCK:
2856                 einfo.ei_mode = LCK_PW;
2857                 break;
2858         default:
2859                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2860                 LBUG();
2861         }
2862
2863         switch (cmd) {
2864         case F_SETLKW:
2865 #ifdef F_SETLKW64
2866         case F_SETLKW64:
2867 #endif
2868                 flags = 0;
2869                 break;
2870         case F_SETLK:
2871 #ifdef F_SETLK64
2872         case F_SETLK64:
2873 #endif
2874                 flags = LDLM_FL_BLOCK_NOWAIT;
2875                 break;
2876         case F_GETLK:
2877 #ifdef F_GETLK64
2878         case F_GETLK64:
2879 #endif
2880                 flags = LDLM_FL_TEST_LOCK;
2881                 /* Save the old mode so that if the mode in the lock changes we
2882                  * can decrement the appropriate reader or writer refcount. */
2883                 file_lock->fl_type = einfo.ei_mode;
2884                 break;
2885         default:
2886                 CERROR("unknown fcntl lock command: %d\n", cmd);
2887                 LBUG();
2888         }
2889
2890         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2891                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2892                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2893
2894         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2895                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2896         if ((file_lock->fl_flags & FL_FLOCK) &&
2897             (rc == 0 || file_lock->fl_type == F_UNLCK))
2898                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2899 #ifdef HAVE_F_OP_FLOCK
2900         if ((file_lock->fl_flags & FL_POSIX) &&
2901             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2902             !(flags & LDLM_FL_TEST_LOCK))
2903                 posix_lock_file_wait(file, file_lock);
2904 #endif
2905
2906         RETURN(rc);
2907 }
2908
2909 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2910 {
2911         ENTRY;
2912
2913         RETURN(-ENOSYS);
2914 }
2915
2916 int ll_have_md_lock(struct inode *inode, __u64 bits)
2917 {
2918         struct lustre_handle lockh;
2919         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2920         struct lu_fid *fid;
2921         int flags;
2922         ENTRY;
2923
2924         if (!inode)
2925                RETURN(0);
2926
2927         fid = &ll_i2info(inode)->lli_fid;
2928         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2929
2930         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2931         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2932                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2933                 RETURN(1);
2934         }
2935         RETURN(0);
2936 }
2937
2938 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2939                             struct lustre_handle *lockh)
2940 {
2941         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2942         struct lu_fid *fid;
2943         ldlm_mode_t rc;
2944         int flags;
2945         ENTRY;
2946
2947         fid = &ll_i2info(inode)->lli_fid;
2948         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2949
2950         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2951         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2952                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2953         RETURN(rc);
2954 }
2955
2956 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2957         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2958                               * and return success */
2959                 inode->i_nlink = 0;
2960                 /* This path cannot be hit for regular files unless in
2961                  * case of obscure races, so no need to to validate
2962                  * size. */
2963                 if (!S_ISREG(inode->i_mode) &&
2964                     !S_ISDIR(inode->i_mode))
2965                         return 0;
2966         }
2967
2968         if (rc) {
2969                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2970                 return -abs(rc);
2971
2972         }
2973
2974         return 0;
2975 }
2976
2977 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2978 {
2979         struct inode *inode = dentry->d_inode;
2980         struct ptlrpc_request *req = NULL;
2981         struct ll_sb_info *sbi;
2982         struct obd_export *exp;
2983         int rc;
2984         ENTRY;
2985
2986         if (!inode) {
2987                 CERROR("REPORT THIS LINE TO PETER\n");
2988                 RETURN(0);
2989         }
2990         sbi = ll_i2sbi(inode);
2991
2992         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2993                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2994
2995         exp = ll_i2mdexp(inode);
2996
2997         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2998                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2999                 struct md_op_data *op_data;
3000
3001                 /* Call getattr by fid, so do not provide name at all. */
3002                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
3003                                              dentry->d_inode, NULL, 0, 0,
3004                                              LUSTRE_OPC_ANY, NULL);
3005                 if (IS_ERR(op_data))
3006                         RETURN(PTR_ERR(op_data));
3007
3008                 oit.it_flags |= O_CHECK_STALE;
3009                 rc = md_intent_lock(exp, op_data, NULL, 0,
3010                                     /* we are not interested in name
3011                                        based lookup */
3012                                     &oit, 0, &req,
3013                                     ll_md_blocking_ast, 0);
3014                 ll_finish_md_op_data(op_data);
3015                 oit.it_flags &= ~O_CHECK_STALE;
3016                 if (rc < 0) {
3017                         rc = ll_inode_revalidate_fini(inode, rc);
3018                         GOTO (out, rc);
3019                 }
3020
3021                 rc = ll_revalidate_it_finish(req, &oit, dentry);
3022                 if (rc != 0) {
3023                         ll_intent_release(&oit);
3024                         GOTO(out, rc);
3025                 }
3026
3027                 /* Unlinked? Unhash dentry, so it is not picked up later by
3028                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3029                    here to preserve get_cwd functionality on 2.6.
3030                    Bug 10503 */
3031                 if (!dentry->d_inode->i_nlink) {
3032                         spin_lock(&dcache_lock);
3033                         ll_drop_dentry(dentry);
3034                         spin_unlock(&dcache_lock);
3035                 }
3036
3037                 ll_lookup_finish_locks(&oit, dentry);
3038         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
3039                                                      MDS_INODELOCK_LOOKUP)) {
3040                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3041                 obd_valid valid = OBD_MD_FLGETATTR;
3042                 struct obd_capa *oc;
3043                 int ealen = 0;
3044
3045                 if (S_ISREG(inode->i_mode)) {
3046                         rc = ll_get_max_mdsize(sbi, &ealen);
3047                         if (rc)
3048                                 RETURN(rc);
3049                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3050                 }
3051                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
3052                  * capa for this inode. Because we only keep capas of dirs
3053                  * fresh. */
3054                 oc = ll_mdscapa_get(inode);
3055                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
3056                                 ealen, &req);
3057                 capa_put(oc);
3058                 if (rc) {
3059                         rc = ll_inode_revalidate_fini(inode, rc);
3060                         RETURN(rc);
3061                 }
3062
3063                 rc = ll_prep_inode(&inode, req, NULL);
3064                 if (rc)
3065                         GOTO(out, rc);
3066         }
3067
3068         /* if object not yet allocated, don't validate size */
3069         if (ll_i2info(inode)->lli_smd == NULL)
3070                 GOTO(out, rc = 0);
3071
3072         /* ll_glimpse_size will prefer locally cached writes if they extend
3073          * the file */
3074         rc = ll_glimpse_size(inode, 0);
3075         EXIT;
3076 out:
3077         ptlrpc_req_finished(req);
3078         return rc;
3079 }
3080
3081 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3082                   struct lookup_intent *it, struct kstat *stat)
3083 {
3084         struct inode *inode = de->d_inode;
3085         int res = 0;
3086
3087         res = ll_inode_revalidate_it(de, it);
3088         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3089
3090         if (res)
3091                 return res;
3092
3093         stat->dev = inode->i_sb->s_dev;
3094         stat->ino = inode->i_ino;
3095         stat->mode = inode->i_mode;
3096         stat->nlink = inode->i_nlink;
3097         stat->uid = inode->i_uid;
3098         stat->gid = inode->i_gid;
3099         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3100         stat->atime = inode->i_atime;
3101         stat->mtime = inode->i_mtime;
3102         stat->ctime = inode->i_ctime;
3103 #ifdef HAVE_INODE_BLKSIZE
3104         stat->blksize = inode->i_blksize;
3105 #else
3106         stat->blksize = 1 << inode->i_blkbits;
3107 #endif
3108
3109         ll_inode_size_lock(inode, 0);
3110         stat->size = i_size_read(inode);
3111         stat->blocks = inode->i_blocks;
3112         ll_inode_size_unlock(inode, 0);
3113
3114         return 0;
3115 }
3116 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3117 {
3118         struct lookup_intent it = { .it_op = IT_GETATTR };
3119
3120         return ll_getattr_it(mnt, de, &it, stat);
3121 }
3122
3123 static
3124 int lustre_check_acl(struct inode *inode, int mask)
3125 {
3126 #ifdef CONFIG_FS_POSIX_ACL
3127         struct ll_inode_info *lli = ll_i2info(inode);
3128         struct posix_acl *acl;
3129         int rc;
3130         ENTRY;
3131
3132         spin_lock(&lli->lli_lock);
3133         acl = posix_acl_dup(lli->lli_posix_acl);
3134         spin_unlock(&lli->lli_lock);
3135
3136         if (!acl)
3137                 RETURN(-EAGAIN);
3138
3139         rc = posix_acl_permission(inode, acl, mask);
3140         posix_acl_release(acl);
3141
3142         RETURN(rc);
3143 #else
3144         return -EAGAIN;
3145 #endif
3146 }
3147
3148 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3149 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3150 {
3151         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3152                inode->i_ino, inode->i_generation, inode, mask);
3153         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3154                 return lustre_check_remote_perm(inode, mask);
3155         
3156         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3157         return generic_permission(inode, mask, lustre_check_acl);
3158 }
3159 #else
3160 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3161 {
3162         int mode = inode->i_mode;
3163         int rc;
3164
3165         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3166                inode->i_ino, inode->i_generation, inode, mask);
3167
3168         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3169                 return lustre_check_remote_perm(inode, mask);
3170
3171         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3172
3173         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3174             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3175                 return -EROFS;
3176         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3177                 return -EACCES;
3178         if (current->fsuid == inode->i_uid) {
3179                 mode >>= 6;
3180         } else if (1) {
3181                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3182                         goto check_groups;
3183                 rc = lustre_check_acl(inode, mask);
3184                 if (rc == -EAGAIN)
3185                         goto check_groups;
3186                 if (rc == -EACCES)
3187                         goto check_capabilities;
3188                 return rc;
3189         } else {
3190 check_groups:
3191                 if (in_group_p(inode->i_gid))
3192                         mode >>= 3;
3193         }
3194         if ((mode & mask & S_IRWXO) == mask)
3195                 return 0;
3196
3197 check_capabilities:
3198         if (!(mask & MAY_EXEC) ||
3199             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3200                 if (capable(CAP_DAC_OVERRIDE))
3201                         return 0;
3202
3203         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3204             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3205                 return 0;
3206         
3207         return -EACCES;
3208 }
3209 #endif
3210
3211 /* -o localflock - only provides locally consistent flock locks */
3212 struct file_operations ll_file_operations = {
3213         .read           = ll_file_read,
3214         .write          = ll_file_write,
3215         .ioctl          = ll_file_ioctl,
3216         .open           = ll_file_open,
3217         .release        = ll_file_release,
3218         .mmap           = ll_file_mmap,
3219         .llseek         = ll_file_seek,
3220         .sendfile       = ll_file_sendfile,
3221         .fsync          = ll_fsync,
3222 };
3223
3224 struct file_operations ll_file_operations_flock = {
3225         .read           = ll_file_read,
3226         .write          = ll_file_write,
3227         .ioctl          = ll_file_ioctl,
3228         .open           = ll_file_open,
3229         .release        = ll_file_release,
3230         .mmap           = ll_file_mmap,
3231         .llseek         = ll_file_seek,
3232         .sendfile       = ll_file_sendfile,
3233         .fsync          = ll_fsync,
3234 #ifdef HAVE_F_OP_FLOCK
3235         .flock          = ll_file_flock,
3236 #endif
3237         .lock           = ll_file_flock
3238 };
3239
3240 /* These are for -o noflock - to return ENOSYS on flock calls */
3241 struct file_operations ll_file_operations_noflock = {
3242         .read           = ll_file_read,
3243         .write          = ll_file_write,
3244         .ioctl          = ll_file_ioctl,
3245         .open           = ll_file_open,
3246         .release        = ll_file_release,
3247         .mmap           = ll_file_mmap,
3248         .llseek         = ll_file_seek,
3249         .sendfile       = ll_file_sendfile,
3250         .fsync          = ll_fsync,
3251 #ifdef HAVE_F_OP_FLOCK
3252         .flock          = ll_file_noflock,
3253 #endif
3254         .lock           = ll_file_noflock
3255 };
3256
3257 struct inode_operations ll_file_inode_operations = {
3258 #ifdef HAVE_VFS_INTENT_PATCHES
3259         .setattr_raw    = ll_setattr_raw,
3260 #endif
3261         .setattr        = ll_setattr,
3262         .truncate       = ll_truncate,
3263         .getattr        = ll_getattr,
3264         .permission     = ll_inode_permission,
3265         .setxattr       = ll_setxattr,
3266         .getxattr       = ll_getxattr,
3267         .listxattr      = ll_listxattr,
3268         .removexattr    = ll_removexattr,
3269 };
3270
3271 /* dynamic ioctl number support routins */
3272 static struct llioc_ctl_data {
3273         struct rw_semaphore ioc_sem;
3274         struct list_head    ioc_head;
3275 } llioc = { 
3276         __RWSEM_INITIALIZER(llioc.ioc_sem), 
3277         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3278 };
3279
3280
3281 struct llioc_data {
3282         struct list_head        iocd_list;
3283         unsigned int            iocd_size;
3284         llioc_callback_t        iocd_cb;
3285         unsigned int            iocd_count;
3286         unsigned int            iocd_cmd[0];
3287 };
3288
3289 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3290 {
3291         unsigned int size;
3292         struct llioc_data *in_data = NULL;
3293         ENTRY;
3294
3295         if (cb == NULL || cmd == NULL ||
3296             count > LLIOC_MAX_CMD || count < 0)
3297                 RETURN(NULL);
3298
3299         size = sizeof(*in_data) + count * sizeof(unsigned int);
3300         OBD_ALLOC(in_data, size);
3301         if (in_data == NULL)
3302                 RETURN(NULL);
3303
3304         memset(in_data, 0, sizeof(*in_data));
3305         in_data->iocd_size = size;
3306         in_data->iocd_cb = cb;
3307         in_data->iocd_count = count;
3308         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3309
3310         down_write(&llioc.ioc_sem);
3311         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3312         up_write(&llioc.ioc_sem);
3313
3314         RETURN(in_data);
3315 }
3316
3317 void ll_iocontrol_unregister(void *magic)
3318 {
3319         struct llioc_data *tmp;
3320
3321         if (magic == NULL)
3322                 return;
3323
3324         down_write(&llioc.ioc_sem);
3325         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3326                 if (tmp == magic) {
3327                         unsigned int size = tmp->iocd_size;
3328
3329                         list_del(&tmp->iocd_list);
3330                         up_write(&llioc.ioc_sem);
3331
3332                         OBD_FREE(tmp, size);
3333                         return;
3334                 }
3335         }
3336         up_write(&llioc.ioc_sem);
3337
3338         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3339 }
3340
3341 EXPORT_SYMBOL(ll_iocontrol_register);
3342 EXPORT_SYMBOL(ll_iocontrol_unregister);
3343
3344 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
3345                         unsigned int cmd, unsigned long arg, int *rcp)
3346 {
3347         enum llioc_iter ret = LLIOC_CONT;
3348         struct llioc_data *data;
3349         int rc = -EINVAL, i;
3350
3351         down_read(&llioc.ioc_sem);
3352         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3353                 for (i = 0; i < data->iocd_count; i++) {
3354                         if (cmd != data->iocd_cmd[i]) 
3355                                 continue;
3356
3357                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3358                         break;
3359                 }
3360
3361                 if (ret == LLIOC_STOP)
3362                         break;
3363         }
3364         up_read(&llioc.ioc_sem);
3365
3366         if (rcp)
3367                 *rcp = rc;
3368         return ret;
3369 }