Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <lustre_mdc.h>
47 #include <linux/pagemap.h>
48 #include <linux/file.h>
49 #include "llite_internal.h"
50
51 /* also used by llite/special.c:ll_special_open() */
52 struct ll_file_data *ll_file_data_get(void)
53 {
54         struct ll_file_data *fd;
55
56         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
57         return fd;
58 }
59
60 static void ll_file_data_put(struct ll_file_data *fd)
61 {
62         if (fd != NULL)
63                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
64 }
65
66 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
67                           struct lustre_handle *fh)
68 {
69         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
70         op_data->op_attr.ia_mode = inode->i_mode;
71         op_data->op_attr.ia_atime = inode->i_atime;
72         op_data->op_attr.ia_mtime = inode->i_mtime;
73         op_data->op_attr.ia_ctime = inode->i_ctime;
74         op_data->op_attr.ia_size = i_size_read(inode);
75         op_data->op_attr_blocks = inode->i_blocks;
76         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
77         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
78         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
79         op_data->op_capa1 = ll_mdscapa_get(inode);
80 }
81
82 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
83                              struct obd_client_handle *och)
84 {
85         ENTRY;
86
87         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
88                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
89
90         if (!(och->och_flags & FMODE_WRITE))
91                 goto out;
92
93         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
94             !S_ISREG(inode->i_mode))
95                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
96         else
97                 ll_epoch_close(inode, op_data, &och, 0);
98
99 out:
100         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
101         EXIT;
102 }
103
104 static int ll_close_inode_openhandle(struct obd_export *md_exp,
105                                      struct inode *inode,
106                                      struct obd_client_handle *och)
107 {
108         struct obd_export *exp = ll_i2mdexp(inode);
109         struct md_op_data *op_data;
110         struct ptlrpc_request *req = NULL;
111         struct obd_device *obd = class_exp2obd(exp);
112         int epoch_close = 1;
113         int seq_end = 0, rc;
114         ENTRY;
115
116         if (obd == NULL) {
117                 /*
118                  * XXX: in case of LMV, is this correct to access
119                  * ->exp_handle?
120                  */
121                 CERROR("Invalid MDC connection handle "LPX64"\n",
122                        ll_i2mdexp(inode)->exp_handle.h_cookie);
123                 GOTO(out, rc = 0);
124         }
125
126         /*
127          * here we check if this is forced umount. If so this is called on
128          * canceling "open lock" and we do not call md_close() in this case, as
129          * it will not be successful, as import is already deactivated.
130          */
131         if (obd->obd_force)
132                 GOTO(out, rc = 0);
133
134         OBD_ALLOC_PTR(op_data);
135         if (op_data == NULL)
136                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
137
138         ll_prepare_close(inode, op_data, och);
139         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
140         rc = md_close(md_exp, op_data, och->och_mod, &req);
141         if (rc != -EAGAIN)
142                 seq_end = 1;
143
144         if (rc == -EAGAIN) {
145                 /* This close must have the epoch closed. */
146                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
147                 LASSERT(epoch_close);
148                 /* MDS has instructed us to obtain Size-on-MDS attribute from
149                  * OSTs and send setattr to back to MDS. */
150                 rc = ll_sizeonmds_update(inode, och->och_mod,
151                                          &och->och_fh, op_data->op_ioepoch);
152                 if (rc) {
153                         CERROR("inode %lu mdc Size-on-MDS update failed: "
154                                "rc = %d\n", inode->i_ino, rc);
155                         rc = 0;
156                 }
157         } else if (rc) {
158                 CERROR("inode %lu mdc close failed: rc = %d\n",
159                        inode->i_ino, rc);
160         }
161         ll_finish_md_op_data(op_data);
162
163         if (rc == 0) {
164                 rc = ll_objects_destroy(req, inode);
165                 if (rc)
166                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
167                                inode->i_ino, rc);
168         }
169
170         EXIT;
171 out:
172       
173         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
174             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
175                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
176         } else {
177                 if (seq_end)
178                         ptlrpc_close_replay_seq(req);
179                 md_clear_open_replay_data(md_exp, och);
180                 /* Free @och if it is not waiting for DONE_WRITING. */
181                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
182                 OBD_FREE_PTR(och);
183         }
184         if (req) /* This is close request */
185                 ptlrpc_req_finished(req);
186         return rc;
187 }
188
189 int ll_md_real_close(struct inode *inode, int flags)
190 {
191         struct ll_inode_info *lli = ll_i2info(inode);
192         struct obd_client_handle **och_p;
193         struct obd_client_handle *och;
194         __u64 *och_usecount;
195         int rc = 0;
196         ENTRY;
197
198         if (flags & FMODE_WRITE) {
199                 och_p = &lli->lli_mds_write_och;
200                 och_usecount = &lli->lli_open_fd_write_count;
201         } else if (flags & FMODE_EXEC) {
202                 och_p = &lli->lli_mds_exec_och;
203                 och_usecount = &lli->lli_open_fd_exec_count;
204         } else {
205                 LASSERT(flags & FMODE_READ);
206                 och_p = &lli->lli_mds_read_och;
207                 och_usecount = &lli->lli_open_fd_read_count;
208         }
209
210         down(&lli->lli_och_sem);
211         if (*och_usecount) { /* There are still users of this handle, so
212                                 skip freeing it. */
213                 up(&lli->lli_och_sem);
214                 RETURN(0);
215         }
216         och=*och_p;
217         *och_p = NULL;
218         up(&lli->lli_och_sem);
219
220         if (och) { /* There might be a race and somebody have freed this och
221                       already */
222                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
223                                                inode, och);
224         }
225
226         RETURN(rc);
227 }
228
229 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
230                 struct file *file)
231 {
232         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
233         struct ll_inode_info *lli = ll_i2info(inode);
234         int rc = 0;
235         ENTRY;
236
237         /* clear group lock, if present */
238         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
239                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
240                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
241                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
242                                       &fd->fd_cwlockh);
243         }
244
245         /* Let's see if we have good enough OPEN lock on the file and if
246            we can skip talking to MDS */
247         if (file->f_dentry->d_inode) { /* Can this ever be false? */
248                 int lockmode;
249                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
250                 struct lustre_handle lockh;
251                 struct inode *inode = file->f_dentry->d_inode;
252                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
253
254                 down(&lli->lli_och_sem);
255                 if (fd->fd_omode & FMODE_WRITE) {
256                         lockmode = LCK_CW;
257                         LASSERT(lli->lli_open_fd_write_count);
258                         lli->lli_open_fd_write_count--;
259                 } else if (fd->fd_omode & FMODE_EXEC) {
260                         lockmode = LCK_PR;
261                         LASSERT(lli->lli_open_fd_exec_count);
262                         lli->lli_open_fd_exec_count--;
263                 } else {
264                         lockmode = LCK_CR;
265                         LASSERT(lli->lli_open_fd_read_count);
266                         lli->lli_open_fd_read_count--;
267                 }
268                 up(&lli->lli_och_sem);
269
270                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
271                                    LDLM_IBITS, &policy, lockmode,
272                                    &lockh)) {
273                         rc = ll_md_real_close(file->f_dentry->d_inode,
274                                               fd->fd_omode);
275                 }
276         } else {
277                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
278                        file, file->f_dentry, file->f_dentry->d_name.name);
279         }
280
281         LUSTRE_FPRIVATE(file) = NULL;
282         ll_file_data_put(fd);
283         ll_capa_close(inode);
284
285         RETURN(rc);
286 }
287
288 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
289
290 /* While this returns an error code, fput() the caller does not, so we need
291  * to make every effort to clean up all of our state here.  Also, applications
292  * rarely check close errors and even if an error is returned they will not
293  * re-try the close call.
294  */
295 int ll_file_release(struct inode *inode, struct file *file)
296 {
297         struct ll_file_data *fd;
298         struct ll_sb_info *sbi = ll_i2sbi(inode);
299         struct ll_inode_info *lli = ll_i2info(inode);
300         struct lov_stripe_md *lsm = lli->lli_smd;
301         int rc;
302
303         ENTRY;
304         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
305                inode->i_generation, inode);
306
307 #ifdef CONFIG_FS_POSIX_ACL
308         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
309             inode == inode->i_sb->s_root->d_inode) {
310                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
311
312                 LASSERT(fd != NULL);
313                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
314                         fd->fd_flags &= ~LL_FILE_RMTACL;
315                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
316                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
317                 }
318         }
319 #endif
320
321         if (inode->i_sb->s_root != file->f_dentry)
322                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
323         fd = LUSTRE_FPRIVATE(file);
324         LASSERT(fd != NULL);
325
326         /* The last ref on @file, maybe not the the owner pid of statahead.
327          * Different processes can open the same dir, "ll_opendir_key" means:
328          * it is me that should stop the statahead thread. */
329         if (lli->lli_opendir_key == fd)
330                 ll_stop_statahead(inode, fd);
331
332         if (inode->i_sb->s_root == file->f_dentry) {
333                 LUSTRE_FPRIVATE(file) = NULL;
334                 ll_file_data_put(fd);
335                 RETURN(0);
336         }
337         
338         if (lsm)
339                 lov_test_and_clear_async_rc(lsm);
340         lli->lli_async_rc = 0;
341
342         rc = ll_md_close(sbi->ll_md_exp, inode, file);
343         RETURN(rc);
344 }
345
346 static int ll_intent_file_open(struct file *file, void *lmm,
347                                int lmmsize, struct lookup_intent *itp)
348 {
349         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
350         struct dentry *parent = file->f_dentry->d_parent;
351         const char *name = file->f_dentry->d_name.name;
352         const int len = file->f_dentry->d_name.len;
353         struct md_op_data *op_data;
354         struct ptlrpc_request *req;
355         int rc;
356         ENTRY;
357
358         if (!parent)
359                 RETURN(-ENOENT);
360
361         /* Usually we come here only for NFSD, and we want open lock.
362            But we can also get here with pre 2.6.15 patchless kernels, and in
363            that case that lock is also ok */
364         /* We can also get here if there was cached open handle in revalidate_it
365          * but it disappeared while we were getting from there to ll_file_open.
366          * But this means this file was closed and immediatelly opened which
367          * makes a good candidate for using OPEN lock */
368         /* If lmmsize & lmm are not 0, we are just setting stripe info
369          * parameters. No need for the open lock */
370         if (!lmm && !lmmsize)
371                 itp->it_flags |= MDS_OPEN_LOCK;
372
373         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
374                                       file->f_dentry->d_inode, name, len,
375                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
376         if (IS_ERR(op_data))
377                 RETURN(PTR_ERR(op_data));
378
379         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
380                             0 /*unused */, &req, ll_md_blocking_ast, 0);
381         ll_finish_md_op_data(op_data);
382         if (rc == -ESTALE) {
383                 /* reason for keep own exit path - don`t flood log
384                 * with messages with -ESTALE errors.
385                 */
386                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
387                      it_open_error(DISP_OPEN_OPEN, itp))
388                         GOTO(out, rc);
389                 ll_release_openhandle(file->f_dentry, itp);
390                 GOTO(out_stale, rc);
391         }
392
393         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
394                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
395                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
396                 GOTO(out, rc);
397         }
398
399         if (itp->d.lustre.it_lock_mode)
400                 md_set_lock_data(sbi->ll_md_exp,
401                                  &itp->d.lustre.it_lock_handle, 
402                                  file->f_dentry->d_inode);
403
404         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
405 out:
406         ptlrpc_req_finished(itp->d.lustre.it_data);
407
408 out_stale:
409         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
410         ll_intent_drop_lock(itp);
411
412         RETURN(rc);
413 }
414
415 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
416                        struct lookup_intent *it, struct obd_client_handle *och)
417 {
418         struct ptlrpc_request *req = it->d.lustre.it_data;
419         struct mdt_body *body;
420
421         LASSERT(och);
422
423         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
424         LASSERT(body != NULL);                      /* reply already checked out */
425
426         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
427         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
428         och->och_fid = lli->lli_fid;
429         och->och_flags = it->it_flags;
430         lli->lli_ioepoch = body->ioepoch;
431
432         return md_set_open_replay_data(md_exp, och, req);
433 }
434
435 int ll_local_open(struct file *file, struct lookup_intent *it,
436                   struct ll_file_data *fd, struct obd_client_handle *och)
437 {
438         struct inode *inode = file->f_dentry->d_inode;
439         struct ll_inode_info *lli = ll_i2info(inode);
440         ENTRY;
441
442         LASSERT(!LUSTRE_FPRIVATE(file));
443
444         LASSERT(fd != NULL);
445
446         if (och) {
447                 struct ptlrpc_request *req = it->d.lustre.it_data;
448                 struct mdt_body *body;
449                 int rc;
450
451                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
452                 if (rc)
453                         RETURN(rc);
454
455                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
456                 if ((it->it_flags & FMODE_WRITE) &&
457                     (body->valid & OBD_MD_FLSIZE))
458                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
459                                lli->lli_ioepoch, PFID(&lli->lli_fid));
460         }
461
462         LUSTRE_FPRIVATE(file) = fd;
463         ll_readahead_init(inode, &fd->fd_ras);
464         fd->fd_omode = it->it_flags;
465         RETURN(0);
466 }
467
468 /* Open a file, and (for the very first open) create objects on the OSTs at
469  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
470  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
471  * lli_open_sem to ensure no other process will create objects, send the
472  * stripe MD to the MDS, or try to destroy the objects if that fails.
473  *
474  * If we already have the stripe MD locally then we don't request it in
475  * md_open(), by passing a lmm_size = 0.
476  *
477  * It is up to the application to ensure no other processes open this file
478  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
479  * used.  We might be able to avoid races of that sort by getting lli_open_sem
480  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
481  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
482  */
483 int ll_file_open(struct inode *inode, struct file *file)
484 {
485         struct ll_inode_info *lli = ll_i2info(inode);
486         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
487                                           .it_flags = file->f_flags };
488         struct lov_stripe_md *lsm;
489         struct ptlrpc_request *req = NULL;
490         struct obd_client_handle **och_p;
491         __u64 *och_usecount;
492         struct ll_file_data *fd;
493         int rc = 0, opendir_set = 0;
494         ENTRY;
495
496         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
497                inode->i_generation, inode, file->f_flags);
498
499 #ifdef HAVE_VFS_INTENT_PATCHES
500         it = file->f_it;
501 #else
502         it = file->private_data; /* XXX: compat macro */
503         file->private_data = NULL; /* prevent ll_local_open assertion */
504 #endif
505
506         fd = ll_file_data_get();
507         if (fd == NULL)
508                 RETURN(-ENOMEM);
509
510         if (S_ISDIR(inode->i_mode)) {
511                 spin_lock(&lli->lli_lock);
512                 /* "lli->lli_opendir_pid != 0" means someone has set it.
513                  * "lli->lli_sai != NULL" means the previous statahead has not
514                  *                        been cleanup. */ 
515                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
516                         opendir_set = 1;
517                         lli->lli_opendir_pid = cfs_curproc_pid();
518                         lli->lli_opendir_key = fd;
519                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
520                         /* Two cases for this:
521                          * (1) The same process open such directory many times.
522                          * (2) The old process opened the directory, and exited
523                          *     before its children processes. Then new process
524                          *     with the same pid opens such directory before the
525                          *     old process's children processes exit.
526                          * Change the owner to the latest one. */
527                         opendir_set = 2;
528                         lli->lli_opendir_key = fd;
529                 }
530                 spin_unlock(&lli->lli_lock);
531         }
532
533         if (inode->i_sb->s_root == file->f_dentry) {
534                 LUSTRE_FPRIVATE(file) = fd;
535                 RETURN(0);
536         }
537
538         if (!it || !it->d.lustre.it_disposition) {
539                 /* Convert f_flags into access mode. We cannot use file->f_mode,
540                  * because everything but O_ACCMODE mask was stripped from
541                  * there */
542                 if ((oit.it_flags + 1) & O_ACCMODE)
543                         oit.it_flags++;
544                 if (file->f_flags & O_TRUNC)
545                         oit.it_flags |= FMODE_WRITE;
546
547                 /* kernel only call f_op->open in dentry_open.  filp_open calls
548                  * dentry_open after call to open_namei that checks permissions.
549                  * Only nfsd_open call dentry_open directly without checking
550                  * permissions and because of that this code below is safe. */
551                 if (oit.it_flags & FMODE_WRITE)
552                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
553
554                 /* We do not want O_EXCL here, presumably we opened the file
555                  * already? XXX - NFS implications? */
556                 oit.it_flags &= ~O_EXCL;
557
558                 it = &oit;
559         }
560
561 restart:
562         /* Let's see if we have file open on MDS already. */
563         if (it->it_flags & FMODE_WRITE) {
564                 och_p = &lli->lli_mds_write_och;
565                 och_usecount = &lli->lli_open_fd_write_count;
566         } else if (it->it_flags & FMODE_EXEC) {
567                 och_p = &lli->lli_mds_exec_och;
568                 och_usecount = &lli->lli_open_fd_exec_count;
569          } else {
570                 och_p = &lli->lli_mds_read_och;
571                 och_usecount = &lli->lli_open_fd_read_count;
572         }
573         
574         down(&lli->lli_och_sem);
575         if (*och_p) { /* Open handle is present */
576                 if (it_disposition(it, DISP_OPEN_OPEN)) {
577                         /* Well, there's extra open request that we do not need,
578                            let's close it somehow. This will decref request. */
579                         rc = it_open_error(DISP_OPEN_OPEN, it);
580                         if (rc) {
581                                 ll_file_data_put(fd);
582                                 GOTO(out_och_free, rc);
583                         }       
584                         ll_release_openhandle(file->f_dentry, it);
585                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
586                                              LPROC_LL_OPEN);
587                 }
588                 (*och_usecount)++;
589
590                 rc = ll_local_open(file, it, fd, NULL);
591                 if (rc) {
592                         up(&lli->lli_och_sem);
593                         ll_file_data_put(fd);
594                         RETURN(rc);
595                 }
596         } else {
597                 LASSERT(*och_usecount == 0);
598                 if (!it->d.lustre.it_disposition) {
599                         /* We cannot just request lock handle now, new ELC code
600                            means that one of other OPEN locks for this file
601                            could be cancelled, and since blocking ast handler
602                            would attempt to grab och_sem as well, that would
603                            result in a deadlock */
604                         up(&lli->lli_och_sem);
605                         it->it_flags |= O_CHECK_STALE;
606                         rc = ll_intent_file_open(file, NULL, 0, it);
607                         it->it_flags &= ~O_CHECK_STALE;
608                         if (rc) {
609                                 ll_file_data_put(fd);
610                                 GOTO(out_openerr, rc);
611                         }
612
613                         /* Got some error? Release the request */
614                         if (it->d.lustre.it_status < 0) {
615                                 req = it->d.lustre.it_data;
616                                 ptlrpc_req_finished(req);
617                         }
618                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
619                                          &it->d.lustre.it_lock_handle,
620                                          file->f_dentry->d_inode);
621                         goto restart;
622                 }
623                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
624                 if (!*och_p) {
625                         ll_file_data_put(fd);
626                         GOTO(out_och_free, rc = -ENOMEM);
627                 }
628                 (*och_usecount)++;
629                 req = it->d.lustre.it_data;
630
631                 /* md_intent_lock() didn't get a request ref if there was an
632                  * open error, so don't do cleanup on the request here
633                  * (bug 3430) */
634                 /* XXX (green): Should not we bail out on any error here, not
635                  * just open error? */
636                 rc = it_open_error(DISP_OPEN_OPEN, it);
637                 if (rc) {
638                         ll_file_data_put(fd);
639                         GOTO(out_och_free, rc);
640                 }
641
642                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
643                 rc = ll_local_open(file, it, fd, *och_p);
644                 if (rc) {
645                         up(&lli->lli_och_sem);
646                         ll_file_data_put(fd);
647                         GOTO(out_och_free, rc);
648                 }
649         }
650         up(&lli->lli_och_sem);
651
652         /* Must do this outside lli_och_sem lock to prevent deadlock where
653            different kind of OPEN lock for this same inode gets cancelled
654            by ldlm_cancel_lru */
655         if (!S_ISREG(inode->i_mode))
656                 GOTO(out, rc);
657
658         ll_capa_open(inode);
659
660         lsm = lli->lli_smd;
661         if (lsm == NULL) {
662                 if (file->f_flags & O_LOV_DELAY_CREATE ||
663                     !(file->f_mode & FMODE_WRITE)) {
664                         CDEBUG(D_INODE, "object creation was delayed\n");
665                         GOTO(out, rc);
666                 }
667         }
668         file->f_flags &= ~O_LOV_DELAY_CREATE;
669         GOTO(out, rc);
670 out:
671         ptlrpc_req_finished(req);
672         if (req)
673                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
674 out_och_free:
675         if (rc) {
676                 if (*och_p) {
677                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
678                         *och_p = NULL; /* OBD_FREE writes some magic there */
679                         (*och_usecount)--;
680                 }
681                 up(&lli->lli_och_sem);
682 out_openerr:
683                 if (opendir_set == 1) {
684                         lli->lli_opendir_key = NULL;
685                         lli->lli_opendir_pid = 0;
686                 } else if (unlikely(opendir_set == 2)) {
687                         ll_stop_statahead(inode, fd);
688                 }
689         }
690
691         return rc;
692 }
693
694 /* Fills the obdo with the attributes for the inode defined by lsm */
695 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
696 {
697         struct ptlrpc_request_set *set;
698         struct ll_inode_info *lli = ll_i2info(inode);
699         struct lov_stripe_md *lsm = lli->lli_smd;
700
701         struct obd_info oinfo = { { { 0 } } };
702         int rc;
703         ENTRY;
704
705         LASSERT(lsm != NULL);
706
707         oinfo.oi_md = lsm;
708         oinfo.oi_oa = obdo;
709         oinfo.oi_oa->o_id = lsm->lsm_object_id;
710         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
711         oinfo.oi_oa->o_mode = S_IFREG;
712         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
713                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
714                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
715                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
716                                OBD_MD_FLGROUP;
717         oinfo.oi_capa = ll_mdscapa_get(inode);
718
719         set = ptlrpc_prep_set();
720         if (set == NULL) {
721                 CERROR("can't allocate ptlrpc set\n");
722                 rc = -ENOMEM;
723         } else {
724                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
725                 if (rc == 0)
726                         rc = ptlrpc_set_wait(set);
727                 ptlrpc_set_destroy(set);
728         }
729         capa_put(oinfo.oi_capa);
730         if (rc)
731                 RETURN(rc);
732
733         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
734                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
735                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
736
737         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
738         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
739                lli->lli_smd->lsm_object_id, i_size_read(inode),
740                (unsigned long long)inode->i_blocks,
741                (unsigned long)ll_inode_blksize(inode));
742         RETURN(0);
743 }
744
745 static inline void ll_remove_suid(struct inode *inode)
746 {
747         unsigned int mode;
748
749         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
750         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
751
752         /* was any of the uid bits set? */
753         mode &= inode->i_mode;
754         if (mode && !capable(CAP_FSETID)) {
755                 inode->i_mode &= ~mode;
756                 // XXX careful here - we cannot change the size
757         }
758 }
759
760 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
761 {
762         struct ll_inode_info *lli = ll_i2info(inode);
763         struct lov_stripe_md *lsm = lli->lli_smd;
764         struct obd_export *exp = ll_i2dtexp(inode);
765         struct {
766                 char name[16];
767                 struct ldlm_lock *lock;
768                 struct lov_stripe_md *lsm;
769         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock, .lsm = lsm };
770         __u32 stripe, vallen = sizeof(stripe);
771         struct lov_oinfo *loinfo;
772         int rc;
773         ENTRY;
774
775         if (lsm->lsm_stripe_count == 1)
776                 GOTO(check, stripe = 0);
777
778         /* get our offset in the lov */
779         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
780         if (rc != 0) {
781                 CERROR("obd_get_info: rc = %d\n", rc);
782                 RETURN(rc);
783         }
784         LASSERT(stripe < lsm->lsm_stripe_count);
785
786 check:
787         loinfo = lsm->lsm_oinfo[stripe];
788         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
789                             &lock->l_resource->lr_name)){
790                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
791                            loinfo->loi_id, loinfo->loi_gr);
792                 RETURN(-ELDLM_NO_LOCK_DATA);
793         }
794
795         RETURN(stripe);
796 }
797
798 /* Get extra page reference to ensure it is not going away */
799 void ll_pin_extent_cb(void *data)
800 {
801         struct page *page = data;
802         
803         page_cache_get(page);
804
805         return;
806 }
807
808 /* Flush the page from page cache for an extent as its canceled.
809  * Page to remove is delivered as @data.
810  *
811  * No one can dirty the extent until we've finished our work and they cannot
812  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
813  * but other kernel actors could have pages locked.
814  *
815  * If @discard is set, there is no need to write the page if it is dirty.
816  *
817  * Called with the DLM lock held. */
818 int ll_page_removal_cb(void *data, int discard)
819 {
820         int rc;
821         struct page *page = data;
822         struct address_space *mapping;
823  
824         ENTRY;
825
826         /* We have page reference already from ll_pin_page */
827         lock_page(page);
828
829         /* Already truncated by somebody */
830         if (!page->mapping)
831                 GOTO(out, rc = 0);
832         mapping = page->mapping;
833
834         ll_teardown_mmaps(mapping,
835                           (__u64)page->index << PAGE_CACHE_SHIFT,
836                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
837                                                               ~PAGE_CACHE_MASK);        
838         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
839
840         if (!discard && clear_page_dirty_for_io(page)) {
841                 LASSERT(page->mapping);
842                 rc = ll_call_writepage(page->mapping->host, page);
843                 /* either waiting for io to complete or reacquiring
844                  * the lock that the failed writepage released */
845                 lock_page(page);
846                 wait_on_page_writeback(page);
847                 if (rc != 0) {
848                         CERROR("writepage inode %lu(%p) of page %p "
849                                "failed: %d\n", mapping->host->i_ino,
850                                mapping->host, page, rc);
851                         if (rc == -ENOSPC)
852                                 set_bit(AS_ENOSPC, &mapping->flags);
853                         else
854                                 set_bit(AS_EIO, &mapping->flags);
855                 }
856                 set_bit(AS_EIO, &mapping->flags);
857         }
858         if (page->mapping != NULL) {
859                 struct ll_async_page *llap = llap_cast_private(page);
860                 /* checking again to account for writeback's lock_page() */
861                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
862                 if (llap)
863                         ll_ra_accounting(llap, page->mapping);
864                 ll_truncate_complete_page(page);
865         }
866         EXIT;
867 out:
868         LASSERT(!PageWriteback(page));
869         unlock_page(page);
870         page_cache_release(page);
871
872         return 0;
873 }
874
875 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
876                              void *data, int flag)
877 {
878         struct inode *inode;
879         struct ll_inode_info *lli;
880         struct lov_stripe_md *lsm;
881         int stripe;
882         __u64 kms;
883
884         ENTRY;
885
886         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
887                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
888                 LBUG();
889         }
890
891         inode = ll_inode_from_lock(lock);
892         if (inode == NULL)
893                 RETURN(0);
894         lli = ll_i2info(inode);
895         if (lli == NULL)
896                 GOTO(iput, 0);
897         if (lli->lli_smd == NULL)
898                 GOTO(iput, 0);
899         lsm = lli->lli_smd;
900
901         stripe = ll_lock_to_stripe_offset(inode, lock);
902         if (stripe < 0)
903                 GOTO(iput, 0);
904
905         lov_stripe_lock(lsm);
906         lock_res_and_lock(lock);
907         kms = ldlm_extent_shift_kms(lock,
908                                     lsm->lsm_oinfo[stripe]->loi_kms);
909
910         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
911                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
912                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
913         lsm->lsm_oinfo[stripe]->loi_kms = kms;
914         unlock_res_and_lock(lock);
915         lov_stripe_unlock(lsm);
916         ll_queue_done_writing(inode, 0);
917         EXIT;
918 iput:
919         iput(inode);
920
921         return 0;
922 }
923
924 #if 0
925 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
926 {
927         /* XXX ALLOCATE - 160 bytes */
928         struct inode *inode = ll_inode_from_lock(lock);
929         struct ll_inode_info *lli = ll_i2info(inode);
930         struct lustre_handle lockh = { 0 };
931         struct ost_lvb *lvb;
932         int stripe;
933         ENTRY;
934
935         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
936                      LDLM_FL_BLOCK_CONV)) {
937                 LBUG(); /* not expecting any blocked async locks yet */
938                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
939                            "lock, returning");
940                 ldlm_lock_dump(D_OTHER, lock, 0);
941                 ldlm_reprocess_all(lock->l_resource);
942                 RETURN(0);
943         }
944
945         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
946
947         stripe = ll_lock_to_stripe_offset(inode, lock);
948         if (stripe < 0)
949                 goto iput;
950
951         if (lock->l_lvb_len) {
952                 struct lov_stripe_md *lsm = lli->lli_smd;
953                 __u64 kms;
954                 lvb = lock->l_lvb_data;
955                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
956
957                 lock_res_and_lock(lock);
958                 ll_inode_size_lock(inode, 1);
959                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
960                 kms = ldlm_extent_shift_kms(NULL, kms);
961                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
962                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
963                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
964                 lsm->lsm_oinfo[stripe].loi_kms = kms;
965                 ll_inode_size_unlock(inode, 1);
966                 unlock_res_and_lock(lock);
967         }
968
969 iput:
970         iput(inode);
971         wake_up(&lock->l_waitq);
972
973         ldlm_lock2handle(lock, &lockh);
974         ldlm_lock_decref(&lockh, LCK_PR);
975         RETURN(0);
976 }
977 #endif
978
979 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
980 {
981         struct ptlrpc_request *req = reqp;
982         struct inode *inode = ll_inode_from_lock(lock);
983         struct ll_inode_info *lli;
984         struct lov_stripe_md *lsm;
985         struct ost_lvb *lvb;
986         int rc, stripe;
987         ENTRY;
988
989         if (inode == NULL)
990                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
991         lli = ll_i2info(inode);
992         if (lli == NULL)
993                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
994         lsm = lli->lli_smd;
995         if (lsm == NULL)
996                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
997
998         /* First, find out which stripe index this lock corresponds to. */
999         stripe = ll_lock_to_stripe_offset(inode, lock);
1000         if (stripe < 0)
1001                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1002
1003         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
1004         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
1005                              sizeof(*lvb));
1006         rc = req_capsule_server_pack(&req->rq_pill);
1007         if (rc) {
1008                 CERROR("lustre_pack_reply: %d\n", rc);
1009                 GOTO(iput, rc);
1010         }
1011
1012         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
1013         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1014         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1015         lvb->lvb_atime = LTIME_S(inode->i_atime);
1016         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1017
1018         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1019                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1020                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1021                    lvb->lvb_atime, lvb->lvb_ctime);
1022  iput:
1023         iput(inode);
1024
1025  out:
1026         /* These errors are normal races, so we don't want to fill the console
1027          * with messages by calling ptlrpc_error() */
1028         if (rc == -ELDLM_NO_LOCK_DATA)
1029                 lustre_pack_reply(req, 1, NULL, NULL);
1030
1031         req->rq_status = rc;
1032         return rc;
1033 }
1034
1035 static int ll_merge_lvb(struct inode *inode)
1036 {
1037         struct ll_inode_info *lli = ll_i2info(inode);
1038         struct ll_sb_info *sbi = ll_i2sbi(inode);
1039         struct ost_lvb lvb;
1040         int rc;
1041
1042         ENTRY;
1043
1044         ll_inode_size_lock(inode, 1);
1045         inode_init_lvb(inode, &lvb);
1046         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1047         i_size_write(inode, lvb.lvb_size);
1048         inode->i_blocks = lvb.lvb_blocks;
1049
1050         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1051         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1052         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1053         ll_inode_size_unlock(inode, 1);
1054
1055         RETURN(rc);
1056 }
1057
1058 int ll_local_size(struct inode *inode)
1059 {
1060         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1061         struct ll_inode_info *lli = ll_i2info(inode);
1062         struct ll_sb_info *sbi = ll_i2sbi(inode);
1063         struct lustre_handle lockh = { 0 };
1064         int flags = 0;
1065         int rc;
1066         ENTRY;
1067
1068         if (lli->lli_smd->lsm_stripe_count == 0)
1069                 RETURN(0);
1070
1071         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1072                        &policy, LCK_PR, &flags, inode, &lockh);
1073         if (rc < 0)
1074                 RETURN(rc);
1075         else if (rc == 0)
1076                 RETURN(-ENODATA);
1077
1078         rc = ll_merge_lvb(inode);
1079         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1080         RETURN(rc);
1081 }
1082
1083 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1084                      lstat_t *st)
1085 {
1086         struct lustre_handle lockh = { 0 };
1087         struct ldlm_enqueue_info einfo = { 0 };
1088         struct obd_info oinfo = { { { 0 } } };
1089         struct ost_lvb lvb;
1090         int rc;
1091
1092         ENTRY;
1093
1094         einfo.ei_type = LDLM_EXTENT;
1095         einfo.ei_mode = LCK_PR;
1096         einfo.ei_cb_bl = osc_extent_blocking_cb;
1097         einfo.ei_cb_cp = ldlm_completion_ast;
1098         einfo.ei_cb_gl = ll_glimpse_callback;
1099         einfo.ei_cbdata = NULL;
1100
1101         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1102         oinfo.oi_lockh = &lockh;
1103         oinfo.oi_md = lsm;
1104         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1105
1106         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1107         if (rc == -ENOENT)
1108                 RETURN(rc);
1109         if (rc != 0) {
1110                 CERROR("obd_enqueue returned rc %d, "
1111                        "returning -EIO\n", rc);
1112                 RETURN(rc > 0 ? -EIO : rc);
1113         }
1114
1115         lov_stripe_lock(lsm);
1116         memset(&lvb, 0, sizeof(lvb));
1117         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1118         st->st_size = lvb.lvb_size;
1119         st->st_blocks = lvb.lvb_blocks;
1120         st->st_mtime = lvb.lvb_mtime;
1121         st->st_atime = lvb.lvb_atime;
1122         st->st_ctime = lvb.lvb_ctime;
1123         lov_stripe_unlock(lsm);
1124
1125         RETURN(rc);
1126 }
1127
1128 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1129  * file (because it prefers KMS over RSS when larger) */
1130 int ll_glimpse_size(struct inode *inode, int ast_flags)
1131 {
1132         struct ll_inode_info *lli = ll_i2info(inode);
1133         struct ll_sb_info *sbi = ll_i2sbi(inode);
1134         struct lustre_handle lockh = { 0 };
1135         struct ldlm_enqueue_info einfo = { 0 };
1136         struct obd_info oinfo = { { { 0 } } };
1137         int rc;
1138         ENTRY;
1139
1140         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1141                 RETURN(0);
1142
1143         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1144
1145         if (!lli->lli_smd) {
1146                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1147                 RETURN(0);
1148         }
1149
1150         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1151          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1152          *       won't revoke any conflicting DLM locks held. Instead,
1153          *       ll_glimpse_callback() will be called on each client
1154          *       holding a DLM lock against this file, and resulting size
1155          *       will be returned for each stripe. DLM lock on [0, EOF] is
1156          *       acquired only if there were no conflicting locks. */
1157         einfo.ei_type = LDLM_EXTENT;
1158         einfo.ei_mode = LCK_PR;
1159         einfo.ei_cb_bl = osc_extent_blocking_cb;
1160         einfo.ei_cb_cp = ldlm_completion_ast;
1161         einfo.ei_cb_gl = ll_glimpse_callback;
1162         einfo.ei_cbdata = inode;
1163
1164         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1165         oinfo.oi_lockh = &lockh;
1166         oinfo.oi_md = lli->lli_smd;
1167         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1168
1169         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1170         if (rc == -ENOENT)
1171                 RETURN(rc);
1172         if (rc != 0) {
1173                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1174                 RETURN(rc > 0 ? -EIO : rc);
1175         }
1176
1177         rc = ll_merge_lvb(inode);
1178
1179         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1180                i_size_read(inode), (unsigned long long)inode->i_blocks);
1181
1182         RETURN(rc);
1183 }
1184
1185 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1186                    struct lov_stripe_md *lsm, int mode,
1187                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1188                    int ast_flags)
1189 {
1190         struct ll_sb_info *sbi = ll_i2sbi(inode);
1191         struct ost_lvb lvb;
1192         struct ldlm_enqueue_info einfo = { 0 };
1193         struct obd_info oinfo = { { { 0 } } };
1194         int rc;
1195         ENTRY;
1196
1197         LASSERT(!lustre_handle_is_used(lockh));
1198         LASSERT(lsm != NULL);
1199
1200         /* don't drop the mmapped file to LRU */
1201         if (mapping_mapped(inode->i_mapping))
1202                 ast_flags |= LDLM_FL_NO_LRU;
1203
1204         /* XXX phil: can we do this?  won't it screw the file size up? */
1205         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1206             (sbi->ll_flags & LL_SBI_NOLCK))
1207                 RETURN(0);
1208
1209         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1210                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1211
1212         einfo.ei_type = LDLM_EXTENT;
1213         einfo.ei_mode = mode;
1214         einfo.ei_cb_bl = osc_extent_blocking_cb;
1215         einfo.ei_cb_cp = ldlm_completion_ast;
1216         einfo.ei_cb_gl = ll_glimpse_callback;
1217         einfo.ei_cbdata = inode;
1218
1219         oinfo.oi_policy = *policy;
1220         oinfo.oi_lockh = lockh;
1221         oinfo.oi_md = lsm;
1222         oinfo.oi_flags = ast_flags;
1223
1224         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1225         *policy = oinfo.oi_policy;
1226         if (rc > 0)
1227                 rc = -EIO;
1228
1229         ll_inode_size_lock(inode, 1);
1230         inode_init_lvb(inode, &lvb);
1231         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1232
1233         if (policy->l_extent.start == 0 &&
1234             policy->l_extent.end == OBD_OBJECT_EOF) {
1235                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1236                  * the kms under both a DLM lock and the
1237                  * ll_inode_size_lock().  If we don't get the
1238                  * ll_inode_size_lock() here we can match the DLM lock and
1239                  * reset i_size from the kms before the truncating path has
1240                  * updated the kms.  generic_file_write can then trust the
1241                  * stale i_size when doing appending writes and effectively
1242                  * cancel the result of the truncate.  Getting the
1243                  * ll_inode_size_lock() after the enqueue maintains the DLM
1244                  * -> ll_inode_size_lock() acquiring order. */
1245                 i_size_write(inode, lvb.lvb_size);
1246                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1247                        inode->i_ino, i_size_read(inode));
1248         }
1249
1250         if (rc == 0) {
1251                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1252                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1253                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1254         }
1255         ll_inode_size_unlock(inode, 1);
1256
1257         RETURN(rc);
1258 }
1259
1260 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1261                      struct lov_stripe_md *lsm, int mode,
1262                      struct lustre_handle *lockh)
1263 {
1264         struct ll_sb_info *sbi = ll_i2sbi(inode);
1265         int rc;
1266         ENTRY;
1267
1268         /* XXX phil: can we do this?  won't it screw the file size up? */
1269         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1270             (sbi->ll_flags & LL_SBI_NOLCK))
1271                 RETURN(0);
1272
1273         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1274
1275         RETURN(rc);
1276 }
1277
1278 static void ll_set_file_contended(struct inode *inode)
1279 {
1280         struct ll_inode_info *lli = ll_i2info(inode);
1281         cfs_time_t now = cfs_time_current();
1282
1283         spin_lock(&lli->lli_lock);
1284         lli->lli_contention_time = now;
1285         lli->lli_flags |= LLIF_CONTENDED;
1286         spin_unlock(&lli->lli_lock);
1287 }
1288
1289 void ll_clear_file_contended(struct inode *inode)
1290 {
1291         struct ll_inode_info *lli = ll_i2info(inode);
1292
1293         spin_lock(&lli->lli_lock);
1294         lli->lli_flags &= ~LLIF_CONTENDED;
1295         spin_unlock(&lli->lli_lock);
1296 }
1297
1298 static int ll_is_file_contended(struct file *file)
1299 {
1300         struct inode *inode = file->f_dentry->d_inode;
1301         struct ll_inode_info *lli = ll_i2info(inode);
1302         struct ll_sb_info *sbi = ll_i2sbi(inode);
1303         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1304         ENTRY;
1305
1306         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1307                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1308                        " osc connect flags = 0x"LPX64"\n",
1309                        sbi->ll_lco.lco_flags);
1310                 RETURN(0);
1311         }
1312         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1313                 RETURN(1);
1314         if (lli->lli_flags & LLIF_CONTENDED) {
1315                 cfs_time_t cur_time = cfs_time_current();
1316                 cfs_time_t retry_time;
1317
1318                 retry_time = cfs_time_add(
1319                         lli->lli_contention_time,
1320                         cfs_time_seconds(sbi->ll_contention_time));
1321                 if (cfs_time_after(cur_time, retry_time)) {
1322                         ll_clear_file_contended(inode);
1323                         RETURN(0);
1324                 }
1325                 RETURN(1);
1326         }
1327         RETURN(0);
1328 }
1329
1330 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1331                                  const char *buf, size_t count,
1332                                  loff_t start, loff_t end, int rw)
1333 {
1334         int append;
1335         int tree_locked = 0;
1336         int rc;
1337         struct inode * inode = file->f_dentry->d_inode;
1338         ENTRY;
1339
1340         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1341
1342         if (append || !ll_is_file_contended(file)) {
1343                 struct ll_lock_tree_node *node;
1344                 int ast_flags;
1345
1346                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1347                 if (file->f_flags & O_NONBLOCK)
1348                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1349                 node = ll_node_from_inode(inode, start, end,
1350                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1351                 if (IS_ERR(node)) {
1352                         rc = PTR_ERR(node);
1353                         GOTO(out, rc);
1354                 }
1355                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1356                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1357                 if (rc == 0)
1358                         tree_locked = 1;
1359                 else if (rc == -EUSERS)
1360                         ll_set_file_contended(inode);
1361                 else
1362                         GOTO(out, rc);
1363         }
1364         RETURN(tree_locked);
1365 out:
1366         return rc;
1367 }
1368
1369 /**
1370  * Checks if requested extent lock is compatible with a lock under a page.
1371  *
1372  * Checks if the lock under \a page is compatible with a read or write lock
1373  * (specified by \a rw) for an extent [\a start , \a end].
1374  *
1375  * \param page the page under which lock is considered
1376  * \param rw OBD_BRW_READ if requested for reading,
1377  *           OBD_BRW_WRITE if requested for writing
1378  * \param start start of the requested extent
1379  * \param end end of the requested extent
1380  * \param cookie transparent parameter for passing locking context
1381  *
1382  * \post result == 1, *cookie == context, appropriate lock is referenced or
1383  * \post result == 0
1384  *
1385  * \retval 1 owned lock is reused for the request
1386  * \retval 0 no lock reused for the request
1387  *
1388  * \see ll_release_short_lock
1389  */
1390 static int ll_reget_short_lock(struct page *page, int rw,
1391                                obd_off start, obd_off end,
1392                                void **cookie)
1393 {
1394         struct ll_async_page *llap;
1395         struct obd_export *exp;
1396         struct inode *inode = page->mapping->host;
1397
1398         ENTRY;
1399
1400         exp = ll_i2dtexp(inode);
1401         if (exp == NULL)
1402                 RETURN(0);
1403
1404         llap = llap_cast_private(page);
1405         if (llap == NULL)
1406                 RETURN(0);
1407
1408         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1409                                     &llap->llap_cookie, rw, start, end,
1410                                     cookie));
1411 }
1412
1413 /**
1414  * Releases a reference to a lock taken in a "fast" way.
1415  *
1416  * Releases a read or a write (specified by \a rw) lock
1417  * referenced by \a cookie.
1418  *
1419  * \param inode inode to which data belong
1420  * \param end end of the locked extent
1421  * \param rw OBD_BRW_READ if requested for reading,
1422  *           OBD_BRW_WRITE if requested for writing
1423  * \param cookie transparent parameter for passing locking context
1424  *
1425  * \post appropriate lock is dereferenced
1426  *
1427  * \see ll_reget_short_lock
1428  */
1429 static void ll_release_short_lock(struct inode *inode, obd_off end,
1430                                   void *cookie, int rw)
1431 {
1432         struct obd_export *exp;
1433         int rc;
1434
1435         exp = ll_i2dtexp(inode);
1436         if (exp == NULL)
1437                 return;
1438
1439         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1440                                     cookie, rw);
1441         if (rc < 0)
1442                 CERROR("unlock failed (%d)\n", rc);
1443 }
1444
1445 /**
1446  * Checks if requested extent lock is compatible
1447  * with a lock under a page in page cache.
1448  *
1449  * Checks if a lock under some \a page is compatible with a read or write lock
1450  * (specified by \a rw) for an extent [\a start , \a end].
1451  *
1452  * \param file the file under which lock is considered
1453  * \param rw OBD_BRW_READ if requested for reading,
1454  *           OBD_BRW_WRITE if requested for writing
1455  * \param ppos start of the requested extent
1456  * \param end end of the requested extent
1457  * \param cookie transparent parameter for passing locking context
1458  * \param buf userspace buffer for the data
1459  *
1460  * \post result == 1, *cookie == context, appropriate lock is referenced
1461  * \post retuls == 0
1462  *
1463  * \retval 1 owned lock is reused for the request
1464  * \retval 0 no lock reused for the request
1465  *
1466  * \see ll_file_put_fast_lock
1467  */
1468 static inline int ll_file_get_fast_lock(struct file *file,
1469                                         obd_off ppos, obd_off end,
1470                                         char *buf, void **cookie, int rw)
1471 {
1472         int rc = 0;
1473         struct page *page;
1474
1475         ENTRY;
1476
1477         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1478                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1479                                       ppos >> CFS_PAGE_SHIFT);
1480                 if (page) {
1481                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1482                                 rc = 1;
1483
1484                         unlock_page(page);
1485                         page_cache_release(page);
1486                 }
1487         }
1488
1489         RETURN(rc);
1490 }
1491
1492 /**
1493  * Releases a reference to a lock taken in a "fast" way.
1494  *
1495  * Releases a read or a write (specified by \a rw) lock
1496  * referenced by \a cookie.
1497  *
1498  * \param inode inode to which data belong
1499  * \param end end of the locked extent
1500  * \param rw OBD_BRW_READ if requested for reading,
1501  *           OBD_BRW_WRITE if requested for writing
1502  * \param cookie transparent parameter for passing locking context
1503  *
1504  * \post appropriate lock is dereferenced
1505  *
1506  * \see ll_file_get_fast_lock
1507  */
1508 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1509                                          void *cookie, int rw)
1510 {
1511         ll_release_short_lock(inode, end, cookie, rw);
1512 }
1513
1514 enum ll_lock_style {
1515         LL_LOCK_STYLE_NOLOCK   = 0,
1516         LL_LOCK_STYLE_FASTLOCK = 1,
1517         LL_LOCK_STYLE_TREELOCK = 2
1518 };
1519
1520 /**
1521  * Checks if requested extent lock is compatible with a lock 
1522  * under a page cache page.
1523  *
1524  * Checks if the lock under \a page is compatible with a read or write lock
1525  * (specified by \a rw) for an extent [\a start , \a end].
1526  *
1527  * \param file file under which I/O is processed
1528  * \param rw OBD_BRW_READ if requested for reading,
1529  *           OBD_BRW_WRITE if requested for writing
1530  * \param ppos start of the requested extent
1531  * \param end end of the requested extent
1532  * \param cookie transparent parameter for passing locking context
1533  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1534  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1535  * \param buf userspace buffer for the data
1536  *
1537  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1538  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1539  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1540  *
1541  * \see ll_file_put_lock
1542  */
1543 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1544                                    obd_off end, char *buf, void **cookie,
1545                                    struct ll_lock_tree *tree, int rw)
1546 {
1547         int rc;
1548
1549         ENTRY;
1550
1551         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1552                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1553
1554         rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
1555         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1556         switch (rc) {
1557         case 1:
1558                 RETURN(LL_LOCK_STYLE_TREELOCK);
1559         case 0:
1560                 RETURN(LL_LOCK_STYLE_NOLOCK);
1561         }
1562
1563         /* an error happened if we reached this point, rc = -errno here */
1564         RETURN(rc);
1565 }
1566
1567 /**
1568  * Drops the lock taken by ll_file_get_lock.
1569  *
1570  * Releases a read or a write (specified by \a rw) lock
1571  * referenced by \a tree or \a cookie.
1572  *
1573  * \param inode inode to which data belong
1574  * \param end end of the locked extent
1575  * \param lockstyle facility through which the lock was taken
1576  * \param rw OBD_BRW_READ if requested for reading,
1577  *           OBD_BRW_WRITE if requested for writing
1578  * \param cookie transparent parameter for passing locking context
1579  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1580  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1581  *
1582  * \post appropriate lock is dereferenced
1583  *
1584  * \see ll_file_get_lock
1585  */
1586 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1587                                     enum ll_lock_style lock_style,
1588                                     void *cookie, struct ll_lock_tree *tree,
1589                                     int rw)
1590
1591 {
1592         switch (lock_style) {
1593         case LL_LOCK_STYLE_TREELOCK:
1594                 ll_tree_unlock(tree);
1595                 break;
1596         case LL_LOCK_STYLE_FASTLOCK:
1597                 ll_file_put_fast_lock(inode, end, cookie, rw);
1598                 break;
1599         default:
1600                 CERROR("invalid locking style (%d)\n", lock_style);
1601         }
1602 }
1603
1604 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1605                             loff_t *ppos)
1606 {
1607         struct inode *inode = file->f_dentry->d_inode;
1608         struct ll_inode_info *lli = ll_i2info(inode);
1609         struct lov_stripe_md *lsm = lli->lli_smd;
1610         struct ll_sb_info *sbi = ll_i2sbi(inode);
1611         struct ll_lock_tree tree;
1612         struct ost_lvb lvb;
1613         struct ll_ra_read bead;
1614         int ra = 0;
1615         obd_off end;
1616         ssize_t retval, chunk, sum = 0;
1617         int lock_style;
1618         void *cookie;
1619
1620         __u64 kms;
1621         ENTRY;
1622         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1623                inode->i_ino, inode->i_generation, inode, count, *ppos);
1624         /* "If nbyte is 0, read() will return 0 and have no other results."
1625          *                      -- Single Unix Spec */
1626         if (count == 0)
1627                 RETURN(0);
1628
1629         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1630
1631         if (!lsm) {
1632                 /* Read on file with no objects should return zero-filled
1633                  * buffers up to file size (we can get non-zero sizes with
1634                  * mknod + truncate, then opening file for read. This is a
1635                  * common pattern in NFS case, it seems). Bug 6243 */
1636                 int notzeroed;
1637                 /* Since there are no objects on OSTs, we have nothing to get
1638                  * lock on and so we are forced to access inode->i_size
1639                  * unguarded */
1640
1641                 /* Read beyond end of file */
1642                 if (*ppos >= i_size_read(inode))
1643                         RETURN(0);
1644
1645                 if (count > i_size_read(inode) - *ppos)
1646                         count = i_size_read(inode) - *ppos;
1647                 /* Make sure to correctly adjust the file pos pointer for
1648                  * EFAULT case */
1649                 notzeroed = clear_user(buf, count);
1650                 count -= notzeroed;
1651                 *ppos += count;
1652                 if (!count)
1653                         RETURN(-EFAULT);
1654                 RETURN(count);
1655         }
1656 repeat:
1657         if (sbi->ll_max_rw_chunk != 0) {
1658                 /* first, let's know the end of the current stripe */
1659                 end = *ppos;
1660                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1661
1662                 /* correct, the end is beyond the request */
1663                 if (end > *ppos + count - 1)
1664                         end = *ppos + count - 1;
1665
1666                 /* and chunk shouldn't be too large even if striping is wide */
1667                 if (end - *ppos > sbi->ll_max_rw_chunk)
1668                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1669         } else {
1670                 end = *ppos + count - 1;
1671         }
1672
1673         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1674                                       buf, &cookie, &tree, OBD_BRW_READ);
1675         if (lock_style < 0)
1676                 GOTO(out, retval = lock_style);
1677
1678         ll_inode_size_lock(inode, 1);
1679         /*
1680          * Consistency guarantees: following possibilities exist for the
1681          * relation between region being read and real file size at this
1682          * moment:
1683          *
1684          *  (A): the region is completely inside of the file;
1685          *
1686          *  (B-x): x bytes of region are inside of the file, the rest is
1687          *  outside;
1688          *
1689          *  (C): the region is completely outside of the file.
1690          *
1691          * This classification is stable under DLM lock acquired by
1692          * ll_tree_lock() above, because to change class, other client has to
1693          * take DLM lock conflicting with our lock. Also, any updates to
1694          * ->i_size by other threads on this client are serialized by
1695          * ll_inode_size_lock(). This guarantees that short reads are handled
1696          * correctly in the face of concurrent writes and truncates.
1697          */
1698         inode_init_lvb(inode, &lvb);
1699         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1700         kms = lvb.lvb_size;
1701         if (*ppos + count - 1 > kms) {
1702                 /* A glimpse is necessary to determine whether we return a
1703                  * short read (B) or some zeroes at the end of the buffer (C) */
1704                 ll_inode_size_unlock(inode, 1);
1705                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1706                 if (retval) {
1707                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1708                                 ll_file_put_lock(inode, end, lock_style,
1709                                                  cookie, &tree, OBD_BRW_READ);
1710                         goto out;
1711                 }
1712         } else {
1713                 /* region is within kms and, hence, within real file size (A).
1714                  * We need to increase i_size to cover the read region so that
1715                  * generic_file_read() will do its job, but that doesn't mean
1716                  * the kms size is _correct_, it is only the _minimum_ size.
1717                  * If someone does a stat they will get the correct size which
1718                  * will always be >= the kms value here.  b=11081 */
1719                 if (i_size_read(inode) < kms)
1720                         i_size_write(inode, kms);
1721                 ll_inode_size_unlock(inode, 1);
1722         }
1723
1724         chunk = end - *ppos + 1;
1725         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1726                inode->i_ino, chunk, *ppos, i_size_read(inode));
1727
1728         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1729                 /* turn off the kernel's read-ahead */
1730                 file->f_ra.ra_pages = 0;
1731
1732                 /* initialize read-ahead window once per syscall */
1733                 if (ra == 0) {
1734                         ra = 1;
1735                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1736                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1737                         ll_ra_read_in(file, &bead);
1738                 }
1739
1740                 /* BUG: 5972 */
1741                 file_accessed(file);
1742                 retval = generic_file_read(file, buf, chunk, ppos);
1743                 ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
1744                                  OBD_BRW_READ);
1745         } else {
1746                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1747         }
1748
1749         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1750
1751         if (retval > 0) {
1752                 buf += retval;
1753                 count -= retval;
1754                 sum += retval;
1755                 if (retval == chunk && count > 0)
1756                         goto repeat;
1757         }
1758
1759  out:
1760         if (ra != 0)
1761                 ll_ra_read_ex(file, &bead);
1762         retval = (sum > 0) ? sum : retval;
1763         RETURN(retval);
1764 }
1765
1766 /*
1767  * Write to a file (through the page cache).
1768  */
1769 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1770                              loff_t *ppos)
1771 {
1772         struct inode *inode = file->f_dentry->d_inode;
1773         struct ll_sb_info *sbi = ll_i2sbi(inode);
1774         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1775         struct ll_lock_tree tree;
1776         loff_t maxbytes = ll_file_maxbytes(inode);
1777         loff_t lock_start, lock_end, end;
1778         ssize_t retval, chunk, sum = 0;
1779         int tree_locked;
1780         ENTRY;
1781
1782         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1783                inode->i_ino, inode->i_generation, inode, count, *ppos);
1784
1785         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1786
1787         /* POSIX, but surprised the VFS doesn't check this already */
1788         if (count == 0)
1789                 RETURN(0);
1790
1791         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1792          * called on the file, don't fail the below assertion (bug 2388). */
1793         if (file->f_flags & O_LOV_DELAY_CREATE &&
1794             ll_i2info(inode)->lli_smd == NULL)
1795                 RETURN(-EBADF);
1796
1797         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1798
1799         down(&ll_i2info(inode)->lli_write_sem);
1800
1801 repeat:
1802         chunk = 0; /* just to fix gcc's warning */
1803         end = *ppos + count - 1;
1804
1805         if (file->f_flags & O_APPEND) {
1806                 lock_start = 0;
1807                 lock_end = OBD_OBJECT_EOF;
1808         } else if (sbi->ll_max_rw_chunk != 0) {
1809                 /* first, let's know the end of the current stripe */
1810                 end = *ppos;
1811                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1812                                 (obd_off *)&end);
1813
1814                 /* correct, the end is beyond the request */
1815                 if (end > *ppos + count - 1)
1816                         end = *ppos + count - 1;
1817
1818                 /* and chunk shouldn't be too large even if striping is wide */
1819                 if (end - *ppos > sbi->ll_max_rw_chunk)
1820                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1821                 lock_start = *ppos;
1822                 lock_end = end;
1823         } else {
1824                 lock_start = *ppos;
1825                 lock_end = *ppos + count - 1;
1826         }
1827
1828         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1829                                             lock_start, lock_end, OBD_BRW_WRITE);
1830         if (tree_locked < 0)
1831                 GOTO(out, retval = tree_locked);
1832
1833         /* This is ok, g_f_w will overwrite this under i_sem if it races
1834          * with a local truncate, it just makes our maxbyte checking easier.
1835          * The i_size value gets updated in ll_extent_lock() as a consequence
1836          * of the [0,EOF] extent lock we requested above. */
1837         if (file->f_flags & O_APPEND) {
1838                 *ppos = i_size_read(inode);
1839                 end = *ppos + count - 1;
1840         }
1841
1842         if (*ppos >= maxbytes) {
1843                 send_sig(SIGXFSZ, current, 0);
1844                 GOTO(out_unlock, retval = -EFBIG);
1845         }
1846         if (end > maxbytes - 1)
1847                 end = maxbytes - 1;
1848
1849         /* generic_file_write handles O_APPEND after getting i_mutex */
1850         chunk = end - *ppos + 1;
1851         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1852                inode->i_ino, chunk, *ppos);
1853         if (tree_locked)
1854                 retval = generic_file_write(file, buf, chunk, ppos);
1855         else
1856                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1857                                              ppos, WRITE);
1858         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1859
1860 out_unlock:
1861         if (tree_locked)
1862                 ll_tree_unlock(&tree);
1863
1864 out:
1865         if (retval > 0) {
1866                 buf += retval;
1867                 count -= retval;
1868                 sum += retval;
1869                 if (retval == chunk && count > 0)
1870                         goto repeat;
1871         }
1872
1873         up(&ll_i2info(inode)->lli_write_sem);
1874
1875         retval = (sum > 0) ? sum : retval;
1876         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1877                            retval > 0 ? retval : 0);
1878         RETURN(retval);
1879 }
1880
1881 /*
1882  * Send file content (through pagecache) somewhere with helper
1883  */
1884 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1885                                 read_actor_t actor, void *target)
1886 {
1887         struct inode *inode = in_file->f_dentry->d_inode;
1888         struct ll_inode_info *lli = ll_i2info(inode);
1889         struct lov_stripe_md *lsm = lli->lli_smd;
1890         struct ll_lock_tree tree;
1891         struct ll_lock_tree_node *node;
1892         struct ost_lvb lvb;
1893         struct ll_ra_read bead;
1894         int rc;
1895         ssize_t retval;
1896         __u64 kms;
1897         ENTRY;
1898         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1899                inode->i_ino, inode->i_generation, inode, count, *ppos);
1900
1901         /* "If nbyte is 0, read() will return 0 and have no other results."
1902          *                      -- Single Unix Spec */
1903         if (count == 0)
1904                 RETURN(0);
1905
1906         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1907         /* turn off the kernel's read-ahead */
1908         in_file->f_ra.ra_pages = 0;
1909
1910         /* File with no objects, nothing to lock */
1911         if (!lsm)
1912                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1913
1914         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1915         if (IS_ERR(node))
1916                 RETURN(PTR_ERR(node));
1917
1918         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1919         rc = ll_tree_lock(&tree, node, NULL, count,
1920                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1921         if (rc != 0)
1922                 RETURN(rc);
1923
1924         ll_clear_file_contended(inode);
1925         ll_inode_size_lock(inode, 1);
1926         /*
1927          * Consistency guarantees: following possibilities exist for the
1928          * relation between region being read and real file size at this
1929          * moment:
1930          *
1931          *  (A): the region is completely inside of the file;
1932          *
1933          *  (B-x): x bytes of region are inside of the file, the rest is
1934          *  outside;
1935          *
1936          *  (C): the region is completely outside of the file.
1937          *
1938          * This classification is stable under DLM lock acquired by
1939          * ll_tree_lock() above, because to change class, other client has to
1940          * take DLM lock conflicting with our lock. Also, any updates to
1941          * ->i_size by other threads on this client are serialized by
1942          * ll_inode_size_lock(). This guarantees that short reads are handled
1943          * correctly in the face of concurrent writes and truncates.
1944          */
1945         inode_init_lvb(inode, &lvb);
1946         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1947         kms = lvb.lvb_size;
1948         if (*ppos + count - 1 > kms) {
1949                 /* A glimpse is necessary to determine whether we return a
1950                  * short read (B) or some zeroes at the end of the buffer (C) */
1951                 ll_inode_size_unlock(inode, 1);
1952                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1953                 if (retval)
1954                         goto out;
1955         } else {
1956                 /* region is within kms and, hence, within real file size (A) */
1957                 i_size_write(inode, kms);
1958                 ll_inode_size_unlock(inode, 1);
1959         }
1960
1961         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1962                inode->i_ino, count, *ppos, i_size_read(inode));
1963
1964         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1965         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1966         ll_ra_read_in(in_file, &bead);
1967         /* BUG: 5972 */
1968         file_accessed(in_file);
1969         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1970         ll_ra_read_ex(in_file, &bead);
1971
1972  out:
1973         ll_tree_unlock(&tree);
1974         RETURN(retval);
1975 }
1976
1977 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1978                                unsigned long arg)
1979 {
1980         struct ll_inode_info *lli = ll_i2info(inode);
1981         struct obd_export *exp = ll_i2dtexp(inode);
1982         struct ll_recreate_obj ucreatp;
1983         struct obd_trans_info oti = { 0 };
1984         struct obdo *oa = NULL;
1985         int lsm_size;
1986         int rc = 0;
1987         struct lov_stripe_md *lsm, *lsm2;
1988         ENTRY;
1989
1990         if (!capable (CAP_SYS_ADMIN))
1991                 RETURN(-EPERM);
1992
1993         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1994                             sizeof(struct ll_recreate_obj));
1995         if (rc) {
1996                 RETURN(-EFAULT);
1997         }
1998         OBDO_ALLOC(oa);
1999         if (oa == NULL)
2000                 RETURN(-ENOMEM);
2001
2002         down(&lli->lli_size_sem);
2003         lsm = lli->lli_smd;
2004         if (lsm == NULL)
2005                 GOTO(out, rc = -ENOENT);
2006         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
2007                    (lsm->lsm_stripe_count));
2008
2009         OBD_ALLOC(lsm2, lsm_size);
2010         if (lsm2 == NULL)
2011                 GOTO(out, rc = -ENOMEM);
2012
2013         oa->o_id = ucreatp.lrc_id;
2014         oa->o_gr = ucreatp.lrc_group;
2015         oa->o_nlink = ucreatp.lrc_ost_idx;
2016         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2017         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
2018         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2019                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2020
2021         memcpy(lsm2, lsm, lsm_size);
2022         rc = obd_create(exp, oa, &lsm2, &oti);
2023
2024         OBD_FREE(lsm2, lsm_size);
2025         GOTO(out, rc);
2026 out:
2027         up(&lli->lli_size_sem);
2028         OBDO_FREE(oa);
2029         return rc;
2030 }
2031
2032 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2033                              int flags, struct lov_user_md *lum, int lum_size)
2034 {
2035         struct ll_inode_info *lli = ll_i2info(inode);
2036         struct lov_stripe_md *lsm;
2037         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2038         int rc = 0;
2039         ENTRY;
2040
2041         down(&lli->lli_size_sem);
2042         lsm = lli->lli_smd;
2043         if (lsm) {
2044                 up(&lli->lli_size_sem);
2045                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2046                        inode->i_ino);
2047                 RETURN(-EEXIST);
2048         }
2049
2050         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2051         if (rc)
2052                 GOTO(out, rc);
2053         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2054                 GOTO(out_req_free, rc = -ENOENT);
2055         rc = oit.d.lustre.it_status;
2056         if (rc < 0)
2057                 GOTO(out_req_free, rc);
2058
2059         ll_release_openhandle(file->f_dentry, &oit);
2060
2061  out:
2062         up(&lli->lli_size_sem);
2063         ll_intent_release(&oit);
2064         RETURN(rc);
2065 out_req_free:
2066         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2067         goto out;
2068 }
2069
2070 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
2071                              struct lov_mds_md **lmmp, int *lmm_size, 
2072                              struct ptlrpc_request **request)
2073 {
2074         struct ll_sb_info *sbi = ll_i2sbi(inode);
2075         struct mdt_body  *body;
2076         struct lov_mds_md *lmm = NULL;
2077         struct ptlrpc_request *req = NULL;
2078         struct obd_capa *oc;
2079         int rc, lmmsize;
2080
2081         rc = ll_get_max_mdsize(sbi, &lmmsize);
2082         if (rc)
2083                 RETURN(rc);
2084
2085         oc = ll_mdscapa_get(inode);
2086         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2087                              oc, filename, strlen(filename) + 1,
2088                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2089                              ll_i2suppgid(inode), &req);
2090         capa_put(oc);
2091         if (rc < 0) {
2092                 CDEBUG(D_INFO, "md_getattr_name failed "
2093                        "on %s: rc %d\n", filename, rc);
2094                 GOTO(out, rc);
2095         }
2096
2097         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2098         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2099
2100         lmmsize = body->eadatasize;
2101
2102         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2103                         lmmsize == 0) {
2104                 GOTO(out, rc = -ENODATA);
2105         }
2106
2107         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2108         LASSERT(lmm != NULL);
2109
2110         /*
2111          * This is coming from the MDS, so is probably in
2112          * little endian.  We convert it to host endian before
2113          * passing it to userspace.
2114          */
2115         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
2116                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
2117                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
2118         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
2119                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2120         }
2121
2122         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2123                 struct lov_stripe_md *lsm;
2124                 struct lov_user_md_join *lmj;
2125                 int lmj_size, i, aindex = 0;
2126
2127                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2128                 if (rc < 0)
2129                         GOTO(out, rc = -ENOMEM);
2130                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2131                 if (rc)
2132                         GOTO(out_free_memmd, rc);
2133
2134                 lmj_size = sizeof(struct lov_user_md_join) +
2135                            lsm->lsm_stripe_count *
2136                            sizeof(struct lov_user_ost_data_join);
2137                 OBD_ALLOC(lmj, lmj_size);
2138                 if (!lmj)
2139                         GOTO(out_free_memmd, rc = -ENOMEM);
2140
2141                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2142                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2143                         struct lov_extent *lex =
2144                                 &lsm->lsm_array->lai_ext_array[aindex];
2145
2146                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2147                                 aindex ++;
2148                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2149                                         LPU64" len %d\n", aindex, i,
2150                                         lex->le_start, (int)lex->le_len);
2151                         lmj->lmm_objects[i].l_extent_start =
2152                                 lex->le_start;
2153
2154                         if ((int)lex->le_len == -1)
2155                                 lmj->lmm_objects[i].l_extent_end = -1;
2156                         else
2157                                 lmj->lmm_objects[i].l_extent_end =
2158                                         lex->le_start + lex->le_len;
2159                         lmj->lmm_objects[i].l_object_id =
2160                                 lsm->lsm_oinfo[i]->loi_id;
2161                         lmj->lmm_objects[i].l_object_gr =
2162                                 lsm->lsm_oinfo[i]->loi_gr;
2163                         lmj->lmm_objects[i].l_ost_gen =
2164                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2165                         lmj->lmm_objects[i].l_ost_idx =
2166                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2167                 }
2168                 lmm = (struct lov_mds_md *)lmj;
2169                 lmmsize = lmj_size;
2170 out_free_memmd:
2171                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2172         }
2173 out:
2174         *lmmp = lmm;
2175         *lmm_size = lmmsize;
2176         *request = req;
2177         return rc;
2178 }
2179
2180 static int ll_lov_setea(struct inode *inode, struct file *file,
2181                             unsigned long arg)
2182 {
2183         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2184         struct lov_user_md  *lump;
2185         int lum_size = sizeof(struct lov_user_md) +
2186                        sizeof(struct lov_user_ost_data);
2187         int rc;
2188         ENTRY;
2189
2190         if (!capable (CAP_SYS_ADMIN))
2191                 RETURN(-EPERM);
2192
2193         OBD_ALLOC(lump, lum_size);
2194         if (lump == NULL) {
2195                 RETURN(-ENOMEM);
2196         }
2197         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2198         if (rc) {
2199                 OBD_FREE(lump, lum_size);
2200                 RETURN(-EFAULT);
2201         }
2202
2203         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2204
2205         OBD_FREE(lump, lum_size);
2206         RETURN(rc);
2207 }
2208
2209 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2210                             unsigned long arg)
2211 {
2212         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2213         int rc;
2214         int flags = FMODE_WRITE;
2215         ENTRY;
2216
2217         /* Bug 1152: copy properly when this is no longer true */
2218         LASSERT(sizeof(lum) == sizeof(*lump));
2219         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2220         rc = copy_from_user(&lum, lump, sizeof(lum));
2221         if (rc)
2222                 RETURN(-EFAULT);
2223
2224         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2225         if (rc == 0) {
2226                  put_user(0, &lump->lmm_stripe_count);
2227                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2228                                     0, ll_i2info(inode)->lli_smd, lump);
2229         }
2230         RETURN(rc);
2231 }
2232
2233 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2234 {
2235         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2236
2237         if (!lsm)
2238                 RETURN(-ENODATA);
2239
2240         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2241                             (void *)arg);
2242 }
2243
2244 static int ll_get_grouplock(struct inode *inode, struct file *file,
2245                             unsigned long arg)
2246 {
2247         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2248         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2249                                                     .end = OBD_OBJECT_EOF}};
2250         struct lustre_handle lockh = { 0 };
2251         struct ll_inode_info *lli = ll_i2info(inode);
2252         struct lov_stripe_md *lsm = lli->lli_smd;
2253         int flags = 0, rc;
2254         ENTRY;
2255
2256         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2257                 RETURN(-EINVAL);
2258         }
2259
2260         policy.l_extent.gid = arg;
2261         if (file->f_flags & O_NONBLOCK)
2262                 flags = LDLM_FL_BLOCK_NOWAIT;
2263
2264         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2265         if (rc)
2266                 RETURN(rc);
2267
2268         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2269         fd->fd_gid = arg;
2270         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2271
2272         RETURN(0);
2273 }
2274
2275 static int ll_put_grouplock(struct inode *inode, struct file *file,
2276                             unsigned long arg)
2277 {
2278         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2279         struct ll_inode_info *lli = ll_i2info(inode);
2280         struct lov_stripe_md *lsm = lli->lli_smd;
2281         int rc;
2282         ENTRY;
2283
2284         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2285                 /* Ugh, it's already unlocked. */
2286                 RETURN(-EINVAL);
2287         }
2288
2289         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2290                 RETURN(-EINVAL);
2291
2292         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2293
2294         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2295         if (rc)
2296                 RETURN(rc);
2297
2298         fd->fd_gid = 0;
2299         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2300
2301         RETURN(0);
2302 }
2303
2304 static int join_sanity_check(struct inode *head, struct inode *tail)
2305 {
2306         ENTRY;
2307         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2308                 CERROR("server do not support join \n");
2309                 RETURN(-EINVAL);
2310         }
2311         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2312                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2313                        head->i_ino, tail->i_ino);
2314                 RETURN(-EINVAL);
2315         }
2316         if (head->i_ino == tail->i_ino) {
2317                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2318                 RETURN(-EINVAL);
2319         }
2320         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2321                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2322                 RETURN(-EINVAL);
2323         }
2324         RETURN(0);
2325 }
2326
2327 static int join_file(struct inode *head_inode, struct file *head_filp,
2328                      struct file *tail_filp)
2329 {
2330         struct dentry *tail_dentry = tail_filp->f_dentry;
2331         struct lookup_intent oit = {.it_op = IT_OPEN,
2332                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2333         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2334                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2335
2336         struct lustre_handle lockh;
2337         struct md_op_data *op_data;
2338         int    rc;
2339         loff_t data;
2340         ENTRY;
2341
2342         tail_dentry = tail_filp->f_dentry;
2343
2344         data = i_size_read(head_inode);
2345         op_data = ll_prep_md_op_data(NULL, head_inode,
2346                                      tail_dentry->d_parent->d_inode,
2347                                      tail_dentry->d_name.name,
2348                                      tail_dentry->d_name.len, 0,
2349                                      LUSTRE_OPC_ANY, &data);
2350         if (IS_ERR(op_data))
2351                 RETURN(PTR_ERR(op_data));
2352
2353         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2354                          op_data, &lockh, NULL, 0, 0);
2355
2356         ll_finish_md_op_data(op_data);
2357         if (rc < 0)
2358                 GOTO(out, rc);
2359
2360         rc = oit.d.lustre.it_status;
2361
2362         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2363                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2364                 ptlrpc_req_finished((struct ptlrpc_request *)
2365                                     oit.d.lustre.it_data);
2366                 GOTO(out, rc);
2367         }
2368
2369         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2370                                            * away */
2371                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2372                 oit.d.lustre.it_lock_mode = 0;
2373         }
2374         ll_release_openhandle(head_filp->f_dentry, &oit);
2375 out:
2376         ll_intent_release(&oit);
2377         RETURN(rc);
2378 }
2379
2380 static int ll_file_join(struct inode *head, struct file *filp,
2381                         char *filename_tail)
2382 {
2383         struct inode *tail = NULL, *first = NULL, *second = NULL;
2384         struct dentry *tail_dentry;
2385         struct file *tail_filp, *first_filp, *second_filp;
2386         struct ll_lock_tree first_tree, second_tree;
2387         struct ll_lock_tree_node *first_node, *second_node;
2388         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2389         int rc = 0, cleanup_phase = 0;
2390         ENTRY;
2391
2392         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2393                head->i_ino, head->i_generation, head, filename_tail);
2394
2395         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2396         if (IS_ERR(tail_filp)) {
2397                 CERROR("Can not open tail file %s", filename_tail);
2398                 rc = PTR_ERR(tail_filp);
2399                 GOTO(cleanup, rc);
2400         }
2401         tail = igrab(tail_filp->f_dentry->d_inode);
2402
2403         tlli = ll_i2info(tail);
2404         tail_dentry = tail_filp->f_dentry;
2405         LASSERT(tail_dentry);
2406         cleanup_phase = 1;
2407
2408         /*reorder the inode for lock sequence*/
2409         first = head->i_ino > tail->i_ino ? head : tail;
2410         second = head->i_ino > tail->i_ino ? tail : head;
2411         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2412         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2413
2414         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2415                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2416         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2417         if (IS_ERR(first_node)){
2418                 rc = PTR_ERR(first_node);
2419                 GOTO(cleanup, rc);
2420         }
2421         first_tree.lt_fd = first_filp->private_data;
2422         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2423         if (rc != 0)
2424                 GOTO(cleanup, rc);
2425         cleanup_phase = 2;
2426
2427         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2428         if (IS_ERR(second_node)){
2429                 rc = PTR_ERR(second_node);
2430                 GOTO(cleanup, rc);
2431         }
2432         second_tree.lt_fd = second_filp->private_data;
2433         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2434         if (rc != 0)
2435                 GOTO(cleanup, rc);
2436         cleanup_phase = 3;
2437
2438         rc = join_sanity_check(head, tail);
2439         if (rc)
2440                 GOTO(cleanup, rc);
2441
2442         rc = join_file(head, filp, tail_filp);
2443         if (rc)
2444                 GOTO(cleanup, rc);
2445 cleanup:
2446         switch (cleanup_phase) {
2447         case 3:
2448                 ll_tree_unlock(&second_tree);
2449                 obd_cancel_unused(ll_i2dtexp(second),
2450                                   ll_i2info(second)->lli_smd, 0, NULL);
2451         case 2:
2452                 ll_tree_unlock(&first_tree);
2453                 obd_cancel_unused(ll_i2dtexp(first),
2454                                   ll_i2info(first)->lli_smd, 0, NULL);
2455         case 1:
2456                 filp_close(tail_filp, 0);
2457                 if (tail)
2458                         iput(tail);
2459                 if (head && rc == 0) {
2460                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2461                                        &hlli->lli_smd);
2462                         hlli->lli_smd = NULL;
2463                 }
2464         case 0:
2465                 break;
2466         default:
2467                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2468                 LBUG();
2469         }
2470         RETURN(rc);
2471 }
2472
2473 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2474 {
2475         struct inode *inode = dentry->d_inode;
2476         struct obd_client_handle *och;
2477         int rc;
2478         ENTRY;
2479
2480         LASSERT(inode);
2481
2482         /* Root ? Do nothing. */
2483         if (dentry->d_inode->i_sb->s_root == dentry)
2484                 RETURN(0);
2485
2486         /* No open handle to close? Move away */
2487         if (!it_disposition(it, DISP_OPEN_OPEN))
2488                 RETURN(0);
2489
2490         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2491
2492         OBD_ALLOC(och, sizeof(*och));
2493         if (!och)
2494                 GOTO(out, rc = -ENOMEM);
2495
2496         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2497                     ll_i2info(inode), it, och);
2498
2499         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2500                                        inode, och);
2501  out:
2502         /* this one is in place of ll_file_open */
2503         ptlrpc_req_finished(it->d.lustre.it_data);
2504         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2505         RETURN(rc);
2506 }
2507
2508 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2509                   unsigned long arg)
2510 {
2511         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2512         int flags;
2513         ENTRY;
2514
2515         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2516                inode->i_generation, inode, cmd);
2517         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2518
2519         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2520         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2521                 RETURN(-ENOTTY);
2522
2523         switch(cmd) {
2524         case LL_IOC_GETFLAGS:
2525                 /* Get the current value of the file flags */
2526                 return put_user(fd->fd_flags, (int *)arg);
2527         case LL_IOC_SETFLAGS:
2528         case LL_IOC_CLRFLAGS:
2529                 /* Set or clear specific file flags */
2530                 /* XXX This probably needs checks to ensure the flags are
2531                  *     not abused, and to handle any flag side effects.
2532                  */
2533                 if (get_user(flags, (int *) arg))
2534                         RETURN(-EFAULT);
2535
2536                 if (cmd == LL_IOC_SETFLAGS) {
2537                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2538                             !(file->f_flags & O_DIRECT)) {
2539                                 CERROR("%s: unable to disable locking on "
2540                                        "non-O_DIRECT file\n", current->comm);
2541                                 RETURN(-EINVAL);
2542                         }
2543
2544                         fd->fd_flags |= flags;
2545                 } else {
2546                         fd->fd_flags &= ~flags;
2547                 }
2548                 RETURN(0);
2549         case LL_IOC_LOV_SETSTRIPE:
2550                 RETURN(ll_lov_setstripe(inode, file, arg));
2551         case LL_IOC_LOV_SETEA:
2552                 RETURN(ll_lov_setea(inode, file, arg));
2553         case LL_IOC_LOV_GETSTRIPE:
2554                 RETURN(ll_lov_getstripe(inode, arg));
2555         case LL_IOC_RECREATE_OBJ:
2556                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2557         case EXT3_IOC_GETFLAGS:
2558         case EXT3_IOC_SETFLAGS:
2559                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2560         case EXT3_IOC_GETVERSION_OLD:
2561         case EXT3_IOC_GETVERSION:
2562                 RETURN(put_user(inode->i_generation, (int *)arg));
2563         case LL_IOC_JOIN: {
2564                 char *ftail;
2565                 int rc;
2566
2567                 ftail = getname((const char *)arg);
2568                 if (IS_ERR(ftail))
2569                         RETURN(PTR_ERR(ftail));
2570                 rc = ll_file_join(inode, file, ftail);
2571                 putname(ftail);
2572                 RETURN(rc);
2573         }
2574         case LL_IOC_GROUP_LOCK:
2575                 RETURN(ll_get_grouplock(inode, file, arg));
2576         case LL_IOC_GROUP_UNLOCK:
2577                 RETURN(ll_put_grouplock(inode, file, arg));
2578         case IOC_OBD_STATFS:
2579                 RETURN(ll_obd_statfs(inode, (void *)arg));
2580
2581         /* We need to special case any other ioctls we want to handle,
2582          * to send them to the MDS/OST as appropriate and to properly
2583          * network encode the arg field.
2584         case EXT3_IOC_SETVERSION_OLD:
2585         case EXT3_IOC_SETVERSION:
2586         */
2587         case LL_IOC_FLUSHCTX:
2588                 RETURN(ll_flush_ctx(inode));
2589         default: {
2590                 int err;
2591
2592                 if (LLIOC_STOP == 
2593                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2594                         RETURN(err);
2595
2596                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2597                                      (void *)arg));
2598         }
2599         }
2600 }
2601
2602 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2603 {
2604         struct inode *inode = file->f_dentry->d_inode;
2605         struct ll_inode_info *lli = ll_i2info(inode);
2606         struct lov_stripe_md *lsm = lli->lli_smd;
2607         loff_t retval;
2608         ENTRY;
2609         retval = offset + ((origin == 2) ? i_size_read(inode) :
2610                            (origin == 1) ? file->f_pos : 0);
2611         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2612                inode->i_ino, inode->i_generation, inode, retval, retval,
2613                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2614         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2615
2616         if (origin == 2) { /* SEEK_END */
2617                 int nonblock = 0, rc;
2618
2619                 if (file->f_flags & O_NONBLOCK)
2620                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2621
2622                 if (lsm != NULL) {
2623                         rc = ll_glimpse_size(inode, nonblock);
2624                         if (rc != 0)
2625                                 RETURN(rc);
2626                 }
2627
2628                 ll_inode_size_lock(inode, 0);
2629                 offset += i_size_read(inode);
2630                 ll_inode_size_unlock(inode, 0);
2631         } else if (origin == 1) { /* SEEK_CUR */
2632                 offset += file->f_pos;
2633         }
2634
2635         retval = -EINVAL;
2636         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2637                 if (offset != file->f_pos) {
2638                         file->f_pos = offset;
2639 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2640                         file->f_reada = 0;
2641                         file->f_version = ++event;
2642 #endif
2643                 }
2644                 retval = offset;
2645         }
2646         
2647         RETURN(retval);
2648 }
2649
2650 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2651 {
2652         struct inode *inode = dentry->d_inode;
2653         struct ll_inode_info *lli = ll_i2info(inode);
2654         struct lov_stripe_md *lsm = lli->lli_smd;
2655         struct ptlrpc_request *req;
2656         struct obd_capa *oc;
2657         int rc, err;
2658         ENTRY;
2659         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2660                inode->i_generation, inode);
2661         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2662
2663         /* fsync's caller has already called _fdata{sync,write}, we want
2664          * that IO to finish before calling the osc and mdc sync methods */
2665         rc = filemap_fdatawait(inode->i_mapping);
2666
2667         /* catch async errors that were recorded back when async writeback
2668          * failed for pages in this mapping. */
2669         err = lli->lli_async_rc;
2670         lli->lli_async_rc = 0;
2671         if (rc == 0)
2672                 rc = err;
2673         if (lsm) {
2674                 err = lov_test_and_clear_async_rc(lsm);
2675                 if (rc == 0)
2676                         rc = err;
2677         }
2678
2679         oc = ll_mdscapa_get(inode);
2680         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2681                       &req);
2682         capa_put(oc);
2683         if (!rc)
2684                 rc = err;
2685         if (!err)
2686                 ptlrpc_req_finished(req);
2687
2688         if (data && lsm) {
2689                 struct obdo *oa;
2690                 
2691                 OBDO_ALLOC(oa);
2692                 if (!oa)
2693                         RETURN(rc ? rc : -ENOMEM);
2694
2695                 oa->o_id = lsm->lsm_object_id;
2696                 oa->o_gr = lsm->lsm_object_gr;
2697                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2698                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2699                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2700                                            OBD_MD_FLGROUP);
2701
2702                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2703                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2704                                0, OBD_OBJECT_EOF, oc);
2705                 capa_put(oc);
2706                 if (!rc)
2707                         rc = err;
2708                 OBDO_FREE(oa);
2709         }
2710
2711         RETURN(rc);
2712 }
2713
2714 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2715 {
2716         struct inode *inode = file->f_dentry->d_inode;
2717         struct ll_sb_info *sbi = ll_i2sbi(inode);
2718         struct ldlm_res_id res_id =
2719                 { .name = { fid_seq(ll_inode2fid(inode)),
2720                             fid_oid(ll_inode2fid(inode)),
2721                             fid_ver(ll_inode2fid(inode)),
2722                             LDLM_FLOCK} };
2723         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2724                 ldlm_flock_completion_ast, NULL, file_lock };
2725         struct lustre_handle lockh = {0};
2726         ldlm_policy_data_t flock;
2727         int flags = 0;
2728         int rc;
2729         ENTRY;
2730
2731         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2732                inode->i_ino, file_lock);
2733
2734         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2735  
2736         if (file_lock->fl_flags & FL_FLOCK) {
2737                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2738                 /* set missing params for flock() calls */
2739                 file_lock->fl_end = OFFSET_MAX;
2740                 file_lock->fl_pid = current->tgid;
2741         }
2742         flock.l_flock.pid = file_lock->fl_pid;
2743         flock.l_flock.start = file_lock->fl_start;
2744         flock.l_flock.end = file_lock->fl_end;
2745
2746         switch (file_lock->fl_type) {
2747         case F_RDLCK:
2748                 einfo.ei_mode = LCK_PR;
2749                 break;
2750         case F_UNLCK:
2751                 /* An unlock request may or may not have any relation to
2752                  * existing locks so we may not be able to pass a lock handle
2753                  * via a normal ldlm_lock_cancel() request. The request may even
2754                  * unlock a byte range in the middle of an existing lock. In
2755                  * order to process an unlock request we need all of the same
2756                  * information that is given with a normal read or write record
2757                  * lock request. To avoid creating another ldlm unlock (cancel)
2758                  * message we'll treat a LCK_NL flock request as an unlock. */
2759                 einfo.ei_mode = LCK_NL;
2760                 break;
2761         case F_WRLCK:
2762                 einfo.ei_mode = LCK_PW;
2763                 break;
2764         default:
2765                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2766                 LBUG();
2767         }
2768
2769         switch (cmd) {
2770         case F_SETLKW:
2771 #ifdef F_SETLKW64
2772         case F_SETLKW64:
2773 #endif
2774                 flags = 0;
2775                 break;
2776         case F_SETLK:
2777 #ifdef F_SETLK64
2778         case F_SETLK64:
2779 #endif
2780                 flags = LDLM_FL_BLOCK_NOWAIT;
2781                 break;
2782         case F_GETLK:
2783 #ifdef F_GETLK64
2784         case F_GETLK64:
2785 #endif
2786                 flags = LDLM_FL_TEST_LOCK;
2787                 /* Save the old mode so that if the mode in the lock changes we
2788                  * can decrement the appropriate reader or writer refcount. */
2789                 file_lock->fl_type = einfo.ei_mode;
2790                 break;
2791         default:
2792                 CERROR("unknown fcntl lock command: %d\n", cmd);
2793                 LBUG();
2794         }
2795
2796         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2797                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2798                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2799
2800         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2801                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2802         if ((file_lock->fl_flags & FL_FLOCK) &&
2803             (rc == 0 || file_lock->fl_type == F_UNLCK))
2804                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2805 #ifdef HAVE_F_OP_FLOCK
2806         if ((file_lock->fl_flags & FL_POSIX) &&
2807             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2808             !(flags & LDLM_FL_TEST_LOCK))
2809                 posix_lock_file_wait(file, file_lock);
2810 #endif
2811
2812         RETURN(rc);
2813 }
2814
2815 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2816 {
2817         ENTRY;
2818
2819         RETURN(-ENOSYS);
2820 }
2821
2822 int ll_have_md_lock(struct inode *inode, __u64 bits)
2823 {
2824         struct lustre_handle lockh;
2825         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2826         struct lu_fid *fid;
2827         int flags;
2828         ENTRY;
2829
2830         if (!inode)
2831                RETURN(0);
2832
2833         fid = &ll_i2info(inode)->lli_fid;
2834         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2835
2836         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2837         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2838                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2839                 RETURN(1);
2840         }
2841         RETURN(0);
2842 }
2843
2844 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2845                             struct lustre_handle *lockh)
2846 {
2847         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2848         struct lu_fid *fid;
2849         ldlm_mode_t rc;
2850         int flags;
2851         ENTRY;
2852
2853         fid = &ll_i2info(inode)->lli_fid;
2854         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2855
2856         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2857         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2858                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2859         RETURN(rc);
2860 }
2861
2862 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2863         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2864                               * and return success */
2865                 inode->i_nlink = 0;
2866                 /* This path cannot be hit for regular files unless in
2867                  * case of obscure races, so no need to to validate
2868                  * size. */
2869                 if (!S_ISREG(inode->i_mode) &&
2870                     !S_ISDIR(inode->i_mode))
2871                         return 0;
2872         }
2873
2874         if (rc) {
2875                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2876                 return -abs(rc);
2877
2878         }
2879
2880         return 0;
2881 }
2882
2883 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2884 {
2885         struct inode *inode = dentry->d_inode;
2886         struct ptlrpc_request *req = NULL;
2887         struct ll_sb_info *sbi;
2888         struct obd_export *exp;
2889         int rc;
2890         ENTRY;
2891
2892         if (!inode) {
2893                 CERROR("REPORT THIS LINE TO PETER\n");
2894                 RETURN(0);
2895         }
2896         sbi = ll_i2sbi(inode);
2897
2898         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2899                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2900
2901         exp = ll_i2mdexp(inode);
2902
2903         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2904                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2905                 struct md_op_data *op_data;
2906
2907                 /* Call getattr by fid, so do not provide name at all. */
2908                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2909                                              dentry->d_inode, NULL, 0, 0,
2910                                              LUSTRE_OPC_ANY, NULL);
2911                 if (IS_ERR(op_data))
2912                         RETURN(PTR_ERR(op_data));
2913
2914                 oit.it_flags |= O_CHECK_STALE;
2915                 rc = md_intent_lock(exp, op_data, NULL, 0,
2916                                     /* we are not interested in name
2917                                        based lookup */
2918                                     &oit, 0, &req,
2919                                     ll_md_blocking_ast, 0);
2920                 ll_finish_md_op_data(op_data);
2921                 oit.it_flags &= ~O_CHECK_STALE;
2922                 if (rc < 0) {
2923                         rc = ll_inode_revalidate_fini(inode, rc);
2924                         GOTO (out, rc);
2925                 }
2926
2927                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2928                 if (rc != 0) {
2929                         ll_intent_release(&oit);
2930                         GOTO(out, rc);
2931                 }
2932
2933                 /* Unlinked? Unhash dentry, so it is not picked up later by
2934                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2935                    here to preserve get_cwd functionality on 2.6.
2936                    Bug 10503 */
2937                 if (!dentry->d_inode->i_nlink) {
2938                         spin_lock(&dcache_lock);
2939                         ll_drop_dentry(dentry);
2940                         spin_unlock(&dcache_lock);
2941                 }
2942
2943                 ll_lookup_finish_locks(&oit, dentry);
2944         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
2945                                                      MDS_INODELOCK_LOOKUP)) {
2946                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2947                 obd_valid valid = OBD_MD_FLGETATTR;
2948                 struct obd_capa *oc;
2949                 int ealen = 0;
2950
2951                 if (S_ISREG(inode->i_mode)) {
2952                         rc = ll_get_max_mdsize(sbi, &ealen);
2953                         if (rc)
2954                                 RETURN(rc);
2955                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2956                 }
2957                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2958                  * capa for this inode. Because we only keep capas of dirs
2959                  * fresh. */
2960                 oc = ll_mdscapa_get(inode);
2961                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2962                                 ealen, &req);
2963                 capa_put(oc);
2964                 if (rc) {
2965                         rc = ll_inode_revalidate_fini(inode, rc);
2966                         RETURN(rc);
2967                 }
2968
2969                 rc = ll_prep_inode(&inode, req, NULL);
2970                 if (rc)
2971                         GOTO(out, rc);
2972         }
2973
2974         /* if object not yet allocated, don't validate size */
2975         if (ll_i2info(inode)->lli_smd == NULL)
2976                 GOTO(out, rc = 0);
2977
2978         /* ll_glimpse_size will prefer locally cached writes if they extend
2979          * the file */
2980         rc = ll_glimpse_size(inode, 0);
2981         EXIT;
2982 out:
2983         ptlrpc_req_finished(req);
2984         return rc;
2985 }
2986
2987 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2988                   struct lookup_intent *it, struct kstat *stat)
2989 {
2990         struct inode *inode = de->d_inode;
2991         int res = 0;
2992
2993         res = ll_inode_revalidate_it(de, it);
2994         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2995
2996         if (res)
2997                 return res;
2998
2999         stat->dev = inode->i_sb->s_dev;
3000         stat->ino = inode->i_ino;
3001         stat->mode = inode->i_mode;
3002         stat->nlink = inode->i_nlink;
3003         stat->uid = inode->i_uid;
3004         stat->gid = inode->i_gid;
3005         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3006         stat->atime = inode->i_atime;
3007         stat->mtime = inode->i_mtime;
3008         stat->ctime = inode->i_ctime;
3009 #ifdef HAVE_INODE_BLKSIZE
3010         stat->blksize = inode->i_blksize;
3011 #else
3012         stat->blksize = 1 << inode->i_blkbits;
3013 #endif
3014
3015         ll_inode_size_lock(inode, 0);
3016         stat->size = i_size_read(inode);
3017         stat->blocks = inode->i_blocks;
3018         ll_inode_size_unlock(inode, 0);
3019
3020         return 0;
3021 }
3022 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3023 {
3024         struct lookup_intent it = { .it_op = IT_GETATTR };
3025
3026         return ll_getattr_it(mnt, de, &it, stat);
3027 }
3028
3029 static
3030 int lustre_check_acl(struct inode *inode, int mask)
3031 {
3032 #ifdef CONFIG_FS_POSIX_ACL
3033         struct ll_inode_info *lli = ll_i2info(inode);
3034         struct posix_acl *acl;
3035         int rc;
3036         ENTRY;
3037
3038         spin_lock(&lli->lli_lock);
3039         acl = posix_acl_dup(lli->lli_posix_acl);
3040         spin_unlock(&lli->lli_lock);
3041
3042         if (!acl)
3043                 RETURN(-EAGAIN);
3044
3045         rc = posix_acl_permission(inode, acl, mask);
3046         posix_acl_release(acl);
3047
3048         RETURN(rc);
3049 #else
3050         return -EAGAIN;
3051 #endif
3052 }
3053
3054 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3055 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3056 {
3057         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3058                inode->i_ino, inode->i_generation, inode, mask);
3059         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3060                 return lustre_check_remote_perm(inode, mask);
3061         
3062         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3063         return generic_permission(inode, mask, lustre_check_acl);
3064 }
3065 #else
3066 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3067 {
3068         int mode = inode->i_mode;
3069         int rc;
3070
3071         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3072                inode->i_ino, inode->i_generation, inode, mask);
3073
3074         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3075                 return lustre_check_remote_perm(inode, mask);
3076
3077         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3078
3079         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3080             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3081                 return -EROFS;
3082         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3083                 return -EACCES;
3084         if (current->fsuid == inode->i_uid) {
3085                 mode >>= 6;
3086         } else if (1) {
3087                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3088                         goto check_groups;
3089                 rc = lustre_check_acl(inode, mask);
3090                 if (rc == -EAGAIN)
3091                         goto check_groups;
3092                 if (rc == -EACCES)
3093                         goto check_capabilities;
3094                 return rc;
3095         } else {
3096 check_groups:
3097                 if (in_group_p(inode->i_gid))
3098                         mode >>= 3;
3099         }
3100         if ((mode & mask & S_IRWXO) == mask)
3101                 return 0;
3102
3103 check_capabilities:
3104         if (!(mask & MAY_EXEC) ||
3105             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3106                 if (capable(CAP_DAC_OVERRIDE))
3107                         return 0;
3108
3109         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3110             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3111                 return 0;
3112         
3113         return -EACCES;
3114 }
3115 #endif
3116
3117 /* -o localflock - only provides locally consistent flock locks */
3118 struct file_operations ll_file_operations = {
3119         .read           = ll_file_read,
3120         .write          = ll_file_write,
3121         .ioctl          = ll_file_ioctl,
3122         .open           = ll_file_open,
3123         .release        = ll_file_release,
3124         .mmap           = ll_file_mmap,
3125         .llseek         = ll_file_seek,
3126         .sendfile       = ll_file_sendfile,
3127         .fsync          = ll_fsync,
3128 };
3129
3130 struct file_operations ll_file_operations_flock = {
3131         .read           = ll_file_read,
3132         .write          = ll_file_write,
3133         .ioctl          = ll_file_ioctl,
3134         .open           = ll_file_open,
3135         .release        = ll_file_release,
3136         .mmap           = ll_file_mmap,
3137         .llseek         = ll_file_seek,
3138         .sendfile       = ll_file_sendfile,
3139         .fsync          = ll_fsync,
3140 #ifdef HAVE_F_OP_FLOCK
3141         .flock          = ll_file_flock,
3142 #endif
3143         .lock           = ll_file_flock
3144 };
3145
3146 /* These are for -o noflock - to return ENOSYS on flock calls */
3147 struct file_operations ll_file_operations_noflock = {
3148         .read           = ll_file_read,
3149         .write          = ll_file_write,
3150         .ioctl          = ll_file_ioctl,
3151         .open           = ll_file_open,
3152         .release        = ll_file_release,
3153         .mmap           = ll_file_mmap,
3154         .llseek         = ll_file_seek,
3155         .sendfile       = ll_file_sendfile,
3156         .fsync          = ll_fsync,
3157 #ifdef HAVE_F_OP_FLOCK
3158         .flock          = ll_file_noflock,
3159 #endif
3160         .lock           = ll_file_noflock
3161 };
3162
3163 struct inode_operations ll_file_inode_operations = {
3164 #ifdef HAVE_VFS_INTENT_PATCHES
3165         .setattr_raw    = ll_setattr_raw,
3166 #endif
3167         .setattr        = ll_setattr,
3168         .truncate       = ll_truncate,
3169         .getattr        = ll_getattr,
3170         .permission     = ll_inode_permission,
3171         .setxattr       = ll_setxattr,
3172         .getxattr       = ll_getxattr,
3173         .listxattr      = ll_listxattr,
3174         .removexattr    = ll_removexattr,
3175 };
3176
3177 /* dynamic ioctl number support routins */
3178 static struct llioc_ctl_data {
3179         struct rw_semaphore ioc_sem;
3180         struct list_head    ioc_head;
3181 } llioc = { 
3182         __RWSEM_INITIALIZER(llioc.ioc_sem), 
3183         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3184 };
3185
3186
3187 struct llioc_data {
3188         struct list_head        iocd_list;
3189         unsigned int            iocd_size;
3190         llioc_callback_t        iocd_cb;
3191         unsigned int            iocd_count;
3192         unsigned int            iocd_cmd[0];
3193 };
3194
3195 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3196 {
3197         unsigned int size;
3198         struct llioc_data *in_data = NULL;
3199         ENTRY;
3200
3201         if (cb == NULL || cmd == NULL ||
3202             count > LLIOC_MAX_CMD || count < 0)
3203                 RETURN(NULL);
3204
3205         size = sizeof(*in_data) + count * sizeof(unsigned int);
3206         OBD_ALLOC(in_data, size);
3207         if (in_data == NULL)
3208                 RETURN(NULL);
3209
3210         memset(in_data, 0, sizeof(*in_data));
3211         in_data->iocd_size = size;
3212         in_data->iocd_cb = cb;
3213         in_data->iocd_count = count;
3214         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3215
3216         down_write(&llioc.ioc_sem);
3217         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3218         up_write(&llioc.ioc_sem);
3219
3220         RETURN(in_data);
3221 }
3222
3223 void ll_iocontrol_unregister(void *magic)
3224 {
3225         struct llioc_data *tmp;
3226
3227         if (magic == NULL)
3228                 return;
3229
3230         down_write(&llioc.ioc_sem);
3231         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3232                 if (tmp == magic) {
3233                         unsigned int size = tmp->iocd_size;
3234
3235                         list_del(&tmp->iocd_list);
3236                         up_write(&llioc.ioc_sem);
3237
3238                         OBD_FREE(tmp, size);
3239                         return;
3240                 }
3241         }
3242         up_write(&llioc.ioc_sem);
3243
3244         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3245 }
3246
3247 EXPORT_SYMBOL(ll_iocontrol_register);
3248 EXPORT_SYMBOL(ll_iocontrol_unregister);
3249
3250 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
3251                         unsigned int cmd, unsigned long arg, int *rcp)
3252 {
3253         enum llioc_iter ret = LLIOC_CONT;
3254         struct llioc_data *data;
3255         int rc = -EINVAL, i;
3256
3257         down_read(&llioc.ioc_sem);
3258         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3259                 for (i = 0; i < data->iocd_count; i++) {
3260                         if (cmd != data->iocd_cmd[i]) 
3261                                 continue;
3262
3263                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3264                         break;
3265                 }
3266
3267                 if (ret == LLIOC_STOP)
3268                         break;
3269         }
3270         up_read(&llioc.ioc_sem);
3271
3272         if (rcp)
3273                 *rcp = rc;
3274         return ret;
3275 }