1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <lustre_mdc.h>
47 #include <linux/pagemap.h>
48 #include <linux/file.h>
49 #include "llite_internal.h"
50 #include <lustre/ll_fiemap.h>
51
52 /* also used by llite/special.c:ll_special_open() */
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
58         return fd;
59 }
60
61 static void ll_file_data_put(struct ll_file_data *fd)
62 {
63         if (fd != NULL)
64                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
65 }
66
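/* Pack the current inode attributes (mode, times, size, blocks, flags), the
 * I/O epoch, the open file handle @fh and the MDS capability into @op_data
 * for an MDS request. */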
67 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
68                           struct lustre_handle *fh)
69 {
70         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
71         op_data->op_attr.ia_mode = inode->i_mode;
72         op_data->op_attr.ia_atime = inode->i_atime;
73         op_data->op_attr.ia_mtime = inode->i_mtime;
74         op_data->op_attr.ia_ctime = inode->i_ctime;
75         op_data->op_attr.ia_size = i_size_read(inode);
76         op_data->op_attr_blocks = inode->i_blocks;
77         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
78         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
79         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
80         op_data->op_capa1 = ll_mdscapa_get(inode);
81 }
82
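/* Decide which attributes accompany the close: size/blocks are sent unless
 * Size-on-MDS (SOM) applies to a regular file opened for write, in which
 * case the I/O epoch is closed instead; then pack the inode and the open
 * handle into @op_data. */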
83 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
84                              struct obd_client_handle *och)
85 {
86         ENTRY;
87
88         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
89                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
90
91         if (!(och->och_flags & FMODE_WRITE))
92                 goto out;
93
94         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
95             !S_ISREG(inode->i_mode))
96                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
97         else
98                 ll_epoch_close(inode, op_data, &och, 0);
99
100 out:
101         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
102         EXIT;
103 }
104
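/* Send the MDS close RPC for @och.  On -EAGAIN (SOM epoch still open) the
 * Size-on-MDS attribute is gathered from the OSTs and sent back to the MDS;
 * on success OST object destruction is handled via ll_objects_destroy().
 * @och is freed here unless a DONE_WRITING is still pending for it. */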
105 static int ll_close_inode_openhandle(struct obd_export *md_exp,
106                                      struct inode *inode,
107                                      struct obd_client_handle *och)
108 {
109         struct obd_export *exp = ll_i2mdexp(inode);
110         struct md_op_data *op_data;
111         struct ptlrpc_request *req = NULL;
112         struct obd_device *obd = class_exp2obd(exp);
113         int epoch_close = 1;
114         int seq_end = 0, rc;
115         ENTRY;
116
117         if (obd == NULL) {
118                 /*
119                  * XXX: in case of LMV, is this correct to access
120                  * ->exp_handle?
121                  */
122                 CERROR("Invalid MDC connection handle "LPX64"\n",
123                        ll_i2mdexp(inode)->exp_handle.h_cookie);
124                 GOTO(out, rc = 0);
125         }
126
127         /*
128          * Here we check if this is a forced umount.  If so, we are called on
129          * cancellation of the "open lock" and we do not call md_close(), as
130          * it would not succeed since the import is already deactivated.
131          */
132         if (obd->obd_force)
133                 GOTO(out, rc = 0);
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc != -EAGAIN)
143                 seq_end = 1;
144
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
148                 LASSERT(epoch_close);
149                 /* MDS has instructed us to obtain the Size-on-MDS attribute
150                  * from the OSTs and send a setattr back to the MDS. */
151                 rc = ll_sizeonmds_update(inode, och->och_mod,
152                                          &och->och_fh, op_data->op_ioepoch);
153                 if (rc) {
154                         CERROR("inode %lu mdc Size-on-MDS update failed: "
155                                "rc = %d\n", inode->i_ino, rc);
156                         rc = 0;
157                 }
158         } else if (rc) {
159                 CERROR("inode %lu mdc close failed: rc = %d\n",
160                        inode->i_ino, rc);
161         }
162         ll_finish_md_op_data(op_data);
163
164         if (rc == 0) {
165                 rc = ll_objects_destroy(req, inode);
166                 if (rc)
167                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
168                                inode->i_ino, rc);
169         }
170
171         EXIT;
172 out:
173
174         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
175             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
176                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
177         } else {
178                 if (seq_end)
179                         ptlrpc_close_replay_seq(req);
180                 md_clear_open_replay_data(md_exp, och);
181                 /* Free @och if it is not waiting for DONE_WRITING. */
182                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
183                 OBD_FREE_PTR(och);
184         }
185         if (req) /* This is close request */
186                 ptlrpc_req_finished(req);
187         return rc;
188 }
189
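/* Drop the cached MDS open handle matching @flags (read, write or exec)
 * once it has no more local users. */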
190 int ll_md_real_close(struct inode *inode, int flags)
191 {
192         struct ll_inode_info *lli = ll_i2info(inode);
193         struct obd_client_handle **och_p;
194         struct obd_client_handle *och;
195         __u64 *och_usecount;
196         int rc = 0;
197         ENTRY;
198
199         if (flags & FMODE_WRITE) {
200                 och_p = &lli->lli_mds_write_och;
201                 och_usecount = &lli->lli_open_fd_write_count;
202         } else if (flags & FMODE_EXEC) {
203                 och_p = &lli->lli_mds_exec_och;
204                 och_usecount = &lli->lli_open_fd_exec_count;
205         } else {
206                 LASSERT(flags & FMODE_READ);
207                 och_p = &lli->lli_mds_read_och;
208                 och_usecount = &lli->lli_open_fd_read_count;
209         }
210
211         down(&lli->lli_och_sem);
212         if (*och_usecount) { /* There are still users of this handle, so
213                                 skip freeing it. */
214                 up(&lli->lli_och_sem);
215                 RETURN(0);
216         }
217         och = *och_p;
218         *och_p = NULL;
219         up(&lli->lli_och_sem);
220
221         if (och) { /* There might be a race and somebody may have already
222                       freed this och */
223                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
224                                                inode, och);
225         }
226
227         RETURN(rc);
228 }
229
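/* Per-descriptor close: release any group lock, decrement the open count for
 * this open mode, and only close the MDS open handle (ll_md_real_close())
 * if no cached OPEN lock still covers it. */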
230 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
231                 struct file *file)
232 {
233         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
234         struct ll_inode_info *lli = ll_i2info(inode);
235         int rc = 0;
236         ENTRY;
237
238         /* clear group lock, if present */
239         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
240                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
241                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
242                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
243                                       &fd->fd_cwlockh);
244         }
245
246         /* Let's see if we have a good enough OPEN lock on the file and
247            whether we can skip talking to the MDS */
248         if (file->f_dentry->d_inode) { /* Can this ever be false? */
249                 int lockmode;
250                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
251                 struct lustre_handle lockh;
252                 struct inode *inode = file->f_dentry->d_inode;
253                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
254
255                 down(&lli->lli_och_sem);
256                 if (fd->fd_omode & FMODE_WRITE) {
257                         lockmode = LCK_CW;
258                         LASSERT(lli->lli_open_fd_write_count);
259                         lli->lli_open_fd_write_count--;
260                 } else if (fd->fd_omode & FMODE_EXEC) {
261                         lockmode = LCK_PR;
262                         LASSERT(lli->lli_open_fd_exec_count);
263                         lli->lli_open_fd_exec_count--;
264                 } else {
265                         lockmode = LCK_CR;
266                         LASSERT(lli->lli_open_fd_read_count);
267                         lli->lli_open_fd_read_count--;
268                 }
269                 up(&lli->lli_och_sem);
270
271                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
272                                    LDLM_IBITS, &policy, lockmode,
273                                    &lockh)) {
274                         rc = ll_md_real_close(file->f_dentry->d_inode,
275                                               fd->fd_omode);
276                 }
277         } else {
278                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
279                        file, file->f_dentry, file->f_dentry->d_name.name);
280         }
281
282         LUSTRE_FPRIVATE(file) = NULL;
283         ll_file_data_put(fd);
284         ll_capa_close(inode);
285
286         RETURN(rc);
287 }
288
289 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
290
291 /* While this returns an error code, the caller (fput()) ignores it, so we
292  * need to make every effort to clean up all of our state here.  Also,
293  * applications rarely check close errors and even if an error is returned
294  * they will not re-try the close call.
295  */
296 int ll_file_release(struct inode *inode, struct file *file)
297 {
298         struct ll_file_data *fd;
299         struct ll_sb_info *sbi = ll_i2sbi(inode);
300         struct ll_inode_info *lli = ll_i2info(inode);
301         struct lov_stripe_md *lsm = lli->lli_smd;
302         int rc;
303
304         ENTRY;
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
327         /* The last ref on @file may not come from the owner pid of the
328          * statahead.  Different processes can open the same dir;
329          * "ll_opendir_key" identifies who should stop the statahead thread. */
330         if (lli->lli_opendir_key == fd)
331                 ll_stop_statahead(inode, fd);
332
333         if (inode->i_sb->s_root == file->f_dentry) {
334                 LUSTRE_FPRIVATE(file) = NULL;
335                 ll_file_data_put(fd);
336                 RETURN(0);
337         }
338
339         if (lsm)
340                 lov_test_and_clear_async_rc(lsm);
341         lli->lli_async_rc = 0;
342
343         rc = ll_md_close(sbi->ll_md_exp, inode, file);
344         RETURN(rc);
345 }
346
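/* Issue an IT_OPEN intent RPC to the MDS for this file when no cached open
 * handle is usable; unless stripe parameters are being set (@lmm/@lmmsize),
 * an OPEN lock is requested as well, and the inode is refreshed from the
 * reply on success. */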
347 static int ll_intent_file_open(struct file *file, void *lmm,
348                                int lmmsize, struct lookup_intent *itp)
349 {
350         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
351         struct dentry *parent = file->f_dentry->d_parent;
352         const char *name = file->f_dentry->d_name.name;
353         const int len = file->f_dentry->d_name.len;
354         struct md_op_data *op_data;
355         struct ptlrpc_request *req;
356         int rc;
357         ENTRY;
358
359         if (!parent)
360                 RETURN(-ENOENT);
361
362         /* Usually we come here only for NFSD, and we want the open lock.
363            But we can also get here with pre-2.6.15 patchless kernels, and in
364            that case that lock is also ok */
365         /* We can also get here if there was a cached open handle in
366          * revalidate_it but it disappeared while we were getting from there
367          * to ll_file_open.  That means this file was closed and immediately
368          * reopened, which makes it a good candidate for using the OPEN lock */
369         /* If lmm and lmmsize are not 0, we are just setting stripe info
370            parameters.  No need for the open lock */
371         if (!lmm && !lmmsize)
372                 itp->it_flags |= MDS_OPEN_LOCK;
373
374         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
375                                       file->f_dentry->d_inode, name, len,
376                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
377         if (IS_ERR(op_data))
378                 RETURN(PTR_ERR(op_data));
379
380         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
381                             0 /*unused */, &req, ll_md_blocking_ast, 0);
382         ll_finish_md_op_data(op_data);
383         if (rc == -ESTALE) {
384                 /* reason to keep its own exit path: don't flood the log
385                  * with -ESTALE error messages.
386                  */
387                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
388                      it_open_error(DISP_OPEN_OPEN, itp))
389                         GOTO(out, rc);
390                 ll_release_openhandle(file->f_dentry, itp);
391                 GOTO(out, rc);
392         }
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         if (itp->d.lustre.it_lock_mode)
401                 md_set_lock_data(sbi->ll_md_exp,
402                                  &itp->d.lustre.it_lock_handle,
403                                  file->f_dentry->d_inode);
404
405         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
406 out:
407         ptlrpc_req_finished(itp->d.lustre.it_data);
408         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
409         ll_intent_drop_lock(itp);
410
411         RETURN(rc);
412 }
413
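/* Fill @och from the MDS open reply (file handle, FID, open flags, I/O
 * epoch) and register it for open replay. */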
414 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
415                        struct lookup_intent *it, struct obd_client_handle *och)
416 {
417         struct ptlrpc_request *req = it->d.lustre.it_data;
418         struct mdt_body *body;
419
420         LASSERT(och);
421
422         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
423         LASSERT(body != NULL);                      /* reply already checked out */
424
425         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
426         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
427         och->och_fid = lli->lli_fid;
428         och->och_flags = it->it_flags;
429         lli->lli_ioepoch = body->ioepoch;
430
431         return md_set_open_replay_data(md_exp, och, req);
432 }
433
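/* Finish the client side of an open: record the MDS open handle data in
 * @och (when one is supplied) and attach @fd, with freshly initialized
 * readahead state, to the file. */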
434 int ll_local_open(struct file *file, struct lookup_intent *it,
435                   struct ll_file_data *fd, struct obd_client_handle *och)
436 {
437         struct inode *inode = file->f_dentry->d_inode;
438         struct ll_inode_info *lli = ll_i2info(inode);
439         ENTRY;
440
441         LASSERT(!LUSTRE_FPRIVATE(file));
442
443         LASSERT(fd != NULL);
444
445         if (och) {
446                 struct ptlrpc_request *req = it->d.lustre.it_data;
447                 struct mdt_body *body;
448                 int rc;
449
450                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
451                 if (rc)
452                         RETURN(rc);
453
454                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
455                 if ((it->it_flags & FMODE_WRITE) &&
456                     (body->valid & OBD_MD_FLSIZE))
457                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
458                                lli->lli_ioepoch, PFID(&lli->lli_fid));
459         }
460
461         LUSTRE_FPRIVATE(file) = fd;
462         ll_readahead_init(inode, &fd->fd_ras);
463         fd->fd_omode = it->it_flags;
464         RETURN(0);
465 }
466
467 /* Open a file, and (for the very first open) create objects on the OSTs at
468  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
469  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
470  * lli_open_sem to ensure no other process will create objects, send the
471  * stripe MD to the MDS, or try to destroy the objects if that fails.
472  *
473  * If we already have the stripe MD locally then we don't request it in
474  * md_open(), by passing a lmm_size = 0.
475  *
476  * It is up to the application to ensure no other processes open this file
477  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
478  * used.  We might be able to avoid races of that sort by getting lli_open_sem
479  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
480  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
481  */
482 int ll_file_open(struct inode *inode, struct file *file)
483 {
484         struct ll_inode_info *lli = ll_i2info(inode);
485         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
486                                           .it_flags = file->f_flags };
487         struct lov_stripe_md *lsm;
488         struct ptlrpc_request *req = NULL;
489         struct obd_client_handle **och_p;
490         __u64 *och_usecount;
491         struct ll_file_data *fd;
492         int rc = 0, opendir_set = 0;
493         ENTRY;
494
495         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
496                inode->i_generation, inode, file->f_flags);
497
498 #ifdef HAVE_VFS_INTENT_PATCHES
499         it = file->f_it;
500 #else
501         it = file->private_data; /* XXX: compat macro */
502         file->private_data = NULL; /* prevent ll_local_open assertion */
503 #endif
504
505         fd = ll_file_data_get();
506         if (fd == NULL)
507                 RETURN(-ENOMEM);
508
509         if (S_ISDIR(inode->i_mode)) {
510                 spin_lock(&lli->lli_lock);
511                 /* "lli->lli_opendir_pid != 0" means someone has set it.
512                  * "lli->lli_sai != NULL" means the previous statahead has not
513                  *                        been cleaned up yet. */
514                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
515                         opendir_set = 1;
516                         lli->lli_opendir_pid = cfs_curproc_pid();
517                         lli->lli_opendir_key = fd;
518                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
519                         /* Two cases for this:
520                          * (1) The same process opens the directory many times.
521                          * (2) The old process opened the directory and exited
522                          *     before its child processes did.  Then a new
523                          *     process with the same pid opens the directory
524                          *     before the old process's children exit.
525                          * Change the owner to the latest one. */
526                         opendir_set = 2;
527                         lli->lli_opendir_key = fd;
528                 }
529                 spin_unlock(&lli->lli_lock);
530         }
531
532         if (inode->i_sb->s_root == file->f_dentry) {
533                 LUSTRE_FPRIVATE(file) = fd;
534                 RETURN(0);
535         }
536
537         if (!it || !it->d.lustre.it_disposition) {
538                 /* Convert f_flags into access mode.  We cannot use
539                  * file->f_mode, because everything but the O_ACCMODE mask
540                  * was stripped from it */
541                 if ((oit.it_flags + 1) & O_ACCMODE)
542                         oit.it_flags++;
543                 if (file->f_flags & O_TRUNC)
544                         oit.it_flags |= FMODE_WRITE;
545
546                 /* The kernel only calls f_op->open in dentry_open.  filp_open
547                  * calls dentry_open after open_namei has checked permissions.
548                  * Only nfsd_open calls dentry_open directly without checking
549                  * permissions, and because of that the code below is safe. */
550                 if (oit.it_flags & FMODE_WRITE)
551                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
552
553                 /* We do not want O_EXCL here, presumably we opened the file
554                  * already? XXX - NFS implications? */
555                 oit.it_flags &= ~O_EXCL;
556
557                 it = &oit;
558         }
559
560 restart:
561         /* Let's see if we have file open on MDS already. */
562         if (it->it_flags & FMODE_WRITE) {
563                 och_p = &lli->lli_mds_write_och;
564                 och_usecount = &lli->lli_open_fd_write_count;
565         } else if (it->it_flags & FMODE_EXEC) {
566                 och_p = &lli->lli_mds_exec_och;
567                 och_usecount = &lli->lli_open_fd_exec_count;
568         } else {
569                 och_p = &lli->lli_mds_read_och;
570                 och_usecount = &lli->lli_open_fd_read_count;
571         }
572
573         down(&lli->lli_och_sem);
574         if (*och_p) { /* Open handle is present */
575                 if (it_disposition(it, DISP_OPEN_OPEN)) {
576                         /* Well, there's an extra open request that we do not
577                            need; close it somehow.  This will decref the request. */
578                         rc = it_open_error(DISP_OPEN_OPEN, it);
579                         if (rc) {
580                                 up(&lli->lli_och_sem);
581                                 ll_file_data_put(fd);
582                                 GOTO(out_openerr, rc);
583                         }
584                         ll_release_openhandle(file->f_dentry, it);
585                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
586                                              LPROC_LL_OPEN);
587                 }
588                 (*och_usecount)++;
589
590                 rc = ll_local_open(file, it, fd, NULL);
591                 if (rc) {
592                         (*och_usecount)--;
593                         up(&lli->lli_och_sem);
594                         ll_file_data_put(fd);
595                         GOTO(out_openerr, rc);
596                 }
597         } else {
598                 LASSERT(*och_usecount == 0);
599                 if (!it->d.lustre.it_disposition) {
600                         /* We cannot just request a lock handle now; the new
601                            ELC code means that one of the other OPEN locks for
602                            this file could be cancelled, and since the blocking
603                            AST handler would attempt to grab och_sem as well,
604                            that would result in a deadlock */
605                         up(&lli->lli_och_sem);
606                         it->it_flags |= O_CHECK_STALE;
607                         rc = ll_intent_file_open(file, NULL, 0, it);
608                         it->it_flags &= ~O_CHECK_STALE;
609                         if (rc) {
610                                 ll_file_data_put(fd);
611                                 GOTO(out_openerr, rc);
612                         }
613
614                         /* Got some error? Release the request */
615                         if (it->d.lustre.it_status < 0) {
616                                 req = it->d.lustre.it_data;
617                                 ptlrpc_req_finished(req);
618                         }
619                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
620                                          &it->d.lustre.it_lock_handle,
621                                          file->f_dentry->d_inode);
622                         goto restart;
623                 }
624                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
625                 if (!*och_p) {
626                         ll_file_data_put(fd);
627                         GOTO(out_och_free, rc = -ENOMEM);
628                 }
629                 (*och_usecount)++;
630                 req = it->d.lustre.it_data;
631
632                 /* md_intent_lock() didn't get a request ref if there was an
633                  * open error, so don't do cleanup on the request here
634                  * (bug 3430) */
635                 /* XXX (green): Shouldn't we bail out on any error here, not
636                  * just an open error? */
637                 rc = it_open_error(DISP_OPEN_OPEN, it);
638                 if (rc) {
639                         ll_file_data_put(fd);
640                         GOTO(out_och_free, rc);
641                 }
642
643                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
644                 rc = ll_local_open(file, it, fd, *och_p);
645                 if (rc) {
646                         ll_file_data_put(fd);
647                         GOTO(out_och_free, rc);
648                 }
649         }
650         up(&lli->lli_och_sem);
651
652         /* Must do this outside the lli_och_sem lock to prevent a deadlock
653            where a different kind of OPEN lock for this same inode gets
654            cancelled by ldlm_cancel_lru */
655         if (!S_ISREG(inode->i_mode))
656                 GOTO(out, rc);
657
658         ll_capa_open(inode);
659
660         lsm = lli->lli_smd;
661         if (lsm == NULL) {
662                 if (file->f_flags & O_LOV_DELAY_CREATE ||
663                     !(file->f_mode & FMODE_WRITE)) {
664                         CDEBUG(D_INODE, "object creation was delayed\n");
665                         GOTO(out, rc);
666                 }
667         }
668         file->f_flags &= ~O_LOV_DELAY_CREATE;
669         GOTO(out, rc);
670 out:
671         ptlrpc_req_finished(req);
672         if (req)
673                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
674 out_och_free:
675         if (rc) {
676                 if (*och_p) {
677                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
678                         *och_p = NULL; /* OBD_FREE writes some magic there */
679                         (*och_usecount)--;
680                 }
681                 up(&lli->lli_och_sem);
682 out_openerr:
683                 if (opendir_set == 1) {
684                         lli->lli_opendir_key = NULL;
685                         lli->lli_opendir_pid = 0;
686                 } else if (unlikely(opendir_set == 2)) {
687                         ll_stop_statahead(inode, fd);
688                 }
689         }
690
691         return rc;
692 }
693
694 /* Fills the obdo with the attributes for the inode defined by lsm */
695 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
696 {
697         struct ptlrpc_request_set *set;
698         struct ll_inode_info *lli = ll_i2info(inode);
699         struct lov_stripe_md *lsm = lli->lli_smd;
700
701         struct obd_info oinfo = { { { 0 } } };
702         int rc;
703         ENTRY;
704
705         LASSERT(lsm != NULL);
706
707         oinfo.oi_md = lsm;
708         oinfo.oi_oa = obdo;
709         oinfo.oi_oa->o_id = lsm->lsm_object_id;
710         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
711         oinfo.oi_oa->o_mode = S_IFREG;
712         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
713                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
714                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
715                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
716                                OBD_MD_FLGROUP;
717         oinfo.oi_capa = ll_mdscapa_get(inode);
718
719         set = ptlrpc_prep_set();
720         if (set == NULL) {
721                 CERROR("can't allocate ptlrpc set\n");
722                 rc = -ENOMEM;
723         } else {
724                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
725                 if (rc == 0)
726                         rc = ptlrpc_set_wait(set);
727                 ptlrpc_set_destroy(set);
728         }
729         capa_put(oinfo.oi_capa);
730         if (rc)
731                 RETURN(rc);
732
733         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
734                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
735                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
736
737         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
738         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
739                lli->lli_smd->lsm_object_id, i_size_read(inode),
740                (unsigned long long)inode->i_blocks,
741                (unsigned long)ll_inode_blksize(inode));
742         RETURN(0);
743 }
744
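/* Map an OST extent @lock back to the stripe index it covers in this file's
 * LOV stripe set, verifying that the lock resource matches the stripe
 * object; returns the stripe index or a negative error. */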
745 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
746 {
747         struct ll_inode_info *lli = ll_i2info(inode);
748         struct lov_stripe_md *lsm = lli->lli_smd;
749         struct obd_export *exp = ll_i2dtexp(inode);
750         struct {
751                 char name[16];
752                 struct ldlm_lock *lock;
753         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
754         __u32 stripe, vallen = sizeof(stripe);
755         struct lov_oinfo *loinfo;
756         int rc;
757         ENTRY;
758
759         if (lsm->lsm_stripe_count == 1)
760                 GOTO(check, stripe = 0);
761
762         /* get our offset in the lov */
763         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
764         if (rc != 0) {
765                 CERROR("obd_get_info: rc = %d\n", rc);
766                 RETURN(rc);
767         }
768         LASSERT(stripe < lsm->lsm_stripe_count);
769
770 check:
771         loinfo = lsm->lsm_oinfo[stripe];
772         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
773                             &lock->l_resource->lr_name)){
774                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
775                            loinfo->loi_id, loinfo->loi_gr);
776                 RETURN(-ELDLM_NO_LOCK_DATA);
777         }
778
779         RETURN(stripe);
780 }
781
782 /* Get extra page reference to ensure it is not going away */
783 void ll_pin_extent_cb(void *data)
784 {
785         struct page *page = data;
786
787         page_cache_get(page);
788
789         return;
790 }
791
792 /* Flush the page from the page cache for an extent as it is cancelled.
793  * Page to remove is delivered as @data.
794  *
795  * No one can dirty the extent until we've finished our work and they cannot
796  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
797  * but other kernel actors could have pages locked.
798  *
799  * If @discard is set, there is no need to write the page if it is dirty.
800  *
801  * Called with the DLM lock held. */
802 int ll_page_removal_cb(void *data, int discard)
803 {
804         int rc;
805         struct page *page = data;
806         struct address_space *mapping;
807
808         ENTRY;
809
810         /* We have page reference already from ll_pin_page */
811         lock_page(page);
812
813         /* Already truncated by somebody */
814         if (!page->mapping)
815                 GOTO(out, rc = 0);
816         mapping = page->mapping;
817
818         ll_teardown_mmaps(mapping,
819                           (__u64)page->index << PAGE_CACHE_SHIFT,
820                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
821                                                               ~PAGE_CACHE_MASK);
822         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
823
824         if (!discard && clear_page_dirty_for_io(page)) {
825                 LASSERT(page->mapping);
826                 rc = ll_call_writepage(page->mapping->host, page);
827                 /* either waiting for io to complete or reacquiring
828                  * the lock that the failed writepage released */
829                 lock_page(page);
830                 wait_on_page_writeback(page);
831                 if (rc != 0) {
832                         CERROR("writepage inode %lu(%p) of page %p "
833                                "failed: %d\n", mapping->host->i_ino,
834                                mapping->host, page, rc);
835                         if (rc == -ENOSPC)
836                                 set_bit(AS_ENOSPC, &mapping->flags);
837                         else
838                                 set_bit(AS_EIO, &mapping->flags);
839                 }
841         }
842         if (page->mapping != NULL) {
843                 struct ll_async_page *llap = llap_cast_private(page);
844                 /* checking again to account for writeback's lock_page() */
845                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
846                 if (llap)
847                         ll_ra_accounting(llap, page->mapping);
848                 ll_truncate_complete_page(page);
849         }
850         EXIT;
851 out:
852         LASSERT(!PageWriteback(page));
853         unlock_page(page);
854         page_cache_release(page);
855
856         return 0;
857 }
858
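/* Called when an OST extent lock on this file is cancelled: shrink the known
 * minimum size (KMS) of the affected stripe now that the lock is going away,
 * and queue a DONE_WRITING check for the inode. */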
859 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
860                              void *data, int flag)
861 {
862         struct inode *inode;
863         struct ll_inode_info *lli;
864         struct lov_stripe_md *lsm;
865         int stripe;
866         __u64 kms;
867
868         ENTRY;
869
870         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
871                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
872                 LBUG();
873         }
874
875         inode = ll_inode_from_lock(lock);
876         if (inode == NULL)
877                 RETURN(0);
878         lli = ll_i2info(inode);
879         if (lli == NULL)
880                 GOTO(iput, 0);
881         if (lli->lli_smd == NULL)
882                 GOTO(iput, 0);
883         lsm = lli->lli_smd;
884
885         stripe = ll_lock_to_stripe_offset(inode, lock);
886         if (stripe < 0)
887                 GOTO(iput, 0);
888
889         lov_stripe_lock(lsm);
890         lock_res_and_lock(lock);
891         kms = ldlm_extent_shift_kms(lock,
892                                     lsm->lsm_oinfo[stripe]->loi_kms);
893
894         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
895                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
896                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
897         lsm->lsm_oinfo[stripe]->loi_kms = kms;
898         unlock_res_and_lock(lock);
899         lov_stripe_unlock(lsm);
900         ll_queue_done_writing(inode, 0);
901         EXIT;
902 iput:
903         iput(inode);
904
905         return 0;
906 }
907
908 #if 0
909 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
910 {
911         /* XXX ALLOCATE - 160 bytes */
912         struct inode *inode = ll_inode_from_lock(lock);
913         struct ll_inode_info *lli = ll_i2info(inode);
914         struct lustre_handle lockh = { 0 };
915         struct ost_lvb *lvb;
916         int stripe;
917         ENTRY;
918
919         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
920                      LDLM_FL_BLOCK_CONV)) {
921                 LBUG(); /* not expecting any blocked async locks yet */
922                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
923                            "lock, returning");
924                 ldlm_lock_dump(D_OTHER, lock, 0);
925                 ldlm_reprocess_all(lock->l_resource);
926                 RETURN(0);
927         }
928
929         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
930
931         stripe = ll_lock_to_stripe_offset(inode, lock);
932         if (stripe < 0)
933                 goto iput;
934
935         if (lock->l_lvb_len) {
936                 struct lov_stripe_md *lsm = lli->lli_smd;
937                 __u64 kms;
938                 lvb = lock->l_lvb_data;
939                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
940
941                 lock_res_and_lock(lock);
942                 ll_inode_size_lock(inode, 1);
943                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
944                 kms = ldlm_extent_shift_kms(NULL, kms);
945                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
946                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
947                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
948                 lsm->lsm_oinfo[stripe].loi_kms = kms;
949                 ll_inode_size_unlock(inode, 1);
950                 unlock_res_and_lock(lock);
951         }
952
953 iput:
954         iput(inode);
955         wake_up(&lock->l_waitq);
956
957         ldlm_lock2handle(lock, &lockh);
958         ldlm_lock_decref(&lockh, LCK_PR);
959         RETURN(0);
960 }
961 #endif
962
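/* Glimpse AST: fill the LVB in the reply with this client's view of the
 * stripe size (KMS) and the inode timestamps so the glimpse requester can
 * compute an up-to-date file size. */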
963 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
964 {
965         struct ptlrpc_request *req = reqp;
966         struct inode *inode = ll_inode_from_lock(lock);
967         struct ll_inode_info *lli;
968         struct lov_stripe_md *lsm;
969         struct ost_lvb *lvb;
970         int rc, stripe;
971         ENTRY;
972
973         if (inode == NULL)
974                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
975         lli = ll_i2info(inode);
976         if (lli == NULL)
977                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
978         lsm = lli->lli_smd;
979         if (lsm == NULL)
980                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
981
982         /* First, find out which stripe index this lock corresponds to. */
983         stripe = ll_lock_to_stripe_offset(inode, lock);
984         if (stripe < 0)
985                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
986
987         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
988         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
989                              sizeof(*lvb));
990         rc = req_capsule_server_pack(&req->rq_pill);
991         if (rc) {
992                 CERROR("lustre_pack_reply: %d\n", rc);
993                 GOTO(iput, rc);
994         }
995
996         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
997         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
998         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
999         lvb->lvb_atime = LTIME_S(inode->i_atime);
1000         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1001
1002         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1003                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1004                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1005                    lvb->lvb_atime, lvb->lvb_ctime);
1006  iput:
1007         iput(inode);
1008
1009  out:
1010         /* These errors are normal races, so we don't want to fill the console
1011          * with messages by calling ptlrpc_error() */
1012         if (rc == -ELDLM_NO_LOCK_DATA)
1013                 lustre_pack_reply(req, 1, NULL, NULL);
1014
1015         req->rq_status = rc;
1016         return rc;
1017 }
1018
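/* Refresh the inode's size, block count and timestamps from the LVB data
 * merged across all stripes by the LOV/OSC layers. */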
1019 static int ll_merge_lvb(struct inode *inode)
1020 {
1021         struct ll_inode_info *lli = ll_i2info(inode);
1022         struct ll_sb_info *sbi = ll_i2sbi(inode);
1023         struct ost_lvb lvb;
1024         int rc;
1025
1026         ENTRY;
1027
1028         ll_inode_size_lock(inode, 1);
1029         inode_init_lvb(inode, &lvb);
1030         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1031         i_size_write(inode, lvb.lvb_size);
1032         inode->i_blocks = lvb.lvb_blocks;
1033
1034         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1035         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1036         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1037         ll_inode_size_unlock(inode, 1);
1038
1039         RETURN(rc);
1040 }
1041
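/* Determine the file size from locally cached state: succeeds only if a PR
 * extent lock covering the whole file is already held, otherwise returns
 * -ENODATA. */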
1042 int ll_local_size(struct inode *inode)
1043 {
1044         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1045         struct ll_inode_info *lli = ll_i2info(inode);
1046         struct ll_sb_info *sbi = ll_i2sbi(inode);
1047         struct lustre_handle lockh = { 0 };
1048         int flags = 0;
1049         int rc;
1050         ENTRY;
1051
1052         if (lli->lli_smd->lsm_stripe_count == 0)
1053                 RETURN(0);
1054
1055         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1056                        &policy, LCK_PR, &flags, inode, &lockh);
1057         if (rc < 0)
1058                 RETURN(rc);
1059         else if (rc == 0)
1060                 RETURN(-ENODATA);
1061
1062         rc = ll_merge_lvb(inode);
1063         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1064         RETURN(rc);
1065 }
1066
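/* Glimpse size/attributes for @lsm without an inode: enqueue a glimpse
 * (intent) lock on [0, EOF], merge the returned per-stripe LVBs and fill
 * @st with size, blocks and timestamps. */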
1067 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1068                      lstat_t *st)
1069 {
1070         struct lustre_handle lockh = { 0 };
1071         struct ldlm_enqueue_info einfo = { 0 };
1072         struct obd_info oinfo = { { { 0 } } };
1073         struct ost_lvb lvb;
1074         int rc;
1075
1076         ENTRY;
1077
1078         einfo.ei_type = LDLM_EXTENT;
1079         einfo.ei_mode = LCK_PR;
1080         einfo.ei_cb_bl = osc_extent_blocking_cb;
1081         einfo.ei_cb_cp = ldlm_completion_ast;
1082         einfo.ei_cb_gl = ll_glimpse_callback;
1083         einfo.ei_cbdata = NULL;
1084
1085         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1086         oinfo.oi_lockh = &lockh;
1087         oinfo.oi_md = lsm;
1088         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1089
1090         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1091         if (rc == -ENOENT)
1092                 RETURN(rc);
1093         if (rc != 0) {
1094                 CERROR("obd_enqueue returned rc %d, "
1095                        "returning -EIO\n", rc);
1096                 RETURN(rc > 0 ? -EIO : rc);
1097         }
1098
1099         lov_stripe_lock(lsm);
1100         memset(&lvb, 0, sizeof(lvb));
1101         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1102         st->st_size = lvb.lvb_size;
1103         st->st_blocks = lvb.lvb_blocks;
1104         st->st_mtime = lvb.lvb_mtime;
1105         st->st_atime = lvb.lvb_atime;
1106         st->st_ctime = lvb.lvb_ctime;
1107         lov_stripe_unlock(lsm);
1108
1109         RETURN(rc);
1110 }
1111
1112 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1113  * file (because it prefers KMS over RSS when larger) */
1114 int ll_glimpse_size(struct inode *inode, int ast_flags)
1115 {
1116         struct ll_inode_info *lli = ll_i2info(inode);
1117         struct ll_sb_info *sbi = ll_i2sbi(inode);
1118         struct lustre_handle lockh = { 0 };
1119         struct ldlm_enqueue_info einfo = { 0 };
1120         struct obd_info oinfo = { { { 0 } } };
1121         int rc;
1122         ENTRY;
1123
1124         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1125                 RETURN(0);
1126
1127         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1128
1129         if (!lli->lli_smd) {
1130                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1131                 RETURN(0);
1132         }
1133
1134         /* NOTE: this looks like a DLM lock request, but it may not be one.
1135          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse
1136          *       request that won't revoke any conflicting DLM locks held.
1137          *       Instead, ll_glimpse_callback() will be called on each client
1138          *       holding a DLM lock against this file, and the resulting size
1139          *       will be returned for each stripe.  A DLM lock on [0, EOF] is
1140          *       acquired only if there were no conflicting locks. */
1141         einfo.ei_type = LDLM_EXTENT;
1142         einfo.ei_mode = LCK_PR;
1143         einfo.ei_cb_bl = osc_extent_blocking_cb;
1144         einfo.ei_cb_cp = ldlm_completion_ast;
1145         einfo.ei_cb_gl = ll_glimpse_callback;
1146         einfo.ei_cbdata = inode;
1147
1148         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1149         oinfo.oi_lockh = &lockh;
1150         oinfo.oi_md = lli->lli_smd;
1151         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1152
1153         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1154         if (rc == -ENOENT)
1155                 RETURN(rc);
1156         if (rc != 0) {
1157                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1158                 RETURN(rc > 0 ? -EIO : rc);
1159         }
1160
1161         rc = ll_merge_lvb(inode);
1162
1163         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1164                i_size_read(inode), (unsigned long long)inode->i_blocks);
1165
1166         RETURN(rc);
1167 }
1168
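/* Acquire an OST extent lock of @mode for @policy, then refresh the inode
 * size and timestamps from the merged LVB under ll_inode_size_lock().
 * Returns immediately without locking when LL_FILE_IGNORE_LOCK or
 * LL_SBI_NOLCK is set. */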
1169 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1170                    struct lov_stripe_md *lsm, int mode,
1171                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1172                    int ast_flags)
1173 {
1174         struct ll_sb_info *sbi = ll_i2sbi(inode);
1175         struct ost_lvb lvb;
1176         struct ldlm_enqueue_info einfo = { 0 };
1177         struct obd_info oinfo = { { { 0 } } };
1178         int rc;
1179         ENTRY;
1180
1181         LASSERT(!lustre_handle_is_used(lockh));
1182         LASSERT(lsm != NULL);
1183
1184         /* don't drop the mmapped file to LRU */
1185         if (mapping_mapped(inode->i_mapping))
1186                 ast_flags |= LDLM_FL_NO_LRU;
1187
1188         /* XXX phil: can we do this?  won't it screw the file size up? */
1189         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1190             (sbi->ll_flags & LL_SBI_NOLCK))
1191                 RETURN(0);
1192
1193         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1194                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1195
1196         einfo.ei_type = LDLM_EXTENT;
1197         einfo.ei_mode = mode;
1198         einfo.ei_cb_bl = osc_extent_blocking_cb;
1199         einfo.ei_cb_cp = ldlm_completion_ast;
1200         einfo.ei_cb_gl = ll_glimpse_callback;
1201         einfo.ei_cbdata = inode;
1202
1203         oinfo.oi_policy = *policy;
1204         oinfo.oi_lockh = lockh;
1205         oinfo.oi_md = lsm;
1206         oinfo.oi_flags = ast_flags;
1207
1208         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1209         *policy = oinfo.oi_policy;
1210         if (rc > 0)
1211                 rc = -EIO;
1212
1213         ll_inode_size_lock(inode, 1);
1214         inode_init_lvb(inode, &lvb);
1215         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1216
1217         if (policy->l_extent.start == 0 &&
1218             policy->l_extent.end == OBD_OBJECT_EOF) {
1219                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1220                  * the kms under both a DLM lock and the
1221                  * ll_inode_size_lock().  If we don't get the
1222                  * ll_inode_size_lock() here we can match the DLM lock and
1223                  * reset i_size from the kms before the truncating path has
1224                  * updated the kms.  generic_file_write can then trust the
1225                  * stale i_size when doing appending writes and effectively
1226                  * cancel the result of the truncate.  Getting the
1227                  * ll_inode_size_lock() after the enqueue maintains the DLM
1228                  * -> ll_inode_size_lock() acquiring order. */
1229                 i_size_write(inode, lvb.lvb_size);
1230                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1231                        inode->i_ino, i_size_read(inode));
1232         }
1233
1234         if (rc == 0) {
1235                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1236                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1237                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1238         }
1239         ll_inode_size_unlock(inode, 1);
1240
1241         RETURN(rc);
1242 }
1243
1244 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1245                      struct lov_stripe_md *lsm, int mode,
1246                      struct lustre_handle *lockh)
1247 {
1248         struct ll_sb_info *sbi = ll_i2sbi(inode);
1249         int rc;
1250         ENTRY;
1251
1252         /* XXX phil: can we do this?  won't it screw the file size up? */
1253         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1254             (sbi->ll_flags & LL_SBI_NOLCK))
1255                 RETURN(0);
1256
1257         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1258
1259         RETURN(rc);
1260 }
1261
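/* Mark the file as contended so that, until the contention interval expires,
 * I/O paths (see ll_is_file_contended() and ll_file_get_tree_lock()) skip
 * client-side extent locking and rely on server-side (SRVLOCK) locking. */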
1262 static void ll_set_file_contended(struct inode *inode)
1263 {
1264         struct ll_inode_info *lli = ll_i2info(inode);
1265         cfs_time_t now = cfs_time_current();
1266
1267         spin_lock(&lli->lli_lock);
1268         lli->lli_contention_time = now;
1269         lli->lli_flags |= LLIF_CONTENDED;
1270         spin_unlock(&lli->lli_lock);
1271 }
1272
1273 void ll_clear_file_contended(struct inode *inode)
1274 {
1275         struct ll_inode_info *lli = ll_i2info(inode);
1276
1277         spin_lock(&lli->lli_lock);
1278         lli->lli_flags &= ~LLIF_CONTENDED;
1279         spin_unlock(&lli->lli_lock);
1280 }
1281
1282 static int ll_is_file_contended(struct file *file)
1283 {
1284         struct inode *inode = file->f_dentry->d_inode;
1285         struct ll_inode_info *lli = ll_i2info(inode);
1286         struct ll_sb_info *sbi = ll_i2sbi(inode);
1287         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1288         ENTRY;
1289
1290         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1291                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1292                        " osc connect flags = 0x"LPX64"\n",
1293                        sbi->ll_lco.lco_flags);
1294                 RETURN(0);
1295         }
1296         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1297                 RETURN(1);
1298         if (lli->lli_flags & LLIF_CONTENDED) {
1299                 cfs_time_t cur_time = cfs_time_current();
1300                 cfs_time_t retry_time;
1301
1302                 retry_time = cfs_time_add(
1303                         lli->lli_contention_time,
1304                         cfs_time_seconds(sbi->ll_contention_time));
1305                 if (cfs_time_after(cur_time, retry_time)) {
1306                         ll_clear_file_contended(inode);
1307                         RETURN(0);
1308                 }
1309                 RETURN(1);
1310         }
1311         RETURN(0);
1312 }
1313
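/* Take the extent lock tree covering [start, end] for a read or write unless
 * the file is currently contended (O_APPEND writes always lock).  Returns 1
 * if the tree was locked, 0 if the lockless path should be used, or a
 * negative error. */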
1314 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1315                                  const char *buf, size_t count,
1316                                  loff_t start, loff_t end, int rw)
1317 {
1318         int append;
1319         int tree_locked = 0;
1320         int rc;
1321         struct inode * inode = file->f_dentry->d_inode;
1322         ENTRY;
1323
1324         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1325
1326         if (append || !ll_is_file_contended(file)) {
1327                 struct ll_lock_tree_node *node;
1328                 int ast_flags;
1329
1330                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1331                 if (file->f_flags & O_NONBLOCK)
1332                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1333                 node = ll_node_from_inode(inode, start, end,
1334                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1335                 if (IS_ERR(node)) {
1336                         rc = PTR_ERR(node);
1337                         GOTO(out, rc);
1338                 }
1339                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1340                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1341                 if (rc == 0)
1342                         tree_locked = 1;
1343                 else if (rc == -EUSERS)
1344                         ll_set_file_contended(inode);
1345                 else
1346                         GOTO(out, rc);
1347         }
1348         RETURN(tree_locked);
1349 out:
1350         return rc;
1351 }
1352
1353 /**
1354  * Checks if requested extent lock is compatible with a lock under a page.
1355  *
1356  * Checks if the lock under \a page is compatible with a read or write lock
1357  * (specified by \a rw) for an extent [\a start , \a end].
1358  *
1359  * \param page the page under which lock is considered
1360  * \param rw OBD_BRW_READ if requested for reading,
1361  *           OBD_BRW_WRITE if requested for writing
1362  * \param start start of the requested extent
1363  * \param end end of the requested extent
1364  * \param cookie transparent parameter for passing locking context
1365  *
1366  * \post result == 1, *cookie == context, appropriate lock is referenced or
1367  * \post result == 0
1368  *
1369  * \retval 1 owned lock is reused for the request
1370  * \retval 0 no lock reused for the request
1371  *
1372  * \see ll_release_short_lock
1373  */
1374 static int ll_reget_short_lock(struct page *page, int rw,
1375                                obd_off start, obd_off end,
1376                                void **cookie)
1377 {
1378         struct ll_async_page *llap;
1379         struct obd_export *exp;
1380         struct inode *inode = page->mapping->host;
1381
1382         ENTRY;
1383
1384         exp = ll_i2dtexp(inode);
1385         if (exp == NULL)
1386                 RETURN(0);
1387
1388         llap = llap_cast_private(page);
1389         if (llap == NULL)
1390                 RETURN(0);
1391
1392         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1393                                     &llap->llap_cookie, rw, start, end,
1394                                     cookie));
1395 }
1396
1397 /**
1398  * Releases a reference to a lock taken in a "fast" way.
1399  *
1400  * Releases a read or a write (specified by \a rw) lock
1401  * referenced by \a cookie.
1402  *
1403  * \param inode inode to which data belong
1404  * \param end end of the locked extent
1405  * \param rw OBD_BRW_READ if requested for reading,
1406  *           OBD_BRW_WRITE if requested for writing
1407  * \param cookie transparent parameter for passing locking context
1408  *
1409  * \post appropriate lock is dereferenced
1410  *
1411  * \see ll_reget_short_lock
1412  */
1413 static void ll_release_short_lock(struct inode *inode, obd_off end,
1414                                   void *cookie, int rw)
1415 {
1416         struct obd_export *exp;
1417         int rc;
1418
1419         exp = ll_i2dtexp(inode);
1420         if (exp == NULL)
1421                 return;
1422
1423         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1424                                     cookie, rw);
1425         if (rc < 0)
1426                 CERROR("unlock failed (%d)\n", rc);
1427 }
1428
1429 /**
1430  * Checks if requested extent lock is compatible
1431  * with a lock under a page in page cache.
1432  *
1433  * Checks if a lock cached under some page of \a file is compatible with a
1434  * read or write lock (specified by \a rw) for the extent [\a ppos , \a end].
1435  *
1436  * \param file the file under which lock is considered
1437  * \param rw OBD_BRW_READ if requested for reading,
1438  *           OBD_BRW_WRITE if requested for writing
1439  * \param ppos start of the requested extent
1440  * \param end end of the requested extent
1441  * \param cookie transparent parameter for passing locking context
1442  * \param buf userspace buffer for the data
1443  *
1444  * \post result == 1, *cookie == context, appropriate lock is referenced
1445  * \post result == 0
1446  *
1447  * \retval 1 owned lock is reused for the request
1448  * \retval 0 no lock reused for the request
1449  *
1450  * \see ll_file_put_fast_lock
1451  */
1452 static inline int ll_file_get_fast_lock(struct file *file,
1453                                         obd_off ppos, obd_off end,
1454                                         char *buf, void **cookie, int rw)
1455 {
1456         int rc = 0;
1457         struct page *page;
1458
1459         ENTRY;
1460
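        /* The fast path is only tried when the user buffer is not itself an
         * mmapped region; otherwise a page fault during the copy could
         * require a conflicting lock, and the tree-lock fallback (which is
         * given the buffer) is used instead.  Assumed rationale, inferred
         * from the ll_region_mapped() check below. */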
1461         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1462                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1463                                       ppos >> CFS_PAGE_SHIFT);
1464                 if (page) {
1465                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1466                                 rc = 1;
1467
1468                         unlock_page(page);
1469                         page_cache_release(page);
1470                 }
1471         }
1472
1473         RETURN(rc);
1474 }
1475
1476 /**
1477  * Releases a reference to a lock taken in a "fast" way.
1478  *
1479  * Releases a read or a write (specified by \a rw) lock
1480  * referenced by \a cookie.
1481  *
1482  * \param inode inode to which data belong
1483  * \param end end of the locked extent
1484  * \param rw OBD_BRW_READ if requested for reading,
1485  *           OBD_BRW_WRITE if requested for writing
1486  * \param cookie transparent parameter for passing locking context
1487  *
1488  * \post appropriate lock is dereferenced
1489  *
1490  * \see ll_file_get_fast_lock
1491  */
1492 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1493                                          void *cookie, int rw)
1494 {
1495         ll_release_short_lock(inode, end, cookie, rw);
1496 }
1497
1498 enum ll_lock_style {
1499         LL_LOCK_STYLE_NOLOCK   = 0,
1500         LL_LOCK_STYLE_FASTLOCK = 1,
1501         LL_LOCK_STYLE_TREELOCK = 2
1502 };
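
/*
 * How each style is obtained and released (see ll_file_get_lock() and
 * ll_file_put_lock() below): FASTLOCK comes from ll_file_get_fast_lock() and
 * is dropped with ll_file_put_fast_lock(); TREELOCK comes from
 * ll_file_get_tree_lock() and is dropped with ll_tree_unlock(); NOLOCK means
 * the caller performs lockless I/O via ll_file_lockless_io().
 */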
1503
1504 /**
1505  * Obtains an extent lock for file I/O, preferring the "fast" path.
1506  *
1507  * Tries to reuse a lock cached under a page of \a file for a read or write
1508  * (specified by \a rw) of the extent [\a ppos , \a end]; if no such lock is
1509  * available, falls back to taking a tree lock.
1510  *
1511  * \param file file under which I/O is processed
1512  * \param rw OBD_BRW_READ if requested for reading,
1513  *           OBD_BRW_WRITE if requested for writing
1514  * \param ppos start of the requested extent
1515  * \param end end of the requested extent
1516  * \param cookie transparent parameter for passing locking context
1517  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1518  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1519  * \param buf userspace buffer for the data
1520  *
1521  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1522  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1523  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1524  *
1525  * \see ll_file_put_lock
1526  */
1527 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1528                                    obd_off end, char *buf, void **cookie,
1529                                    struct ll_lock_tree *tree, int rw)
1530 {
1531         int rc;
1532
1533         ENTRY;
1534
1535         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1536                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1537
1538         rc = ll_file_get_tree_lock(tree, file, buf, end - ppos, ppos, end, rw);
1539         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1540         switch (rc) {
1541         case 1:
1542                 RETURN(LL_LOCK_STYLE_TREELOCK);
1543         case 0:
1544                 RETURN(LL_LOCK_STYLE_NOLOCK);
1545         }
1546
1547         /* an error happened if we reached this point, rc = -errno here */
1548         RETURN(rc);
1549 }
1550
1551 /**
1552  * Drops the lock taken by ll_file_get_lock.
1553  *
1554  * Releases a read or a write (specified by \a rw) lock
1555  * referenced by \a tree or \a cookie.
1556  *
1557  * \param inode inode to which data belong
1558  * \param end end of the locked extent
1559  * \param lockstyle facility through which the lock was taken
1560  * \param rw OBD_BRW_READ if requested for reading,
1561  *           OBD_BRW_WRITE if requested for writing
1562  * \param cookie transparent parameter for passing locking context
1563  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1564  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1565  *
1566  * \post appropriate lock is dereferenced
1567  *
1568  * \see ll_file_get_lock
1569  */
1570 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1571                                     enum ll_lock_style lock_style,
1572                                     void *cookie, struct ll_lock_tree *tree,
1573                                     int rw)
1575 {
1576         switch (lock_style) {
1577         case LL_LOCK_STYLE_TREELOCK:
1578                 ll_tree_unlock(tree);
1579                 break;
1580         case LL_LOCK_STYLE_FASTLOCK:
1581                 ll_file_put_fast_lock(inode, end, cookie, rw);
1582                 break;
1583         default:
1584                 CERROR("invalid locking style (%d)\n", lock_style);
1585         }
1586 }
1587
1588 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1589                             loff_t *ppos)
1590 {
1591         struct inode *inode = file->f_dentry->d_inode;
1592         struct ll_inode_info *lli = ll_i2info(inode);
1593         struct lov_stripe_md *lsm = lli->lli_smd;
1594         struct ll_sb_info *sbi = ll_i2sbi(inode);
1595         struct ll_lock_tree tree;
1596         struct ost_lvb lvb;
1597         struct ll_ra_read bead;
1598         int ra = 0;
1599         obd_off end;
1600         ssize_t retval, chunk, sum = 0;
1601         int lock_style;
1602         void *cookie;
1603
1604         __u64 kms;
1605         ENTRY;
1606         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1607                inode->i_ino, inode->i_generation, inode, count, *ppos);
1608         /* "If nbyte is 0, read() will return 0 and have no other results."
1609          *                      -- Single Unix Spec */
1610         if (count == 0)
1611                 RETURN(0);
1612
1613         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1614
1615         if (!lsm) {
1616                 /* Read on file with no objects should return zero-filled
1617                  * buffers up to file size (we can get non-zero sizes with
1618                  * mknod + truncate, then opening file for read. This is a
1619                  * common pattern in NFS case, it seems). Bug 6243 */
1620                 int notzeroed;
1621                 /* Since there are no objects on OSTs, we have nothing to get
1622                  * lock on and so we are forced to access inode->i_size
1623                  * unguarded */
1624
1625                 /* Read beyond end of file */
1626                 if (*ppos >= i_size_read(inode))
1627                         RETURN(0);
1628
1629                 if (count > i_size_read(inode) - *ppos)
1630                         count = i_size_read(inode) - *ppos;
1631                 /* Make sure to correctly adjust the file pos pointer for
1632                  * EFAULT case */
1633                 notzeroed = clear_user(buf, count);
1634                 count -= notzeroed;
1635                 *ppos += count;
1636                 if (!count)
1637                         RETURN(-EFAULT);
1638                 RETURN(count);
1639         }
1640 repeat:
1641         if (sbi->ll_max_rw_chunk != 0) {
1642                 /* first, let's know the end of the current stripe */
1643                 end = *ppos;
1644                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1645
1646                 /* correct, the end is beyond the request */
1647                 if (end > *ppos + count - 1)
1648                         end = *ppos + count - 1;
1649
1650                 /* and chunk shouldn't be too large even if striping is wide */
1651                 if (end - *ppos > sbi->ll_max_rw_chunk)
1652                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1653         } else {
1654                 end = *ppos + count - 1;
1655         }
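        /* Worked example (illustrative values): with *ppos == 0, count == 4 MB,
         * stripe size 1 MB and ll_max_rw_chunk == 2 MB, the stripe-end
         * calculation above caps the first chunk at [0, 1 MB - 1]; the code at
         * the end of the function then loops back to "repeat" and issues the
         * rest one stripe (or at most ll_max_rw_chunk) at a time. */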
1656
1657         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1658                                       buf, &cookie, &tree, OBD_BRW_READ);
1659         if (lock_style < 0)
1660                 GOTO(out, retval = lock_style);
1661
1662         ll_inode_size_lock(inode, 1);
1663         /*
1664          * Consistency guarantees: following possibilities exist for the
1665          * relation between region being read and real file size at this
1666          * moment:
1667          *
1668          *  (A): the region is completely inside of the file;
1669          *
1670          *  (B-x): x bytes of region are inside of the file, the rest is
1671          *  outside;
1672          *
1673          *  (C): the region is completely outside of the file.
1674          *
1675          * This classification is stable under DLM lock acquired by
1676          * ll_tree_lock() above, because to change class, other client has to
1677          * take DLM lock conflicting with our lock. Also, any updates to
1678          * ->i_size by other threads on this client are serialized by
1679          * ll_inode_size_lock(). This guarantees that short reads are handled
1680          * correctly in the face of concurrent writes and truncates.
1681          */
1682         inode_init_lvb(inode, &lvb);
1683         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1684         kms = lvb.lvb_size;
1685         if (*ppos + count - 1 > kms) {
1686                 /* A glimpse is necessary to determine whether we return a
1687                  * short read (B) or some zeroes at the end of the buffer (C) */
1688                 ll_inode_size_unlock(inode, 1);
1689                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1690                 if (retval) {
1691                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1692                                 ll_file_put_lock(inode, end, lock_style,
1693                                                  cookie, &tree, OBD_BRW_READ);
1694                         goto out;
1695                 }
1696         } else {
1697                 /* region is within kms and, hence, within real file size (A).
1698                  * We need to increase i_size to cover the read region so that
1699                  * generic_file_read() will do its job, but that doesn't mean
1700                  * the kms size is _correct_, it is only the _minimum_ size.
1701                  * If someone does a stat they will get the correct size which
1702                  * will always be >= the kms value here.  b=11081 */
1703                 if (i_size_read(inode) < kms)
1704                         i_size_write(inode, kms);
1705                 ll_inode_size_unlock(inode, 1);
1706         }
1707
1708         chunk = end - *ppos + 1;
1709         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1710                inode->i_ino, chunk, *ppos, i_size_read(inode));
1711
1712         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1713                 /* turn off the kernel's read-ahead */
1714                 file->f_ra.ra_pages = 0;
1715
1716                 /* initialize read-ahead window once per syscall */
1717                 if (ra == 0) {
1718                         ra = 1;
1719                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1720                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1721                         ll_ra_read_in(file, &bead);
1722                 }
1723
1724                 /* BUG: 5972 */
1725                 file_accessed(file);
1726                 retval = generic_file_read(file, buf, chunk, ppos);
1727                 ll_file_put_lock(inode, end, lock_style, cookie, &tree,
1728                                  OBD_BRW_READ);
1729         } else {
1730                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1731         }
1732
1733         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1734
1735         if (retval > 0) {
1736                 buf += retval;
1737                 count -= retval;
1738                 sum += retval;
1739                 if (retval == chunk && count > 0)
1740                         goto repeat;
1741         }
1742
1743  out:
1744         if (ra != 0)
1745                 ll_ra_read_ex(file, &bead);
1746         retval = (sum > 0) ? sum : retval;
1747         RETURN(retval);
1748 }
1749
1750 /*
1751  * Write to a file (through the page cache).
1752  */
1753 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1754                              loff_t *ppos)
1755 {
1756         struct inode *inode = file->f_dentry->d_inode;
1757         struct ll_sb_info *sbi = ll_i2sbi(inode);
1758         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1759         struct ll_lock_tree tree;
1760         loff_t maxbytes = ll_file_maxbytes(inode);
1761         loff_t lock_start, lock_end, end;
1762         ssize_t retval, chunk, sum = 0;
1763         int tree_locked;
1764         ENTRY;
1765
1766         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1767                inode->i_ino, inode->i_generation, inode, count, *ppos);
1768
1769         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1770
1771         /* POSIX, but surprised the VFS doesn't check this already */
1772         if (count == 0)
1773                 RETURN(0);
1774
1775         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1776          * called on the file, don't fail the below assertion (bug 2388). */
1777         if (file->f_flags & O_LOV_DELAY_CREATE &&
1778             ll_i2info(inode)->lli_smd == NULL)
1779                 RETURN(-EBADF);
1780
1781         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1782
1783         down(&ll_i2info(inode)->lli_write_sem);
1784
1785 repeat:
1786         chunk = 0; /* just to fix gcc's warning */
1787         end = *ppos + count - 1;
1788
1789         if (file->f_flags & O_APPEND) {
1790                 lock_start = 0;
1791                 lock_end = OBD_OBJECT_EOF;
1792         } else if (sbi->ll_max_rw_chunk != 0) {
1793                 /* first, let's know the end of the current stripe */
1794                 end = *ppos;
1795                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1796                                 (obd_off *)&end);
1797
1798                 /* correct, the end is beyond the request */
1799                 if (end > *ppos + count - 1)
1800                         end = *ppos + count - 1;
1801
1802                 /* and chunk shouldn't be too large even if striping is wide */
1803                 if (end - *ppos > sbi->ll_max_rw_chunk)
1804                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1805                 lock_start = *ppos;
1806                 lock_end = end;
1807         } else {
1808                 lock_start = *ppos;
1809                 lock_end = *ppos + count - 1;
1810         }
1811
1812         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1813                                             lock_start, lock_end, OBD_BRW_WRITE);
1814         if (tree_locked < 0)
1815                 GOTO(out, retval = tree_locked);
1816
1817         /* This is ok, g_f_w will overwrite this under i_sem if it races
1818          * with a local truncate, it just makes our maxbyte checking easier.
1819          * The i_size value gets updated in ll_extent_lock() as a consequence
1820          * of the [0,EOF] extent lock we requested above. */
1821         if (file->f_flags & O_APPEND) {
1822                 *ppos = i_size_read(inode);
1823                 end = *ppos + count - 1;
1824         }
1825
1826         if (*ppos >= maxbytes) {
1827                 send_sig(SIGXFSZ, current, 0);
1828                 GOTO(out_unlock, retval = -EFBIG);
1829         }
1830         if (end > maxbytes - 1)
1831                 end = maxbytes - 1;
1832
1833         /* generic_file_write handles O_APPEND after getting i_mutex */
1834         chunk = end - *ppos + 1;
1835         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1836                inode->i_ino, chunk, *ppos);
1837         if (tree_locked)
1838                 retval = generic_file_write(file, buf, chunk, ppos);
1839         else
1840                 retval = ll_file_lockless_io(file, (char *)buf, chunk,
1841                                              ppos, WRITE);
1842         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1843
1844 out_unlock:
1845         if (tree_locked)
1846                 ll_tree_unlock(&tree);
1847
1848 out:
1849         if (retval > 0) {
1850                 buf += retval;
1851                 count -= retval;
1852                 sum += retval;
1853                 if (retval == chunk && count > 0)
1854                         goto repeat;
1855         }
1856
1857         up(&ll_i2info(inode)->lli_write_sem);
1858
1859         retval = (sum > 0) ? sum : retval;
1860         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1861                            retval > 0 ? retval : 0);
1862         RETURN(retval);
1863 }
1864
1865 /*
1866  * Send file content (through pagecache) somewhere with helper
1867  */
1868 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1869                                 read_actor_t actor, void *target)
1870 {
1871         struct inode *inode = in_file->f_dentry->d_inode;
1872         struct ll_inode_info *lli = ll_i2info(inode);
1873         struct lov_stripe_md *lsm = lli->lli_smd;
1874         struct ll_lock_tree tree;
1875         struct ll_lock_tree_node *node;
1876         struct ost_lvb lvb;
1877         struct ll_ra_read bead;
1878         int rc;
1879         ssize_t retval;
1880         __u64 kms;
1881         ENTRY;
1882         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1883                inode->i_ino, inode->i_generation, inode, count, *ppos);
1884
1885         /* "If nbyte is 0, read() will return 0 and have no other results."
1886          *                      -- Single Unix Spec */
1887         if (count == 0)
1888                 RETURN(0);
1889
1890         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1891         /* turn off the kernel's read-ahead */
1892         in_file->f_ra.ra_pages = 0;
1893
1894         /* File with no objects, nothing to lock */
1895         if (!lsm)
1896                 RETURN(generic_file_sendfile(in_file, ppos,count,actor,target));
1897
1898         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1899         if (IS_ERR(node))
1900                 RETURN(PTR_ERR(node));
1901
1902         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1903         rc = ll_tree_lock(&tree, node, NULL, count,
1904                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1905         if (rc != 0)
1906                 RETURN(rc);
1907
1908         ll_clear_file_contended(inode);
1909         ll_inode_size_lock(inode, 1);
1910         /*
1911          * Consistency guarantees: following possibilities exist for the
1912          * relation between region being read and real file size at this
1913          * moment:
1914          *
1915          *  (A): the region is completely inside of the file;
1916          *
1917          *  (B-x): x bytes of region are inside of the file, the rest is
1918          *  outside;
1919          *
1920          *  (C): the region is completely outside of the file.
1921          *
1922          * This classification is stable under DLM lock acquired by
1923          * ll_tree_lock() above, because to change class, other client has to
1924          * take DLM lock conflicting with our lock. Also, any updates to
1925          * ->i_size by other threads on this client are serialized by
1926          * ll_inode_size_lock(). This guarantees that short reads are handled
1927          * correctly in the face of concurrent writes and truncates.
1928          */
1929         inode_init_lvb(inode, &lvb);
1930         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1931         kms = lvb.lvb_size;
1932         if (*ppos + count - 1 > kms) {
1933                 /* A glimpse is necessary to determine whether we return a
1934                  * short read (B) or some zeroes at the end of the buffer (C) */
1935                 ll_inode_size_unlock(inode, 1);
1936                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1937                 if (retval)
1938                         goto out;
1939         } else {
1940                 /* region is within kms and, hence, within real file size (A) */
1941                 i_size_write(inode, kms);
1942                 ll_inode_size_unlock(inode, 1);
1943         }
1944
1945         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1946                inode->i_ino, count, *ppos, i_size_read(inode));
1947
1948         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1949         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1950         ll_ra_read_in(in_file, &bead);
1951         /* BUG: 5972 */
1952         file_accessed(in_file);
1953         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1954         ll_ra_read_ex(in_file, &bead);
1955
1956  out:
1957         ll_tree_unlock(&tree);
1958         RETURN(retval);
1959 }
1960
1961 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1962                                unsigned long arg)
1963 {
1964         struct ll_inode_info *lli = ll_i2info(inode);
1965         struct obd_export *exp = ll_i2dtexp(inode);
1966         struct ll_recreate_obj ucreatp;
1967         struct obd_trans_info oti = { 0 };
1968         struct obdo *oa = NULL;
1969         int lsm_size;
1970         int rc = 0;
1971         struct lov_stripe_md *lsm, *lsm2;
1972         ENTRY;
1973
1974         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1975                 RETURN(-EPERM);
1976
1977         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1978                             sizeof(struct ll_recreate_obj));
1979         if (rc) {
1980                 RETURN(-EFAULT);
1981         }
1982         OBDO_ALLOC(oa);
1983         if (oa == NULL)
1984                 RETURN(-ENOMEM);
1985
1986         down(&lli->lli_size_sem);
1987         lsm = lli->lli_smd;
1988         if (lsm == NULL)
1989                 GOTO(out, rc = -ENOENT);
1990         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1991                    (lsm->lsm_stripe_count));
1992
1993         OBD_ALLOC(lsm2, lsm_size);
1994         if (lsm2 == NULL)
1995                 GOTO(out, rc = -ENOMEM);
1996
1997         oa->o_id = ucreatp.lrc_id;
1998         oa->o_gr = ucreatp.lrc_group;
1999         oa->o_nlink = ucreatp.lrc_ost_idx;
2000         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2001         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
2002         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2003                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2004
2005         memcpy(lsm2, lsm, lsm_size);
2006         rc = obd_create(exp, oa, &lsm2, &oti);
2007
2008         OBD_FREE(lsm2, lsm_size);
2009         GOTO(out, rc);
2010 out:
2011         up(&lli->lli_size_sem);
2012         OBDO_FREE(oa);
2013         return rc;
2014 }
2015
2016 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2017                              int flags, struct lov_user_md *lum, int lum_size)
2018 {
2019         struct ll_inode_info *lli = ll_i2info(inode);
2020         struct lov_stripe_md *lsm;
2021         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2022         int rc = 0;
2023         ENTRY;
2024
2025         down(&lli->lli_size_sem);
2026         lsm = lli->lli_smd;
2027         if (lsm) {
2028                 up(&lli->lli_size_sem);
2029                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2030                        inode->i_ino);
2031                 RETURN(-EEXIST);
2032         }
2033
2034         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2035         if (rc)
2036                 GOTO(out, rc);
2037         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2038                 GOTO(out_req_free, rc = -ENOENT);
2039         rc = oit.d.lustre.it_status;
2040         if (rc < 0)
2041                 GOTO(out_req_free, rc);
2042
2043         ll_release_openhandle(file->f_dentry, &oit);
2044
2045  out:
2046         up(&lli->lli_size_sem);
2047         ll_intent_release(&oit);
2048         RETURN(rc);
2049 out_req_free:
2050         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2051         goto out;
2052 }
2053
2054 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
2055                              struct lov_mds_md **lmmp, int *lmm_size,
2056                              struct ptlrpc_request **request)
2057 {
2058         struct ll_sb_info *sbi = ll_i2sbi(inode);
2059         struct mdt_body  *body;
2060         struct lov_mds_md *lmm = NULL;
2061         struct ptlrpc_request *req = NULL;
2062         struct obd_capa *oc;
2063         int rc, lmmsize;
2064
2065         rc = ll_get_max_mdsize(sbi, &lmmsize);
2066         if (rc)
2067                 RETURN(rc);
2068
2069         oc = ll_mdscapa_get(inode);
2070         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2071                              oc, filename, strlen(filename) + 1,
2072                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2073                              ll_i2suppgid(inode), &req);
2074         capa_put(oc);
2075         if (rc < 0) {
2076                 CDEBUG(D_INFO, "md_getattr_name failed "
2077                        "on %s: rc %d\n", filename, rc);
2078                 GOTO(out, rc);
2079         }
2080
2081         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2082         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2083
2084         lmmsize = body->eadatasize;
2085
2086         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2087                         lmmsize == 0) {
2088                 GOTO(out, rc = -ENODATA);
2089         }
2090
2091         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2092         LASSERT(lmm != NULL);
2093
2094         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
2095             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) &&
2096             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
2097                 GOTO(out, rc = -EPROTO);
2098         }
2099
2100         /*
2101          * This is coming from the MDS, so is probably in
2102          * little endian.  We convert it to host endian before
2103          * passing it to userspace.
2104          */
2105         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
2106                 /* if this is called for a directory, avoid swabbing
2107                  * LSM objects that do not exist */
2108                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
2109                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
2110                         if (S_ISREG(body->mode))
2111                                 lustre_swab_lov_user_md_objects(
2112                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
2113                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
2114                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
2115                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
2116                         if (S_ISREG(body->mode))
2117                                 lustre_swab_lov_user_md_objects(
2118                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
2119                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
2120                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
2121                         lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2122                 }
2123         }
2124
2125         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2126                 struct lov_stripe_md *lsm;
2127                 struct lov_user_md_join *lmj;
2128                 int lmj_size, i, aindex = 0;
2129
2130                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2131                 if (rc < 0)
2132                         GOTO(out, rc = -ENOMEM);
2133                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2134                 if (rc)
2135                         GOTO(out_free_memmd, rc);
2136
2137                 lmj_size = sizeof(struct lov_user_md_join) +
2138                            lsm->lsm_stripe_count *
2139                            sizeof(struct lov_user_ost_data_join);
2140                 OBD_ALLOC(lmj, lmj_size);
2141                 if (!lmj)
2142                         GOTO(out_free_memmd, rc = -ENOMEM);
2143
2144                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2145                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2146                         struct lov_extent *lex =
2147                                 &lsm->lsm_array->lai_ext_array[aindex];
2148
2149                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2150                                 aindex++;
2151                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2152                                         LPU64" len %d\n", aindex, i,
2153                                         lex->le_start, (int)lex->le_len);
2154                         lmj->lmm_objects[i].l_extent_start =
2155                                 lex->le_start;
2156
2157                         if ((int)lex->le_len == -1)
2158                                 lmj->lmm_objects[i].l_extent_end = -1;
2159                         else
2160                                 lmj->lmm_objects[i].l_extent_end =
2161                                         lex->le_start + lex->le_len;
2162                         lmj->lmm_objects[i].l_object_id =
2163                                 lsm->lsm_oinfo[i]->loi_id;
2164                         lmj->lmm_objects[i].l_object_gr =
2165                                 lsm->lsm_oinfo[i]->loi_gr;
2166                         lmj->lmm_objects[i].l_ost_gen =
2167                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2168                         lmj->lmm_objects[i].l_ost_idx =
2169                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2170                 }
2171                 lmm = (struct lov_mds_md *)lmj;
2172                 lmmsize = lmj_size;
2173 out_free_memmd:
2174                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2175         }
2176 out:
2177         *lmmp = lmm;
2178         *lmm_size = lmmsize;
2179         *request = req;
2180         return rc;
2181 }
2182
2183 static int ll_lov_setea(struct inode *inode, struct file *file,
2184                             unsigned long arg)
2185 {
2186         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2187         struct lov_user_md  *lump;
2188         int lum_size = sizeof(struct lov_user_md) +
2189                        sizeof(struct lov_user_ost_data);
2190         int rc;
2191         ENTRY;
2192
2193         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2194                 RETURN(-EPERM);
2195
2196         OBD_ALLOC(lump, lum_size);
2197         if (lump == NULL) {
2198                 RETURN(-ENOMEM);
2199         }
2200         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2201         if (rc) {
2202                 OBD_FREE(lump, lum_size);
2203                 RETURN(-EFAULT);
2204         }
2205
2206         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2207
2208         OBD_FREE(lump, lum_size);
2209         RETURN(rc);
2210 }
2211
2212 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2213                             unsigned long arg)
2214 {
2215         struct lov_user_md_v3 lumv3;
2216         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
2217         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
2218         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
2219         int lum_size;
2220         int rc;
2221         int flags = FMODE_WRITE;
2222         ENTRY;
2223
2224         /* first try with v1 which is smaller than v3 */
2225         lum_size = sizeof(struct lov_user_md_v1);
2226         rc = copy_from_user(lumv1, lumv1p, lum_size);
2227         if (rc)
2228                 RETURN(-EFAULT);
2229
2230         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
2231                 lum_size = sizeof(struct lov_user_md_v3);
2232                 rc = copy_from_user(&lumv3, lumv3p, lum_size);
2233                 if (rc)
2234                         RETURN(-EFAULT);
2235         }
2236
2237         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
2238         if (rc == 0) {
2239                  put_user(0, &lumv1p->lmm_stripe_count);
2240                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2241                                     0, ll_i2info(inode)->lli_smd,
2242                                     (void *)arg);
2243         }
2244         RETURN(rc);
2245 }
2246
2247 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2248 {
2249         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2250
2251         if (!lsm)
2252                 RETURN(-ENODATA);
2253
2254         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2255                             (void *)arg);
2256 }
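
/*
 * Userspace sketch of how the handlers above are driven through
 * ll_file_ioctl() (illustrative only: path, flags and field values are
 * examples, error handling is omitted):
 *
 *      struct lov_user_md_v1 lum = {
 *              .lmm_magic         = LOV_USER_MAGIC_V1,
 *              .lmm_pattern       = LOV_PATTERN_RAID0,
 *              .lmm_stripe_size   = 1048576,
 *              .lmm_stripe_count  = 4,
 *              .lmm_stripe_offset = (__u16)-1,         // any starting OST
 *      };
 *      int fd = open("/mnt/lustre/f", O_LOV_DELAY_CREATE | O_CREAT | O_WRONLY,
 *                    0644);
 *
 *      ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum);          // -> ll_lov_setstripe()
 *
 * LL_IOC_LOV_GETSTRIPE is issued the same way, with a buffer large enough to
 * also receive lmm_objects[] on return.
 */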
2257
2258 static int ll_get_grouplock(struct inode *inode, struct file *file,
2259                             unsigned long arg)
2260 {
2261         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2262         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2263                                                     .end = OBD_OBJECT_EOF}};
2264         struct lustre_handle lockh = { 0 };
2265         struct ll_inode_info *lli = ll_i2info(inode);
2266         struct lov_stripe_md *lsm = lli->lli_smd;
2267         int flags = 0, rc;
2268         ENTRY;
2269
2270         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2271                 RETURN(-EINVAL);
2272         }
2273
2274         policy.l_extent.gid = arg;
2275         if (file->f_flags & O_NONBLOCK)
2276                 flags = LDLM_FL_BLOCK_NOWAIT;
2277
2278         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2279         if (rc)
2280                 RETURN(rc);
2281
2282         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2283         fd->fd_gid = arg;
2284         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2285
2286         RETURN(0);
2287 }
2288
2289 static int ll_put_grouplock(struct inode *inode, struct file *file,
2290                             unsigned long arg)
2291 {
2292         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2293         struct ll_inode_info *lli = ll_i2info(inode);
2294         struct lov_stripe_md *lsm = lli->lli_smd;
2295         int rc;
2296         ENTRY;
2297
2298         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2299                 /* Ugh, it's already unlocked. */
2300                 RETURN(-EINVAL);
2301         }
2302
2303         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2304                 RETURN(-EINVAL);
2305
2306         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2307
2308         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2309         if (rc)
2310                 RETURN(rc);
2311
2312         fd->fd_gid = 0;
2313         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2314
2315         RETURN(0);
2316 }
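
/*
 * Group locks are requested from userspace through ll_file_ioctl() below with
 * LL_IOC_GROUP_LOCK / LL_IOC_GROUP_UNLOCK; a sketch, with an arbitrary group
 * id shared by the cooperating processes:
 *
 *      int gid = 1234;
 *
 *      ioctl(fd, LL_IOC_GROUP_LOCK, gid);      // -> ll_get_grouplock()
 *      ... all holders of the same gid may do I/O concurrently ...
 *      ioctl(fd, LL_IOC_GROUP_UNLOCK, gid);    // -> ll_put_grouplock()
 */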
2317
2318 #if LUSTRE_FIX >= 50
2319 static int join_sanity_check(struct inode *head, struct inode *tail)
2320 {
2321         ENTRY;
2322         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2323                 CERROR("server do not support join \n");
2324                 RETURN(-EINVAL);
2325         }
2326         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2327                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2328                        head->i_ino, tail->i_ino);
2329                 RETURN(-EINVAL);
2330         }
2331         if (head->i_ino == tail->i_ino) {
2332                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2333                 RETURN(-EINVAL);
2334         }
2335         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2336                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2337                 RETURN(-EINVAL);
2338         }
2339         RETURN(0);
2340 }
2341
2342 static int join_file(struct inode *head_inode, struct file *head_filp,
2343                      struct file *tail_filp)
2344 {
2345         struct dentry *tail_dentry = tail_filp->f_dentry;
2346         struct lookup_intent oit = {.it_op = IT_OPEN,
2347                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2348         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2349                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2350
2351         struct lustre_handle lockh;
2352         struct md_op_data *op_data;
2353         int    rc;
2354         loff_t data;
2355         ENTRY;
2356
2357         tail_dentry = tail_filp->f_dentry;
2358
2359         data = i_size_read(head_inode);
2360         op_data = ll_prep_md_op_data(NULL, head_inode,
2361                                      tail_dentry->d_parent->d_inode,
2362                                      tail_dentry->d_name.name,
2363                                      tail_dentry->d_name.len, 0,
2364                                      LUSTRE_OPC_ANY, &data);
2365         if (IS_ERR(op_data))
2366                 RETURN(PTR_ERR(op_data));
2367
2368         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit,
2369                          op_data, &lockh, NULL, 0, NULL, 0);
2370
2371         ll_finish_md_op_data(op_data);
2372         if (rc < 0)
2373                 GOTO(out, rc);
2374
2375         rc = oit.d.lustre.it_status;
2376
2377         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2378                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2379                 ptlrpc_req_finished((struct ptlrpc_request *)
2380                                     oit.d.lustre.it_data);
2381                 GOTO(out, rc);
2382         }
2383
2384         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2385                                            * away */
2386                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2387                 oit.d.lustre.it_lock_mode = 0;
2388         }
2389         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2390         it_clear_disposition(&oit, DISP_ENQ_COMPLETE);
2391         ll_release_openhandle(head_filp->f_dentry, &oit);
2392 out:
2393         ll_intent_release(&oit);
2394         RETURN(rc);
2395 }
2396
2397 static int ll_file_join(struct inode *head, struct file *filp,
2398                         char *filename_tail)
2399 {
2400         struct inode *tail = NULL, *first = NULL, *second = NULL;
2401         struct dentry *tail_dentry;
2402         struct file *tail_filp, *first_filp, *second_filp;
2403         struct ll_lock_tree first_tree, second_tree;
2404         struct ll_lock_tree_node *first_node, *second_node;
2405         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2406         int rc = 0, cleanup_phase = 0;
2407         ENTRY;
2408
2409         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2410                head->i_ino, head->i_generation, head, filename_tail);
2411
2412         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2413         if (IS_ERR(tail_filp)) {
2414                 CERROR("Can not open tail file %s", filename_tail);
2415                 rc = PTR_ERR(tail_filp);
2416                 GOTO(cleanup, rc);
2417         }
2418         tail = igrab(tail_filp->f_dentry->d_inode);
2419
2420         tlli = ll_i2info(tail);
2421         tail_dentry = tail_filp->f_dentry;
2422         LASSERT(tail_dentry);
2423         cleanup_phase = 1;
2424
2425         /* reorder the inodes to take their locks in a consistent order */
2426         first = head->i_ino > tail->i_ino ? head : tail;
2427         second = head->i_ino > tail->i_ino ? tail : head;
2428         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2429         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2430
2431         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2432                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2433         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2434         if (IS_ERR(first_node)){
2435                 rc = PTR_ERR(first_node);
2436                 GOTO(cleanup, rc);
2437         }
2438         first_tree.lt_fd = first_filp->private_data;
2439         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2440         if (rc != 0)
2441                 GOTO(cleanup, rc);
2442         cleanup_phase = 2;
2443
2444         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2445         if (IS_ERR(second_node)){
2446                 rc = PTR_ERR(second_node);
2447                 GOTO(cleanup, rc);
2448         }
2449         second_tree.lt_fd = second_filp->private_data;
2450         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2451         if (rc != 0)
2452                 GOTO(cleanup, rc);
2453         cleanup_phase = 3;
2454
2455         rc = join_sanity_check(head, tail);
2456         if (rc)
2457                 GOTO(cleanup, rc);
2458
2459         rc = join_file(head, filp, tail_filp);
2460         if (rc)
2461                 GOTO(cleanup, rc);
2462 cleanup:
2463         switch (cleanup_phase) {
2464         case 3:
2465                 ll_tree_unlock(&second_tree);
2466                 obd_cancel_unused(ll_i2dtexp(second),
2467                                   ll_i2info(second)->lli_smd, 0, NULL);
2468         case 2:
2469                 ll_tree_unlock(&first_tree);
2470                 obd_cancel_unused(ll_i2dtexp(first),
2471                                   ll_i2info(first)->lli_smd, 0, NULL);
2472         case 1:
2473                 filp_close(tail_filp, 0);
2474                 if (tail)
2475                         iput(tail);
2476                 if (head && rc == 0) {
2477                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2478                                        &hlli->lli_smd);
2479                         hlli->lli_smd = NULL;
2480                 }
2481         case 0:
2482                 break;
2483         default:
2484                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2485                 LBUG();
2486         }
2487         RETURN(rc);
2488 }
2489 #endif /* LUSTRE_FIX >= 50 */
2490
2491 /**
2492  * Close inode open handle
2493  *
2494  * \param dentry [in]     dentry which contains the inode
2495  * \param it     [in,out] intent which contains open info and result
2496  *
2497  * \retval 0     success
2498  * \retval <0    failure
2499  */
2500 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2501 {
2502         struct inode *inode = dentry->d_inode;
2503         struct obd_client_handle *och;
2504         int rc;
2505         ENTRY;
2506
2507         LASSERT(inode);
2508
2509         /* Root ? Do nothing. */
2510         if (dentry->d_inode->i_sb->s_root == dentry)
2511                 RETURN(0);
2512
2513         /* No open handle to close? Move away */
2514         if (!it_disposition(it, DISP_OPEN_OPEN))
2515                 RETURN(0);
2516
2517         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2518
2519         OBD_ALLOC(och, sizeof(*och));
2520         if (!och)
2521                 GOTO(out, rc = -ENOMEM);
2522
2523         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2524                     ll_i2info(inode), it, och);
2525
2526         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2527                                        inode, och);
2528  out:
2529         /* this one is in place of ll_file_open */
2530         if (it_disposition(it, DISP_ENQ_OPEN_REF))
2531                 ptlrpc_req_finished(it->d.lustre.it_data);
2532         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2533         RETURN(rc);
2534 }
2535
2536 /**
2537  * Get size for inode for which FIEMAP mapping is requested.
2538  * Make the FIEMAP get_info call and returns the result.
2539  */
2540 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2541               int num_bytes)
2542 {
2543         struct obd_export *exp = ll_i2dtexp(inode);
2544         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2545         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2546         int vallen = num_bytes;
2547         int rc;
2548         ENTRY;
2549
2550         /* If the stripe_count > 1 and the application does not understand
2551          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2552          */
2553         if (lsm->lsm_stripe_count > 1 &&
2554             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2555                 return -EOPNOTSUPP;
2556
2557         fm_key.oa.o_id = lsm->lsm_object_id;
2558         fm_key.oa.o_gr = lsm->lsm_object_gr;
2559         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2560
2561         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
2562                         OBD_MD_FLSIZE);
2563
2564         /* If filesize is 0, then there would be no objects for mapping */
2565         if (fm_key.oa.o_size == 0) {
2566                 fiemap->fm_mapped_extents = 0;
2567                 RETURN(0);
2568         }
2569
2570         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2571
2572         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2573         if (rc)
2574                 CERROR("obd_get_info failed: rc = %d\n", rc);
2575
2576         RETURN(rc);
2577 }
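
/*
 * The EXT3_IOC_FIEMAP case in ll_file_ioctl() below sizes the mapping buffer
 * as sizeof(*fiemap) + fm_extent_count * sizeof(struct ll_fiemap_extent); a
 * userspace caller sizes its buffer the same way (sketch, values
 * illustrative):
 *
 *      int n = 32;
 *      struct ll_user_fiemap *fm;
 *
 *      fm = malloc(sizeof(*fm) + n * sizeof(struct ll_fiemap_extent));
 *      memset(fm, 0, sizeof(*fm));
 *      fm->fm_length = ~0ULL;
 *      fm->fm_flags = FIEMAP_FLAG_DEVICE_ORDER; // needed when stripe_count > 1
 *      fm->fm_extent_count = n;
 *      ioctl(fd, EXT3_IOC_FIEMAP, fm);          // -> ll_fiemap() above
 */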
2578
2579 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2580                   unsigned long arg)
2581 {
2582         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2583         int flags;
2584         ENTRY;
2585
2586         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2587                inode->i_generation, inode, cmd);
2588         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2589
2590         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2591         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2592                 RETURN(-ENOTTY);
2593
2594         switch(cmd) {
2595         case LL_IOC_GETFLAGS:
2596                 /* Get the current value of the file flags */
2597                 return put_user(fd->fd_flags, (int *)arg);
2598         case LL_IOC_SETFLAGS:
2599         case LL_IOC_CLRFLAGS:
2600                 /* Set or clear specific file flags */
2601                 /* XXX This probably needs checks to ensure the flags are
2602                  *     not abused, and to handle any flag side effects.
2603                  */
2604                 if (get_user(flags, (int *) arg))
2605                         RETURN(-EFAULT);
2606
2607                 if (cmd == LL_IOC_SETFLAGS) {
2608                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2609                             !(file->f_flags & O_DIRECT)) {
2610                                 CERROR("%s: unable to disable locking on "
2611                                        "non-O_DIRECT file\n", current->comm);
2612                                 RETURN(-EINVAL);
2613                         }
2614
2615                         fd->fd_flags |= flags;
2616                 } else {
2617                         fd->fd_flags &= ~flags;
2618                 }
2619                 RETURN(0);
2620         case LL_IOC_LOV_SETSTRIPE:
2621                 RETURN(ll_lov_setstripe(inode, file, arg));
2622         case LL_IOC_LOV_SETEA:
2623                 RETURN(ll_lov_setea(inode, file, arg));
2624         case LL_IOC_LOV_GETSTRIPE:
2625                 RETURN(ll_lov_getstripe(inode, arg));
2626         case LL_IOC_RECREATE_OBJ:
2627                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2628         case EXT3_IOC_FIEMAP: {
2629                 struct ll_user_fiemap *fiemap_s;
2630                 size_t num_bytes, ret_bytes;
2631                 unsigned int extent_count;
2632                 int rc = 0;
2633
2634                 /* Get the extent count so we can calculate the size of
2635                  * required fiemap buffer */
2636                 if (get_user(extent_count,
2637                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2638                         RETURN(-EFAULT);
2639                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2640                                                  sizeof(struct ll_fiemap_extent));
2641                 OBD_VMALLOC(fiemap_s, num_bytes);
2642                 if (fiemap_s == NULL)
2643                         RETURN(-ENOMEM);
2644
2645                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2646                                    sizeof(*fiemap_s)))
2647                         GOTO(error, rc = -EFAULT);
2648
2649                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2650                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2651                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2652                         if (copy_to_user((char *)arg, fiemap_s,
2653                                          sizeof(*fiemap_s)))
2654                                 GOTO(error, rc = -EFAULT);
2655
2656                         GOTO(error, rc = -EBADR);
2657                 }
2658
2659                 /* If fm_extent_count is non-zero, read the first extent since
2660                  * it is used to calculate end_offset and device from previous
2661                  * fiemap call. */
2662                 if (extent_count) {
2663                         if (copy_from_user(&fiemap_s->fm_extents[0],
2664                             (char __user *)arg + sizeof(*fiemap_s),
2665                             sizeof(struct ll_fiemap_extent)))
2666                                 GOTO(error, rc = -EFAULT);
2667                 }
2668
2669                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2670                         int rc;
2671
2672                         rc = filemap_fdatawrite(inode->i_mapping);
2673                         if (rc)
2674                                 GOTO(error, rc);
2675                 }
2676
2677                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2678                 if (rc)
2679                         GOTO(error, rc);
2680
2681                 ret_bytes = sizeof(struct ll_user_fiemap);
2682
2683                 if (extent_count != 0)
2684                         ret_bytes += (fiemap_s->fm_mapped_extents *
2685                                          sizeof(struct ll_fiemap_extent));
2686
2687                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2688                         rc = -EFAULT;
2689
2690 error:
2691                 OBD_VFREE(fiemap_s, num_bytes);
2692                 RETURN(rc);
2693         }
2694         case EXT3_IOC_GETFLAGS:
2695         case EXT3_IOC_SETFLAGS:
2696                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2697         case EXT3_IOC_GETVERSION_OLD:
2698         case EXT3_IOC_GETVERSION:
2699                 RETURN(put_user(inode->i_generation, (int *)arg));
2700         case LL_IOC_JOIN: {
2701 #if LUSTRE_FIX >= 50
2702                 /* Allow file join in beta builds to allow debugging */
2703                 char *ftail;
2704                 int rc;
2705
2706                 ftail = getname((const char *)arg);
2707                 if (IS_ERR(ftail))
2708                         RETURN(PTR_ERR(ftail));
2709                 rc = ll_file_join(inode, file, ftail);
2710                 putname(ftail);
2711                 RETURN(rc);
2712 #else
2713                 CWARN("file join is not supported in this version of Lustre\n");
2714                 RETURN(-ENOTTY);
2715 #endif
2716         }
2717         case LL_IOC_GROUP_LOCK:
2718                 RETURN(ll_get_grouplock(inode, file, arg));
2719         case LL_IOC_GROUP_UNLOCK:
2720                 RETURN(ll_put_grouplock(inode, file, arg));
2721         case IOC_OBD_STATFS:
2722                 RETURN(ll_obd_statfs(inode, (void *)arg));
2723
2724         /* We need to special case any other ioctls we want to handle,
2725          * to send them to the MDS/OST as appropriate and to properly
2726          * network encode the arg field.
2727         case EXT3_IOC_SETVERSION_OLD:
2728         case EXT3_IOC_SETVERSION:
2729         */
2730         case LL_IOC_FLUSHCTX:
2731                 RETURN(ll_flush_ctx(inode));
2732         default: {
2733                 int err;
2734
2735                 if (LLIOC_STOP ==
2736                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2737                         RETURN(err);
2738
2739                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2740                                      (void *)arg));
2741         }
2742         }
2743 }
2744
2745 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2746 {
2747         struct inode *inode = file->f_dentry->d_inode;
2748         struct ll_inode_info *lli = ll_i2info(inode);
2749         struct lov_stripe_md *lsm = lli->lli_smd;
2750         loff_t retval;
2751         ENTRY;
2752         retval = offset + ((origin == 2) ? i_size_read(inode) :
2753                            (origin == 1) ? file->f_pos : 0);
2754         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2755                inode->i_ino, inode->i_generation, inode, retval, retval,
2756                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2757         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2758
2759         if (origin == 2) { /* SEEK_END */
2760                 int nonblock = 0, rc;
2761
2762                 if (file->f_flags & O_NONBLOCK)
2763                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2764
2765                 if (lsm != NULL) {
2766                         rc = ll_glimpse_size(inode, nonblock);
2767                         if (rc != 0)
2768                                 RETURN(rc);
2769                 }
2770
2771                 ll_inode_size_lock(inode, 0);
2772                 offset += i_size_read(inode);
2773                 ll_inode_size_unlock(inode, 0);
2774         } else if (origin == 1) { /* SEEK_CUR */
2775                 offset += file->f_pos;
2776         }
2777
2778         retval = -EINVAL;
2779         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2780                 if (offset != file->f_pos) {
2781                         file->f_pos = offset;
2782                 }
2783                 retval = offset;
2784         }
2785
2786         RETURN(retval);
2787 }
2788
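/*
 * fsync handler: wait for the data writeback the VFS has already started,
 * fold in any asynchronous write errors recorded on the inode or its
 * stripes, sync the metadata through the MDC and, when the "data"
 * argument is set and the file has objects, sync the OST objects over the
 * whole range [0, OBD_OBJECT_EOF].
 */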
2789 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2790 {
2791         struct inode *inode = dentry->d_inode;
2792         struct ll_inode_info *lli = ll_i2info(inode);
2793         struct lov_stripe_md *lsm = lli->lli_smd;
2794         struct ptlrpc_request *req;
2795         struct obd_capa *oc;
2796         int rc, err;
2797         ENTRY;
2798         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2799                inode->i_generation, inode);
2800         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2801
2802         /* fsync's caller has already called _fdata{sync,write}; we want
2803          * that IO to finish before calling the osc and mdc sync methods */
2804         rc = filemap_fdatawait(inode->i_mapping);
2805
2806         /* catch async errors that were recorded back when async writeback
2807          * failed for pages in this mapping. */
2808         err = lli->lli_async_rc;
2809         lli->lli_async_rc = 0;
2810         if (rc == 0)
2811                 rc = err;
2812         if (lsm) {
2813                 err = lov_test_and_clear_async_rc(lsm);
2814                 if (rc == 0)
2815                         rc = err;
2816         }
2817
2818         oc = ll_mdscapa_get(inode);
2819         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2820                       &req);
2821         capa_put(oc);
2822         if (!rc)
2823                 rc = err;
2824         if (!err)
2825                 ptlrpc_req_finished(req);
2826
2827         if (data && lsm) {
2828                 struct obdo *oa;
2829
2830                 OBDO_ALLOC(oa);
2831                 if (!oa)
2832                         RETURN(rc ? rc : -ENOMEM);
2833
2834                 oa->o_id = lsm->lsm_object_id;
2835                 oa->o_gr = lsm->lsm_object_gr;
2836                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2837                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2838                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2839                                            OBD_MD_FLGROUP);
2840
2841                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2842                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2843                                0, OBD_OBJECT_EOF, oc);
2844                 capa_put(oc);
2845                 if (!rc)
2846                         rc = err;
2847                 OBDO_FREE(oa);
2848         }
2849
2850         RETURN(rc);
2851 }
2852
2853 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2854 {
2855         struct inode *inode = file->f_dentry->d_inode;
2856         struct ll_sb_info *sbi = ll_i2sbi(inode);
2857         struct ldlm_enqueue_info einfo = {
2858                 .ei_type = LDLM_FLOCK, .ei_cb_cp = ldlm_flock_completion_ast,
2859                 .ei_cbdata = file_lock };
2860         struct md_op_data *op_data;
2861         struct lustre_handle lockh = {0};
2862         ldlm_policy_data_t flock;
2863         int flags = 0;
2864         int rc;
2865         ENTRY;
2866
2867         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2868                inode->i_ino, file_lock);
2869
2870         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2871
2872         if (file_lock->fl_flags & FL_FLOCK) {
2873                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2874                 /* set missing params for flock() calls */
2875                 file_lock->fl_end = OFFSET_MAX;
2876                 file_lock->fl_pid = current->tgid;
2877         }
2878         flock.l_flock.pid = file_lock->fl_pid;
2879         flock.l_flock.start = file_lock->fl_start;
2880         flock.l_flock.end = file_lock->fl_end;
2881
2882         switch (file_lock->fl_type) {
2883         case F_RDLCK:
2884                 einfo.ei_mode = LCK_PR;
2885                 break;
2886         case F_UNLCK:
2887                 /* An unlock request may or may not have any relation to
2888                  * existing locks so we may not be able to pass a lock handle
2889                  * via a normal ldlm_lock_cancel() request. The request may even
2890                  * unlock a byte range in the middle of an existing lock. In
2891                  * order to process an unlock request we need all of the same
2892                  * information that is given with a normal read or write record
2893                  * lock request. To avoid creating another ldlm unlock (cancel)
2894                  * message we'll treat a LCK_NL flock request as an unlock. */
2895                 einfo.ei_mode = LCK_NL;
2896                 break;
2897         case F_WRLCK:
2898                 einfo.ei_mode = LCK_PW;
2899                 break;
2900         default:
2901                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2902                 LBUG();
2903         }
2904
2905         switch (cmd) {
2906         case F_SETLKW:
2907 #ifdef F_SETLKW64
2908         case F_SETLKW64:
2909 #endif
2910                 flags = 0;
2911                 break;
2912         case F_SETLK:
2913 #ifdef F_SETLK64
2914         case F_SETLK64:
2915 #endif
2916                 flags = LDLM_FL_BLOCK_NOWAIT;
2917                 break;
2918         case F_GETLK:
2919 #ifdef F_GETLK64
2920         case F_GETLK64:
2921 #endif
2922                 flags = LDLM_FL_TEST_LOCK;
2923                 /* Save the old mode so that if the mode in the lock changes we
2924                  * can decrement the appropriate reader or writer refcount. */
2925                 file_lock->fl_type = einfo.ei_mode;
2926                 break;
2927         default:
2928                 CERROR("unknown fcntl lock command: %d\n", cmd);
2929                 LBUG();
2930         }
2931
2932         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2933                                      LUSTRE_OPC_ANY, NULL);
2934         if (IS_ERR(op_data))
2935                 RETURN(PTR_ERR(op_data));
2936
2937         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2938                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2939                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2940
2941         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2942                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2943
2944         ll_finish_md_op_data(op_data);
2945
2946         if ((file_lock->fl_flags & FL_FLOCK) &&
2947             (rc == 0 || file_lock->fl_type == F_UNLCK))
2948                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2949 #ifdef HAVE_F_OP_FLOCK
2950         if ((file_lock->fl_flags & FL_POSIX) &&
2951             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2952             !(flags & LDLM_FL_TEST_LOCK))
2953                 posix_lock_file_wait(file, file_lock);
2954 #endif
2955
2956         RETURN(rc);
2957 }
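
/*
 * Illustrative user-space sketch (hypothetical, not part of this file):
 * on a client mounted with -o flock, a blocking whole-file write lock
 * taken as below reaches ll_file_flock() above with cmd == F_SETLKW and
 * fl_type == F_WRLCK, and is enqueued as an LCK_PW flock lock on the MDS
 * (l_len == 0 means "to end of file"):
 *
 *      struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET,
 *                          .l_start = 0, .l_len = 0 };
 *      fcntl(fd, F_SETLKW, &fl);
 */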
2958
2959 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2960 {
2961         ENTRY;
2962
2963         RETURN(-ENOSYS);
2964 }
2965
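/*
 * Check whether a granted MDS inodebits lock covering "bits" is cached
 * locally.  The match uses LDLM_FL_TEST_LOCK, so no reference is taken
 * on the lock; returns 1 if a matching lock was found, 0 otherwise.
 */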
2966 int ll_have_md_lock(struct inode *inode, __u64 bits)
2967 {
2968         struct lustre_handle lockh;
2969         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2970         struct lu_fid *fid;
2971         int flags;
2972         ENTRY;
2973
2974         if (!inode)
2975                 RETURN(0);
2976
2977         fid = &ll_i2info(inode)->lli_fid;
2978         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2979
2980         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2981         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2982                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2983                 RETURN(1);
2984         }
2985         RETURN(0);
2986 }
2987
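/*
 * Match a granted MDS inodebits lock covering "bits" and take a
 * reference on it.  On success the handle is stored in *lockh and the
 * matched mode is returned (0 if nothing matched); the caller is
 * expected to drop the reference, e.g. with ldlm_lock_decref(), once it
 * is done with the lock.
 */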
2988 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2989                             struct lustre_handle *lockh)
2990 {
2991         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2992         struct lu_fid *fid;
2993         ldlm_mode_t rc;
2994         int flags;
2995         ENTRY;
2996
2997         fid = &ll_i2info(inode)->lli_fid;
2998         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2999
3000         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
3001         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
3002                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
3003         RETURN(rc);
3004 }
3005
3006 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
3007         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
3008                               * and return success */
3009                 inode->i_nlink = 0;
3010                 /* This path cannot be hit for regular files except in
3011                  * the case of obscure races, so there is no need to
3012                  * validate the size. */
3013                 if (!S_ISREG(inode->i_mode) &&
3014                     !S_ISDIR(inode->i_mode))
3015                         return 0;
3016         }
3017
3018         if (rc) {
3019                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
3020                 return -abs(rc);
3022         }
3023
3024         return 0;
3025 }
3026
3027 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
3028 {
3029         struct inode *inode = dentry->d_inode;
3030         struct ptlrpc_request *req = NULL;
3031         struct ll_sb_info *sbi;
3032         struct obd_export *exp;
3033         int rc;
3034         ENTRY;
3035
3036         if (!inode) {
3037                 CERROR("REPORT THIS LINE TO PETER\n");
3038                 RETURN(0);
3039         }
3040         sbi = ll_i2sbi(inode);
3041
3042         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3043                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3044
3045         exp = ll_i2mdexp(inode);
3046
3047         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
3048                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3049                 struct md_op_data *op_data;
3050
3051                 /* Call getattr by fid, so do not provide name at all. */
3052                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
3053                                              dentry->d_inode, NULL, 0, 0,
3054                                              LUSTRE_OPC_ANY, NULL);
3055                 if (IS_ERR(op_data))
3056                         RETURN(PTR_ERR(op_data));
3057
3058                 oit.it_flags |= O_CHECK_STALE;
3059                 rc = md_intent_lock(exp, op_data, NULL, 0,
3060                                     /* we are not interested in name
3061                                        based lookup */
3062                                     &oit, 0, &req,
3063                                     ll_md_blocking_ast, 0);
3064                 ll_finish_md_op_data(op_data);
3065                 oit.it_flags &= ~O_CHECK_STALE;
3066                 if (rc < 0) {
3067                         rc = ll_inode_revalidate_fini(inode, rc);
3068                         GOTO(out, rc);
3069                 }
3070
3071                 rc = ll_revalidate_it_finish(req, &oit, dentry);
3072                 if (rc != 0) {
3073                         ll_intent_release(&oit);
3074                         GOTO(out, rc);
3075                 }
3076
3077                 /* Unlinked? Unhash dentry, so it is not picked up later by
3078                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3079                    here to preserve get_cwd functionality on 2.6.
3080                    Bug 10503 */
3081                 if (!dentry->d_inode->i_nlink) {
3082                         spin_lock(&ll_lookup_lock);
3083                         spin_lock(&dcache_lock);
3084                         ll_drop_dentry(dentry);
3085                         spin_unlock(&dcache_lock);
3086                         spin_unlock(&ll_lookup_lock);
3087                 }
3088
3089                 ll_lookup_finish_locks(&oit, dentry);
3090         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
3091                                                      MDS_INODELOCK_LOOKUP)) {
3092                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3093                 obd_valid valid = OBD_MD_FLGETATTR;
3094                 struct obd_capa *oc;
3095                 int ealen = 0;
3096
3097                 if (S_ISREG(inode->i_mode)) {
3098                         rc = ll_get_max_mdsize(sbi, &ealen);
3099                         if (rc)
3100                                 RETURN(rc);
3101                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3102                 }
3103                 /* When OBD_CONNECT_ATTRFID is not supported we cannot find
3104                  * a valid capa for this inode, because only the capas of
3105                  * directories are kept fresh. */
3106                 oc = ll_mdscapa_get(inode);
3107                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
3108                                 ealen, &req);
3109                 capa_put(oc);
3110                 if (rc) {
3111                         rc = ll_inode_revalidate_fini(inode, rc);
3112                         RETURN(rc);
3113                 }
3114
3115                 rc = ll_prep_inode(&inode, req, NULL);
3116                 if (rc)
3117                         GOTO(out, rc);
3118         }
3119
3120         /* if object not yet allocated, don't validate size */
3121         if (ll_i2info(inode)->lli_smd == NULL)
3122                 GOTO(out, rc = 0);
3123
3124         /* ll_glimpse_size will prefer locally cached writes if they extend
3125          * the file */
3126         rc = ll_glimpse_size(inode, 0);
3127         EXIT;
3128 out:
3129         ptlrpc_req_finished(req);
3130         return rc;
3131 }
3132
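/*
 * getattr with a caller-supplied lookup intent: revalidate the inode
 * first (which may glimpse the file size from the OSTs), then fill *stat
 * from the in-core inode, sampling size and blocks under
 * ll_inode_size_lock().
 */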
3133 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3134                   struct lookup_intent *it, struct kstat *stat)
3135 {
3136         struct inode *inode = de->d_inode;
3137         int res = 0;
3138
3139         res = ll_inode_revalidate_it(de, it);
3140         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3141
3142         if (res)
3143                 return res;
3144
3145         stat->dev = inode->i_sb->s_dev;
3146         stat->ino = inode->i_ino;
3147         stat->mode = inode->i_mode;
3148         stat->nlink = inode->i_nlink;
3149         stat->uid = inode->i_uid;
3150         stat->gid = inode->i_gid;
3151         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3152         stat->atime = inode->i_atime;
3153         stat->mtime = inode->i_mtime;
3154         stat->ctime = inode->i_ctime;
3155 #ifdef HAVE_INODE_BLKSIZE
3156         stat->blksize = inode->i_blksize;
3157 #else
3158         stat->blksize = 1 << inode->i_blkbits;
3159 #endif
3160
3161         ll_inode_size_lock(inode, 0);
3162         stat->size = i_size_read(inode);
3163         stat->blocks = inode->i_blocks;
3164         ll_inode_size_unlock(inode, 0);
3165
3166         return 0;
3167 }
3168 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3169 {
3170         struct lookup_intent it = { .it_op = IT_GETATTR };
3171
3172         return ll_getattr_it(mnt, de, &it, stat);
3173 }
3174
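/*
 * Check "mask" against the POSIX ACL cached on the inode
 * (lli_posix_acl).  Returns -EAGAIN when no ACL is cached, or when ACL
 * support is compiled out, so that the caller falls back to the ordinary
 * mode-bit checks.
 */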
3175 static
3176 int lustre_check_acl(struct inode *inode, int mask)
3177 {
3178 #ifdef CONFIG_FS_POSIX_ACL
3179         struct ll_inode_info *lli = ll_i2info(inode);
3180         struct posix_acl *acl;
3181         int rc;
3182         ENTRY;
3183
3184         spin_lock(&lli->lli_lock);
3185         acl = posix_acl_dup(lli->lli_posix_acl);
3186         spin_unlock(&lli->lli_lock);
3187
3188         if (!acl)
3189                 RETURN(-EAGAIN);
3190
3191         rc = posix_acl_permission(inode, acl, mask);
3192         posix_acl_release(acl);
3193
3194         RETURN(rc);
3195 #else
3196         return -EAGAIN;
3197 #endif
3198 }
3199
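/*
 * ->permission() handler.  Remote-client mounts (LL_SBI_RMT_CLIENT) are
 * checked through lustre_check_remote_perm(); otherwise kernels >= 2.6.10
 * go through generic_permission() with lustre_check_acl() as the ACL
 * callback, while the older variant below open-codes the equivalent
 * owner/ACL/group/capability checks.
 */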
3200 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3201 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3202 {
3203         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3204                inode->i_ino, inode->i_generation, inode, mask);
3205         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3206                 return lustre_check_remote_perm(inode, mask);
3207
3208         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3209         return generic_permission(inode, mask, lustre_check_acl);
3210 }
3211 #else
3212 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3213 {
3214         int mode = inode->i_mode;
3215         int rc;
3216
3217         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3218                inode->i_ino, inode->i_generation, inode, mask);
3219
3220         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3221                 return lustre_check_remote_perm(inode, mask);
3222
3223         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3224
3225         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3226             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3227                 return -EROFS;
3228         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3229                 return -EACCES;
3230         if (current->fsuid == inode->i_uid) {
3231                 mode >>= 6;
3232         } else if (1) {
3233                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3234                         goto check_groups;
3235                 rc = lustre_check_acl(inode, mask);
3236                 if (rc == -EAGAIN)
3237                         goto check_groups;
3238                 if (rc == -EACCES)
3239                         goto check_capabilities;
3240                 return rc;
3241         } else {
3242 check_groups:
3243                 if (in_group_p(inode->i_gid))
3244                         mode >>= 3;
3245         }
3246         if ((mode & mask & S_IRWXO) == mask)
3247                 return 0;
3248
3249 check_capabilities:
3250         if (!(mask & MAY_EXEC) ||
3251             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3252                 if (cfs_capable(CFS_CAP_DAC_OVERRIDE))
3253                         return 0;
3254
3255         if (cfs_capable(CFS_CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3256             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3257                 return 0;
3258
3259         return -EACCES;
3260 }
3261 #endif
3262
3263 /* -o localflock - only provides locally consistent flock locks */
3264 struct file_operations ll_file_operations = {
3265         .read           = ll_file_read,
3266         .write          = ll_file_write,
3267         .ioctl          = ll_file_ioctl,
3268         .open           = ll_file_open,
3269         .release        = ll_file_release,
3270         .mmap           = ll_file_mmap,
3271         .llseek         = ll_file_seek,
3272         .sendfile       = ll_file_sendfile,
3273         .fsync          = ll_fsync,
3274 };
3275
3276 struct file_operations ll_file_operations_flock = {
3277         .read           = ll_file_read,
3278         .write          = ll_file_write,
3279         .ioctl          = ll_file_ioctl,
3280         .open           = ll_file_open,
3281         .release        = ll_file_release,
3282         .mmap           = ll_file_mmap,
3283         .llseek         = ll_file_seek,
3284         .sendfile       = ll_file_sendfile,
3285         .fsync          = ll_fsync,
3286 #ifdef HAVE_F_OP_FLOCK
3287         .flock          = ll_file_flock,
3288 #endif
3289         .lock           = ll_file_flock
3290 };
3291
3292 /* These are for -o noflock - to return ENOSYS on flock calls */
3293 struct file_operations ll_file_operations_noflock = {
3294         .read           = ll_file_read,
3295         .write          = ll_file_write,
3296         .ioctl          = ll_file_ioctl,
3297         .open           = ll_file_open,
3298         .release        = ll_file_release,
3299         .mmap           = ll_file_mmap,
3300         .llseek         = ll_file_seek,
3301         .sendfile       = ll_file_sendfile,
3302         .fsync          = ll_fsync,
3303 #ifdef HAVE_F_OP_FLOCK
3304         .flock          = ll_file_noflock,
3305 #endif
3306         .lock           = ll_file_noflock
3307 };
3308
3309 struct inode_operations ll_file_inode_operations = {
3310 #ifdef HAVE_VFS_INTENT_PATCHES
3311         .setattr_raw    = ll_setattr_raw,
3312 #endif
3313         .setattr        = ll_setattr,
3314         .truncate       = ll_truncate,
3315         .getattr        = ll_getattr,
3316         .permission     = ll_inode_permission,
3317         .setxattr       = ll_setxattr,
3318         .getxattr       = ll_getxattr,
3319         .listxattr      = ll_listxattr,
3320         .removexattr    = ll_removexattr,
3321 };
3322
3323 /* dynamic ioctl number support routines */
3324 static struct llioc_ctl_data {
3325         struct rw_semaphore ioc_sem;
3326         struct list_head    ioc_head;
3327 } llioc = {
3328         __RWSEM_INITIALIZER(llioc.ioc_sem),
3329         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3330 };
3331
3332
3333 struct llioc_data {
3334         struct list_head        iocd_list;
3335         unsigned int            iocd_size;
3336         llioc_callback_t        iocd_cb;
3337         unsigned int            iocd_count;
3338         unsigned int            iocd_cmd[0];
3339 };
3340
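/*
 * Register "count" ioctl command numbers from "cmd" together with the
 * callback "cb" that should handle them.  Returns an opaque cookie to be
 * passed to ll_iocontrol_unregister(), or NULL on invalid arguments or
 * allocation failure.
 */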
3341 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3342 {
3343         unsigned int size;
3344         struct llioc_data *in_data = NULL;
3345         ENTRY;
3346
3347         if (cb == NULL || cmd == NULL ||
3348             count > LLIOC_MAX_CMD || count < 0)
3349                 RETURN(NULL);
3350
3351         size = sizeof(*in_data) + count * sizeof(unsigned int);
3352         OBD_ALLOC(in_data, size);
3353         if (in_data == NULL)
3354                 RETURN(NULL);
3355
3356         memset(in_data, 0, sizeof(*in_data));
3357         in_data->iocd_size = size;
3358         in_data->iocd_cb = cb;
3359         in_data->iocd_count = count;
3360         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3361
3362         down_write(&llioc.ioc_sem);
3363         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3364         up_write(&llioc.ioc_sem);
3365
3366         RETURN(in_data);
3367 }
3368
3369 void ll_iocontrol_unregister(void *magic)
3370 {
3371         struct llioc_data *tmp;
3372
3373         if (magic == NULL)
3374                 return;
3375
3376         down_write(&llioc.ioc_sem);
3377         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3378                 if (tmp == magic) {
3379                         unsigned int size = tmp->iocd_size;
3380
3381                         list_del(&tmp->iocd_list);
3382                         up_write(&llioc.ioc_sem);
3383
3384                         OBD_FREE(tmp, size);
3385                         return;
3386                 }
3387         }
3388         up_write(&llioc.ioc_sem);
3389
3390         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3391 }
3392
3393 EXPORT_SYMBOL(ll_iocontrol_register);
3394 EXPORT_SYMBOL(ll_iocontrol_unregister);
3395
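/*
 * Offer an ioctl that ll_file_ioctl() does not recognize to the
 * dynamically registered handlers.  The first callback returning
 * LLIOC_STOP consumes the command and its result is stored through
 * "rcp"; LLIOC_CONT means keep iterating and, ultimately, fall back to
 * obd_iocontrol().
 */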
3396 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3397                         unsigned int cmd, unsigned long arg, int *rcp)
3398 {
3399         enum llioc_iter ret = LLIOC_CONT;
3400         struct llioc_data *data;
3401         int rc = -EINVAL, i;
3402
3403         down_read(&llioc.ioc_sem);
3404         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3405                 for (i = 0; i < data->iocd_count; i++) {
3406                         if (cmd != data->iocd_cmd[i])
3407                                 continue;
3408
3409                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3410                         break;
3411                 }
3412
3413                 if (ret == LLIOC_STOP)
3414                         break;
3415         }
3416         up_read(&llioc.ioc_sem);
3417
3418         if (rcp)
3419                 *rcp = rc;
3420         return ret;
3421 }
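
/*
 * Minimal usage sketch for the dynamic ioctl hooks above.  Everything
 * below is hypothetical (the handler, the command numbers and the
 * register/unregister wrappers do not exist elsewhere in the tree); the
 * callback signature simply mirrors the call site in ll_iocontrol_call().
 * Kept under "#if 0" so it is never compiled.
 */
#if 0
static enum llioc_iter example_ioc_cb(struct inode *inode, struct file *file,
                                      unsigned int cmd, unsigned long arg,
                                      void *magic, int *rcp)
{
        /* Handle the command and report its result through *rcp;
         * LLIOC_STOP tells ll_file_ioctl() the ioctl was consumed,
         * LLIOC_CONT lets other handlers (and the default path) see it. */
        *rcp = 0;
        return LLIOC_STOP;
}

/* Hypothetical private ioctl numbers this handler wants to intercept. */
static unsigned int example_cmds[] = { EXAMPLE_IOC_ONE, EXAMPLE_IOC_TWO };
static void *example_magic;

static void example_register(void)
{
        example_magic = ll_iocontrol_register(example_ioc_cb,
                                              ARRAY_SIZE(example_cmds),
                                              example_cmds);
}

static void example_unregister(void)
{
        ll_iocontrol_unregister(example_magic);
}
#endif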