1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <lustre_mdc.h>
47 #include <linux/pagemap.h>
48 #include <linux/file.h>
49 #include "llite_internal.h"
50 #include <lustre/ll_fiemap.h>
51
52 /* also used by llite/special.c:ll_special_open() */
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
58         return fd;
59 }
60
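/* Free an ll_file_data obtained from ll_file_data_get(); a NULL @fd is a no-op. */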
61 static void ll_file_data_put(struct ll_file_data *fd)
62 {
63         if (fd != NULL)
64                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
65 }
66
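/* Pack the inode's current attributes (mode, times, size, blocks, flags),
 * its I/O epoch, the open file handle @fh and an MDS capability into
 * @op_data for an MDS request. */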
67 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
68                           struct lustre_handle *fh)
69 {
70         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
71         op_data->op_attr.ia_mode = inode->i_mode;
72         op_data->op_attr.ia_atime = inode->i_atime;
73         op_data->op_attr.ia_mtime = inode->i_mtime;
74         op_data->op_attr.ia_ctime = inode->i_ctime;
75         op_data->op_attr.ia_size = i_size_read(inode);
76         op_data->op_attr_blocks = inode->i_blocks;
77         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
78         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
79         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
80         op_data->op_capa1 = ll_mdscapa_get(inode);
81 }
82
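/* Prepare @op_data for an MDS close: mark mode and timestamps valid and,
 * for a write-open of a regular file, either send size/blocks directly or,
 * if the MDS supports Size-on-MDS, close the I/O epoch instead. */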
83 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
84                              struct obd_client_handle *och)
85 {
86         ENTRY;
87
88         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
89                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
90
91         if (!(och->och_flags & FMODE_WRITE))
92                 goto out;
93
94         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
95             !S_ISREG(inode->i_mode))
96                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
97         else
98                 ll_epoch_close(inode, op_data, &och, 0);
99
100 out:
101         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
102         EXIT;
103 }
104
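/* Close the MDS open handle @och for @inode.  On -EAGAIN the epoch was
 * closed and the MDS wants a Size-on-MDS update, so attributes are fetched
 * from the OSTs and sent back as a setattr.  @och is freed here unless it
 * has to wait for DONE_WRITING. */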
105 static int ll_close_inode_openhandle(struct obd_export *md_exp,
106                                      struct inode *inode,
107                                      struct obd_client_handle *och)
108 {
109         struct obd_export *exp = ll_i2mdexp(inode);
110         struct md_op_data *op_data;
111         struct ptlrpc_request *req = NULL;
112         struct obd_device *obd = class_exp2obd(exp);
113         int epoch_close = 1;
114         int seq_end = 0, rc;
115         ENTRY;
116
117         if (obd == NULL) {
118                 /*
119                  * XXX: in case of LMV, is this correct to access
120                  * ->exp_handle?
121                  */
122                 CERROR("Invalid MDC connection handle "LPX64"\n",
123                        ll_i2mdexp(inode)->exp_handle.h_cookie);
124                 GOTO(out, rc = 0);
125         }
126
127         /*
 128          * Here we check if this is a forced umount. If so, this is called on
 129          * cancellation of the "open lock" and we do not call md_close() in
 130          * this case, as it cannot succeed because the import is already
 131          * deactivated.
132         if (obd->obd_force)
133                 GOTO(out, rc = 0);
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc != -EAGAIN)
143                 seq_end = 1;
144
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
148                 LASSERT(epoch_close);
149                 /* MDS has instructed us to obtain Size-on-MDS attribute from
 150                  * OSTs and send a setattr back to the MDS. */
151                 rc = ll_sizeonmds_update(inode, och->och_mod,
152                                          &och->och_fh, op_data->op_ioepoch);
153                 if (rc) {
154                         CERROR("inode %lu mdc Size-on-MDS update failed: "
155                                "rc = %d\n", inode->i_ino, rc);
156                         rc = 0;
157                 }
158         } else if (rc) {
159                 CERROR("inode %lu mdc close failed: rc = %d\n",
160                        inode->i_ino, rc);
161         }
162         ll_finish_md_op_data(op_data);
163
164         if (rc == 0) {
165                 rc = ll_objects_destroy(req, inode);
166                 if (rc)
167                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
168                                inode->i_ino, rc);
169         }
170
171         EXIT;
172 out:
173
174         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
175             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
176                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
177         } else {
178                 if (seq_end)
179                         ptlrpc_close_replay_seq(req);
180                 md_clear_open_replay_data(md_exp, och);
181                 /* Free @och if it is not waiting for DONE_WRITING. */
182                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
183                 OBD_FREE_PTR(och);
184         }
 185         if (req) /* This is the close request */
186                 ptlrpc_req_finished(req);
187         return rc;
188 }
189
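/* Drop the cached MDS open handle matching @flags (read, write or exec)
 * if there are no more local users of it, closing it on the MDS via
 * ll_close_inode_openhandle(). */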
190 int ll_md_real_close(struct inode *inode, int flags)
191 {
192         struct ll_inode_info *lli = ll_i2info(inode);
193         struct obd_client_handle **och_p;
194         struct obd_client_handle *och;
195         __u64 *och_usecount;
196         int rc = 0;
197         ENTRY;
198
199         if (flags & FMODE_WRITE) {
200                 och_p = &lli->lli_mds_write_och;
201                 och_usecount = &lli->lli_open_fd_write_count;
202         } else if (flags & FMODE_EXEC) {
203                 och_p = &lli->lli_mds_exec_och;
204                 och_usecount = &lli->lli_open_fd_exec_count;
205         } else {
206                 LASSERT(flags & FMODE_READ);
207                 och_p = &lli->lli_mds_read_och;
208                 och_usecount = &lli->lli_open_fd_read_count;
209         }
210
211         down(&lli->lli_och_sem);
212         if (*och_usecount) { /* There are still users of this handle, so
213                                 skip freeing it. */
214                 up(&lli->lli_och_sem);
215                 RETURN(0);
216         }
 217         och = *och_p;
218         *och_p = NULL;
219         up(&lli->lli_och_sem);
220
 221         if (och) { /* There might have been a race and somebody else
 222                       already freed this och */
223                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
224                                                inode, och);
225         }
226
227         RETURN(rc);
228 }
229
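/* Per-descriptor close: drop any group lock, decrement the open count for
 * this open mode and, unless a matching OPEN lock is still cached, do the
 * real close on the MDS.  Releases the ll_file_data. */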
230 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
231                 struct file *file)
232 {
233         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
234         struct ll_inode_info *lli = ll_i2info(inode);
235         int rc = 0;
236         ENTRY;
237
238         /* clear group lock, if present */
239         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
240                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
241                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
242                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
243                                       &fd->fd_cwlockh);
244         }
245
 246         /* Let's see if we have a good enough OPEN lock on the file and
 247            can skip talking to the MDS */
248         if (file->f_dentry->d_inode) { /* Can this ever be false? */
249                 int lockmode;
250                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
251                 struct lustre_handle lockh;
252                 struct inode *inode = file->f_dentry->d_inode;
253                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
254
255                 down(&lli->lli_och_sem);
256                 if (fd->fd_omode & FMODE_WRITE) {
257                         lockmode = LCK_CW;
258                         LASSERT(lli->lli_open_fd_write_count);
259                         lli->lli_open_fd_write_count--;
260                 } else if (fd->fd_omode & FMODE_EXEC) {
261                         lockmode = LCK_PR;
262                         LASSERT(lli->lli_open_fd_exec_count);
263                         lli->lli_open_fd_exec_count--;
264                 } else {
265                         lockmode = LCK_CR;
266                         LASSERT(lli->lli_open_fd_read_count);
267                         lli->lli_open_fd_read_count--;
268                 }
269                 up(&lli->lli_och_sem);
270
271                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
272                                    LDLM_IBITS, &policy, lockmode,
273                                    &lockh)) {
274                         rc = ll_md_real_close(file->f_dentry->d_inode,
275                                               fd->fd_omode);
276                 }
277         } else {
 278                 CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
279                        file, file->f_dentry, file->f_dentry->d_name.name);
280         }
281
282         LUSTRE_FPRIVATE(file) = NULL;
283         ll_file_data_put(fd);
284         ll_capa_close(inode);
285
286         RETURN(rc);
287 }
288
289 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
290
 291 /* While this returns an error code, the caller fput() ignores it, so we need
292  * to make every effort to clean up all of our state here.  Also, applications
293  * rarely check close errors and even if an error is returned they will not
294  * re-try the close call.
295  */
296 int ll_file_release(struct inode *inode, struct file *file)
297 {
298         struct ll_file_data *fd;
299         struct ll_sb_info *sbi = ll_i2sbi(inode);
300         struct ll_inode_info *lli = ll_i2info(inode);
301         struct lov_stripe_md *lsm = lli->lli_smd;
302         int rc;
303         ENTRY;
304
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
 327         /* The last ref on @file, maybe not from the owner pid of statahead.
 328          * Different processes can open the same dir; "ll_opendir_key" means
329          * it is me that should stop the statahead thread. */
330         if (lli->lli_opendir_key == fd && lli->lli_opendir_pid != 0)
331                 ll_stop_statahead(inode, fd);
332
333         if (inode->i_sb->s_root == file->f_dentry) {
334                 LUSTRE_FPRIVATE(file) = NULL;
335                 ll_file_data_put(fd);
336                 RETURN(0);
337         }
338
339         if (lsm)
340                 lov_test_and_clear_async_rc(lsm);
341         lli->lli_async_rc = 0;
342
343         rc = ll_md_close(sbi->ll_md_exp, inode, file);
344         RETURN(rc);
345 }
346
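/* Send an open intent to the MDS for @file (optionally carrying striping
 * parameters in @lmm/@lmmsize) and update the inode from the reply. */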
347 static int ll_intent_file_open(struct file *file, void *lmm,
348                                int lmmsize, struct lookup_intent *itp)
349 {
350         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
351         struct dentry *parent = file->f_dentry->d_parent;
352         const char *name = file->f_dentry->d_name.name;
353         const int len = file->f_dentry->d_name.len;
354         struct md_op_data *op_data;
355         struct ptlrpc_request *req;
356         int rc;
357         ENTRY;
358
359         if (!parent)
360                 RETURN(-ENOENT);
361
 362         /* Usually we come here only for NFSD, and we want an open lock.
 363            But we can also get here with pre 2.6.15 patchless kernels, and in
 364            that case that lock is also ok */
 365         /* We can also get here if there was a cached open handle in
 366          * revalidate_it but it disappeared while we were getting from there to
 367          * ll_file_open.  But this means this file was closed and immediately
 368          * reopened, which makes it a good candidate for using the OPEN lock */
369         /* If lmmsize & lmm are not 0, we are just setting stripe info
370          * parameters. No need for the open lock */
371         if (!lmm && !lmmsize)
372                 itp->it_flags |= MDS_OPEN_LOCK;
373
374         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
375                                       file->f_dentry->d_inode, name, len,
376                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
377         if (IS_ERR(op_data))
378                 RETURN(PTR_ERR(op_data));
379
380         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
381                             0 /*unused */, &req, ll_md_blocking_ast, 0);
382         ll_finish_md_op_data(op_data);
383         if (rc == -ESTALE) {
 384                 /* reason for keeping our own exit path - don't flood the
 385                  * log with -ESTALE error messages.
 386                  */
387                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
388                      it_open_error(DISP_OPEN_OPEN, itp))
389                         GOTO(out, rc);
390                 ll_release_openhandle(file->f_dentry, itp);
391                 GOTO(out, rc);
392         }
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         if (itp->d.lustre.it_lock_mode)
401                 md_set_lock_data(sbi->ll_md_exp,
402                                  &itp->d.lustre.it_lock_handle,
403                                  file->f_dentry->d_inode);
404
405         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
406 out:
407         ptlrpc_req_finished(itp->d.lustre.it_data);
408         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
409         ll_intent_drop_lock(itp);
410
411         RETURN(rc);
412 }
413
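/* Fill @och from the MDT reply in @it: file handle, fid, open flags and the
 * I/O epoch, and register the open for replay. */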
414 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
415                        struct lookup_intent *it, struct obd_client_handle *och)
416 {
417         struct ptlrpc_request *req = it->d.lustre.it_data;
418         struct mdt_body *body;
419
420         LASSERT(och);
421
422         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
423         LASSERT(body != NULL);                      /* reply already checked out */
424
425         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
426         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
427         och->och_fid = lli->lli_fid;
428         och->och_flags = it->it_flags;
429         lli->lli_ioepoch = body->ioepoch;
430
431         return md_set_open_replay_data(md_exp, och, req);
432 }
433
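/* Finish the local part of an open: fill @och from the intent reply when
 * given, attach @fd to the file and initialize its readahead state. */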
434 int ll_local_open(struct file *file, struct lookup_intent *it,
435                   struct ll_file_data *fd, struct obd_client_handle *och)
436 {
437         struct inode *inode = file->f_dentry->d_inode;
438         struct ll_inode_info *lli = ll_i2info(inode);
439         ENTRY;
440
441         LASSERT(!LUSTRE_FPRIVATE(file));
442
443         LASSERT(fd != NULL);
444
445         if (och) {
446                 struct ptlrpc_request *req = it->d.lustre.it_data;
447                 struct mdt_body *body;
448                 int rc;
449
450                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
451                 if (rc)
452                         RETURN(rc);
453
454                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
455                 if ((it->it_flags & FMODE_WRITE) &&
456                     (body->valid & OBD_MD_FLSIZE))
457                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
458                                lli->lli_ioepoch, PFID(&lli->lli_fid));
459         }
460
461         LUSTRE_FPRIVATE(file) = fd;
462         ll_readahead_init(inode, &fd->fd_ras);
463         fd->fd_omode = it->it_flags;
464         RETURN(0);
465 }
466
467 /* Open a file, and (for the very first open) create objects on the OSTs at
468  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
469  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
470  * lli_open_sem to ensure no other process will create objects, send the
471  * stripe MD to the MDS, or try to destroy the objects if that fails.
472  *
473  * If we already have the stripe MD locally then we don't request it in
474  * md_open(), by passing a lmm_size = 0.
475  *
476  * It is up to the application to ensure no other processes open this file
477  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
478  * used.  We might be able to avoid races of that sort by getting lli_open_sem
479  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
480  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
481  */
482 int ll_file_open(struct inode *inode, struct file *file)
483 {
484         struct ll_inode_info *lli = ll_i2info(inode);
485         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
486                                           .it_flags = file->f_flags };
487         struct lov_stripe_md *lsm;
488         struct ptlrpc_request *req = NULL;
489         struct obd_client_handle **och_p;
490         __u64 *och_usecount;
491         struct ll_file_data *fd;
492         int rc = 0, opendir_set = 0;
493         ENTRY;
494
495         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
496                inode->i_generation, inode, file->f_flags);
497
498 #ifdef HAVE_VFS_INTENT_PATCHES
499         it = file->f_it;
500 #else
501         it = file->private_data; /* XXX: compat macro */
502         file->private_data = NULL; /* prevent ll_local_open assertion */
503 #endif
504
505         fd = ll_file_data_get();
506         if (fd == NULL)
507                 RETURN(-ENOMEM);
508
509         if (S_ISDIR(inode->i_mode)) {
510 again:
511                 spin_lock(&lli->lli_lock);
512                 if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) {
513                         LASSERT(lli->lli_sai == NULL);
514                         lli->lli_opendir_key = fd;
515                         lli->lli_opendir_pid = cfs_curproc_pid();
516                         opendir_set = 1;
517                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid() &&
518                                     lli->lli_opendir_key != NULL)) {
 519                         /* Two cases for this:
 520                          * (1) The same process opens such a directory many times.
 521                          * (2) The old process opened the directory and exited
 522                          *     before its child processes. Then a new process
 523                          *     with the same pid opens such a directory before
 524                          *     the old process's child processes exit.
 525                          * Reset statahead for such cases. */
526                         spin_unlock(&lli->lli_lock);
527                         CDEBUG(D_INFO, "Conflict statahead for %.*s "DFID
528                                " reset it.\n", file->f_dentry->d_name.len,
529                                file->f_dentry->d_name.name,
530                                PFID(&lli->lli_fid));
531                         ll_stop_statahead(inode, lli->lli_opendir_key);
532                         goto again;
533                 }
534                 spin_unlock(&lli->lli_lock);
535         }
536
537         if (inode->i_sb->s_root == file->f_dentry) {
538                 LUSTRE_FPRIVATE(file) = fd;
539                 RETURN(0);
540         }
541
542         if (!it || !it->d.lustre.it_disposition) {
543                 /* Convert f_flags into access mode. We cannot use file->f_mode,
544                  * because everything but O_ACCMODE mask was stripped from
545                  * there */
546                 if ((oit.it_flags + 1) & O_ACCMODE)
547                         oit.it_flags++;
548                 if (file->f_flags & O_TRUNC)
549                         oit.it_flags |= FMODE_WRITE;
550
 551                 /* The kernel only calls f_op->open in dentry_open.  filp_open
 552                  * calls dentry_open after a call to open_namei that checks
 553                  * permissions.  Only nfsd_open calls dentry_open directly
 554                  * without checking permissions, so the code below is safe. */
555                 if (oit.it_flags & FMODE_WRITE)
556                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
557
558                 /* We do not want O_EXCL here, presumably we opened the file
559                  * already? XXX - NFS implications? */
560                 oit.it_flags &= ~O_EXCL;
561
562                 it = &oit;
563         }
564
565 restart:
566         /* Let's see if we have file open on MDS already. */
567         if (it->it_flags & FMODE_WRITE) {
568                 och_p = &lli->lli_mds_write_och;
569                 och_usecount = &lli->lli_open_fd_write_count;
570         } else if (it->it_flags & FMODE_EXEC) {
571                 och_p = &lli->lli_mds_exec_och;
572                 och_usecount = &lli->lli_open_fd_exec_count;
 573         } else {
574                 och_p = &lli->lli_mds_read_och;
575                 och_usecount = &lli->lli_open_fd_read_count;
576         }
577
578         down(&lli->lli_och_sem);
579         if (*och_p) { /* Open handle is present */
580                 if (it_disposition(it, DISP_OPEN_OPEN)) {
 581                         /* Well, there's an extra open request that we do not
 582                            need; close it somehow. This will decref the request. */
583                         rc = it_open_error(DISP_OPEN_OPEN, it);
584                         if (rc) {
585                                 up(&lli->lli_och_sem);
586                                 ll_file_data_put(fd);
587                                 GOTO(out_openerr, rc);
588                         }
589                         ll_release_openhandle(file->f_dentry, it);
590                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
591                                              LPROC_LL_OPEN);
592                 }
593                 (*och_usecount)++;
594
595                 rc = ll_local_open(file, it, fd, NULL);
596                 if (rc) {
597                         (*och_usecount)--;
598                         up(&lli->lli_och_sem);
599                         ll_file_data_put(fd);
600                         GOTO(out_openerr, rc);
601                 }
602         } else {
603                 LASSERT(*och_usecount == 0);
604                 if (!it->d.lustre.it_disposition) {
 605                         /* We cannot just request a lock handle now; new ELC
 606                            code means that one of the other OPEN locks for this
 607                            file could be cancelled, and since the blocking AST
 608                            handler would attempt to grab och_sem as well, that
 609                            would result in a deadlock */
610                         up(&lli->lli_och_sem);
611                         it->it_flags |= O_CHECK_STALE;
612                         rc = ll_intent_file_open(file, NULL, 0, it);
613                         it->it_flags &= ~O_CHECK_STALE;
614                         if (rc) {
615                                 ll_file_data_put(fd);
616                                 GOTO(out_openerr, rc);
617                         }
618
619                         /* Got some error? Release the request */
620                         if (it->d.lustre.it_status < 0) {
621                                 req = it->d.lustre.it_data;
622                                 ptlrpc_req_finished(req);
623                         }
624                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
625                                          &it->d.lustre.it_lock_handle,
626                                          file->f_dentry->d_inode);
627                         goto restart;
628                 }
629                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
630                 if (!*och_p) {
631                         ll_file_data_put(fd);
632                         GOTO(out_och_free, rc = -ENOMEM);
633                 }
634                 (*och_usecount)++;
635                 req = it->d.lustre.it_data;
636
637                 /* md_intent_lock() didn't get a request ref if there was an
638                  * open error, so don't do cleanup on the request here
639                  * (bug 3430) */
 640                 /* XXX (green): Shouldn't we bail out on any error here, not
 641                  * just an open error? */
642                 rc = it_open_error(DISP_OPEN_OPEN, it);
643                 if (rc) {
644                         ll_file_data_put(fd);
645                         GOTO(out_och_free, rc);
646                 }
647
648                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
649                 rc = ll_local_open(file, it, fd, *och_p);
650                 if (rc) {
651                         ll_file_data_put(fd);
652                         GOTO(out_och_free, rc);
653                 }
654         }
655         up(&lli->lli_och_sem);
656
657         /* Must do this outside lli_och_sem lock to prevent deadlock where
 658            a different kind of OPEN lock for this same inode gets cancelled
659            by ldlm_cancel_lru */
660         if (!S_ISREG(inode->i_mode))
661                 GOTO(out, rc);
662
663         ll_capa_open(inode);
664
665         lsm = lli->lli_smd;
666         if (lsm == NULL) {
667                 if (file->f_flags & O_LOV_DELAY_CREATE ||
668                     !(file->f_mode & FMODE_WRITE)) {
669                         CDEBUG(D_INODE, "object creation was delayed\n");
670                         GOTO(out, rc);
671                 }
672         }
673         file->f_flags &= ~O_LOV_DELAY_CREATE;
674         GOTO(out, rc);
675 out:
676         ptlrpc_req_finished(req);
677         if (req)
678                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
679 out_och_free:
680         if (rc) {
681                 if (*och_p) {
682                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
683                         *och_p = NULL; /* OBD_FREE writes some magic there */
684                         (*och_usecount)--;
685                 }
686                 up(&lli->lli_och_sem);
687 out_openerr:
688                 if (opendir_set != 0)
689                         ll_stop_statahead(inode, fd);
690         }
691
692         return rc;
693 }
694
695 /* Fills the obdo with the attributes for the inode defined by lsm */
696 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
697 {
698         struct ptlrpc_request_set *set;
699         struct ll_inode_info *lli = ll_i2info(inode);
700         struct lov_stripe_md *lsm = lli->lli_smd;
701
702         struct obd_info oinfo = { { { 0 } } };
703         int rc;
704         ENTRY;
705
706         LASSERT(lsm != NULL);
707
708         oinfo.oi_md = lsm;
709         oinfo.oi_oa = obdo;
710         oinfo.oi_oa->o_id = lsm->lsm_object_id;
711         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
712         oinfo.oi_oa->o_mode = S_IFREG;
713         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
714                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
715                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
716                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
717                                OBD_MD_FLGROUP;
718         oinfo.oi_capa = ll_mdscapa_get(inode);
719
720         set = ptlrpc_prep_set();
721         if (set == NULL) {
722                 CERROR("can't allocate ptlrpc set\n");
723                 rc = -ENOMEM;
724         } else {
725                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
726                 if (rc == 0)
727                         rc = ptlrpc_set_wait(set);
728                 ptlrpc_set_destroy(set);
729         }
730         capa_put(oinfo.oi_capa);
731         if (rc)
732                 RETURN(rc);
733
734         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
735                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
736                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
737
738         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
739         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
740                lli->lli_smd->lsm_object_id, i_size_read(inode),
741                (unsigned long long)inode->i_blocks,
742                (unsigned long)ll_inode_blksize(inode));
743         RETURN(0);
744 }
745
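/* Map an OST extent @lock back to the stripe index it covers within the
 * file's striping; returns the stripe number or a negative error. */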
746 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
747 {
748         struct ll_inode_info *lli = ll_i2info(inode);
749         struct lov_stripe_md *lsm = lli->lli_smd;
750         struct obd_export *exp = ll_i2dtexp(inode);
751         struct {
752                 char name[16];
753                 struct ldlm_lock *lock;
754         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
755         __u32 stripe, vallen = sizeof(stripe);
756         struct lov_oinfo *loinfo;
757         int rc;
758         ENTRY;
759
760         if (lsm->lsm_stripe_count == 1)
761                 GOTO(check, stripe = 0);
762
763         /* get our offset in the lov */
764         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
765         if (rc != 0) {
766                 CERROR("obd_get_info: rc = %d\n", rc);
767                 RETURN(rc);
768         }
769         LASSERT(stripe < lsm->lsm_stripe_count);
770
771 check:
772         loinfo = lsm->lsm_oinfo[stripe];
773         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
774                             &lock->l_resource->lr_name)){
775                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
776                            loinfo->loi_id, loinfo->loi_gr);
777                 RETURN(-ELDLM_NO_LOCK_DATA);
778         }
779
780         RETURN(stripe);
781 }
782
783 /* Get extra page reference to ensure it is not going away */
784 void ll_pin_extent_cb(void *data)
785 {
786         struct page *page = data;
787
788         page_cache_get(page);
789
790         return;
791 }
792
 793 /* Flush the page from the page cache for an extent as it is canceled.
794  * Page to remove is delivered as @data.
795  *
796  * No one can dirty the extent until we've finished our work and they cannot
797  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
798  * but other kernel actors could have pages locked.
799  *
800  * If @discard is set, there is no need to write the page if it is dirty.
801  *
802  * Called with the DLM lock held. */
803 int ll_page_removal_cb(void *data, int discard)
804 {
805         int rc;
806         struct page *page = data;
807         struct address_space *mapping;
808
809         ENTRY;
810
 811         /* We already have a page reference from ll_pin_extent_cb() */
812         lock_page(page);
813
814         /* Already truncated by somebody */
815         if (!page->mapping)
816                 GOTO(out, rc = 0);
817         mapping = page->mapping;
818
819         ll_teardown_mmaps(mapping,
820                           (__u64)page->index << PAGE_CACHE_SHIFT,
821                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
822                                                               ~PAGE_CACHE_MASK);
823         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
824
825         if (!discard && clear_page_dirty_for_io(page)) {
826                 LASSERT(page->mapping);
827                 rc = ll_call_writepage(page->mapping->host, page);
828                 /* either waiting for io to complete or reacquiring
829                  * the lock that the failed writepage released */
830                 lock_page(page);
831                 wait_on_page_writeback(page);
832                 if (rc != 0) {
833                         CERROR("writepage inode %lu(%p) of page %p "
834                                "failed: %d\n", mapping->host->i_ino,
835                                mapping->host, page, rc);
836                         if (rc == -ENOSPC)
837                                 set_bit(AS_ENOSPC, &mapping->flags);
838                         else
839                                 set_bit(AS_EIO, &mapping->flags);
840                 }
842         }
843         if (page->mapping != NULL) {
844                 struct ll_async_page *llap = llap_cast_private(page);
845                 /* checking again to account for writeback's lock_page() */
846                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
847                 if (llap)
848                         ll_ra_accounting(llap, page->mapping);
849                 ll_truncate_complete_page(page);
850         }
851         EXIT;
852 out:
853         LASSERT(!PageWriteback(page));
854         unlock_page(page);
855         page_cache_release(page);
856
857         return 0;
858 }
859
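/* Cancellation callback for an OST extent lock: shrink the stripe's known
 * minimum size (kms) now that the lock is going away and queue the inode
 * for DONE_WRITING handling. */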
860 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
861                              void *data, int flag)
862 {
863         struct inode *inode;
864         struct ll_inode_info *lli;
865         struct lov_stripe_md *lsm;
866         int stripe;
867         __u64 kms;
868
869         ENTRY;
870
871         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
872                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
873                 LBUG();
874         }
875
876         inode = ll_inode_from_lock(lock);
877         if (inode == NULL)
878                 RETURN(0);
879         lli = ll_i2info(inode);
880         if (lli == NULL)
881                 GOTO(iput, 0);
882         if (lli->lli_smd == NULL)
883                 GOTO(iput, 0);
884         lsm = lli->lli_smd;
885
886         stripe = ll_lock_to_stripe_offset(inode, lock);
887         if (stripe < 0)
888                 GOTO(iput, 0);
889
890         lov_stripe_lock(lsm);
891         lock_res_and_lock(lock);
892         kms = ldlm_extent_shift_kms(lock,
893                                     lsm->lsm_oinfo[stripe]->loi_kms);
894
895         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
896                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
897                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
898         lsm->lsm_oinfo[stripe]->loi_kms = kms;
899         unlock_res_and_lock(lock);
900         lov_stripe_unlock(lsm);
901         ll_queue_done_writing(inode, 0);
902         EXIT;
903 iput:
904         iput(inode);
905
906         return 0;
907 }
908
909 #if 0
910 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
911 {
912         /* XXX ALLOCATE - 160 bytes */
913         struct inode *inode = ll_inode_from_lock(lock);
914         struct ll_inode_info *lli = ll_i2info(inode);
915         struct lustre_handle lockh = { 0 };
916         struct ost_lvb *lvb;
917         int stripe;
918         ENTRY;
919
920         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
921                      LDLM_FL_BLOCK_CONV)) {
922                 LBUG(); /* not expecting any blocked async locks yet */
923                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
924                            "lock, returning");
925                 ldlm_lock_dump(D_OTHER, lock, 0);
926                 ldlm_reprocess_all(lock->l_resource);
927                 RETURN(0);
928         }
929
930         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
931
932         stripe = ll_lock_to_stripe_offset(inode, lock);
933         if (stripe < 0)
934                 goto iput;
935
936         if (lock->l_lvb_len) {
937                 struct lov_stripe_md *lsm = lli->lli_smd;
938                 __u64 kms;
939                 lvb = lock->l_lvb_data;
940                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
941
942                 lock_res_and_lock(lock);
943                 ll_inode_size_lock(inode, 1);
944                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
945                 kms = ldlm_extent_shift_kms(NULL, kms);
946                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
947                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
948                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
949                 lsm->lsm_oinfo[stripe].loi_kms = kms;
950                 ll_inode_size_unlock(inode, 1);
951                 unlock_res_and_lock(lock);
952         }
953
954 iput:
955         iput(inode);
956         wake_up(&lock->l_waitq);
957
958         ldlm_lock2handle(lock, &lockh);
959         ldlm_lock_decref(&lockh, LCK_PR);
960         RETURN(0);
961 }
962 #endif
963
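/* Glimpse AST: pack this client's known minimum size (kms) for the stripe
 * and the inode timestamps into the reply LVB so the server can return an
 * up-to-date file size. */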
964 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
965 {
966         struct ptlrpc_request *req = reqp;
967         struct inode *inode = ll_inode_from_lock(lock);
968         struct ll_inode_info *lli;
969         struct lov_stripe_md *lsm;
970         struct ost_lvb *lvb;
971         int rc, stripe;
972         ENTRY;
973
974         if (inode == NULL)
975                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
976         lli = ll_i2info(inode);
977         if (lli == NULL)
978                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
979         lsm = lli->lli_smd;
980         if (lsm == NULL)
981                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
982
983         /* First, find out which stripe index this lock corresponds to. */
984         stripe = ll_lock_to_stripe_offset(inode, lock);
985         if (stripe < 0)
986                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
987
988         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
989         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
990                              sizeof(*lvb));
991         rc = req_capsule_server_pack(&req->rq_pill);
992         if (rc) {
993                 CERROR("lustre_pack_reply: %d\n", rc);
994                 GOTO(iput, rc);
995         }
996
997         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
998         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
999         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1000         lvb->lvb_atime = LTIME_S(inode->i_atime);
1001         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1002
1003         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1004                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1005                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1006                    lvb->lvb_atime, lvb->lvb_ctime);
1007  iput:
1008         iput(inode);
1009
1010  out:
1011         /* These errors are normal races, so we don't want to fill the console
1012          * with messages by calling ptlrpc_error() */
1013         if (rc == -ELDLM_NO_LOCK_DATA)
1014                 lustre_pack_reply(req, 1, NULL, NULL);
1015
1016         req->rq_status = rc;
1017         return rc;
1018 }
1019
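/* Merge the per-stripe LVB attributes into the inode: update i_size,
 * i_blocks and the timestamps under the inode size lock. */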
1020 static int ll_merge_lvb(struct inode *inode)
1021 {
1022         struct ll_inode_info *lli = ll_i2info(inode);
1023         struct ll_sb_info *sbi = ll_i2sbi(inode);
1024         struct ost_lvb lvb;
1025         int rc;
1026
1027         ENTRY;
1028
1029         ll_inode_size_lock(inode, 1);
1030         inode_init_lvb(inode, &lvb);
1031         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1032         i_size_write(inode, lvb.lvb_size);
1033         inode->i_blocks = lvb.lvb_blocks;
1034
1035         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1036         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1037         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1038         ll_inode_size_unlock(inode, 1);
1039
1040         RETURN(rc);
1041 }
1042
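/* Compute the file size using only locally cached [0, EOF] PR extent locks;
 * returns -ENODATA if no such lock is already held. */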
1043 int ll_local_size(struct inode *inode)
1044 {
1045         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1046         struct ll_inode_info *lli = ll_i2info(inode);
1047         struct ll_sb_info *sbi = ll_i2sbi(inode);
1048         struct lustre_handle lockh = { 0 };
1049         int flags = 0;
1050         int rc;
1051         ENTRY;
1052
1053         if (lli->lli_smd->lsm_stripe_count == 0)
1054                 RETURN(0);
1055
1056         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1057                        &policy, LCK_PR, &flags, inode, &lockh);
1058         if (rc < 0)
1059                 RETURN(rc);
1060         else if (rc == 0)
1061                 RETURN(-ENODATA);
1062
1063         rc = ll_merge_lvb(inode);
1064         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1065         RETURN(rc);
1066 }
1067
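/* Glimpse the object described by @lsm directly (no inode needed) and fill
 * size, blocks and timestamps into @st. */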
1068 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1069                      lstat_t *st)
1070 {
1071         struct lustre_handle lockh = { 0 };
1072         struct ldlm_enqueue_info einfo = { 0 };
1073         struct obd_info oinfo = { { { 0 } } };
1074         struct ost_lvb lvb;
1075         int rc;
1076
1077         ENTRY;
1078
1079         einfo.ei_type = LDLM_EXTENT;
1080         einfo.ei_mode = LCK_PR;
1081         einfo.ei_cb_bl = osc_extent_blocking_cb;
1082         einfo.ei_cb_cp = ldlm_completion_ast;
1083         einfo.ei_cb_gl = ll_glimpse_callback;
1084         einfo.ei_cbdata = NULL;
1085
1086         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1087         oinfo.oi_lockh = &lockh;
1088         oinfo.oi_md = lsm;
1089         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1090
1091         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1092         if (rc == -ENOENT)
1093                 RETURN(rc);
1094         if (rc != 0) {
1095                 CERROR("obd_enqueue returned rc %d, "
1096                        "returning -EIO\n", rc);
1097                 RETURN(rc > 0 ? -EIO : rc);
1098         }
1099
1100         lov_stripe_lock(lsm);
1101         memset(&lvb, 0, sizeof(lvb));
1102         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1103         st->st_size = lvb.lvb_size;
1104         st->st_blocks = lvb.lvb_blocks;
1105         st->st_mtime = lvb.lvb_mtime;
1106         st->st_atime = lvb.lvb_atime;
1107         st->st_ctime = lvb.lvb_ctime;
1108         lov_stripe_unlock(lsm);
1109
1110         RETURN(rc);
1111 }
1112
1113 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1114  * file (because it prefers KMS over RSS when larger) */
1115 int ll_glimpse_size(struct inode *inode, int ast_flags)
1116 {
1117         struct ll_inode_info *lli = ll_i2info(inode);
1118         struct ll_sb_info *sbi = ll_i2sbi(inode);
1119         struct lustre_handle lockh = { 0 };
1120         struct ldlm_enqueue_info einfo = { 0 };
1121         struct obd_info oinfo = { { { 0 } } };
1122         int rc;
1123         ENTRY;
1124
1125         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1126                 RETURN(0);
1127
1128         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1129
1130         if (!lli->lli_smd) {
1131                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1132                 RETURN(0);
1133         }
1134
 1135         /* NOTE: this looks like a DLM lock request, but it may not be one. Due
 1136          *       to the LDLM_FL_HAS_INTENT flag, this is a glimpse request that
1137          *       won't revoke any conflicting DLM locks held. Instead,
1138          *       ll_glimpse_callback() will be called on each client
1139          *       holding a DLM lock against this file, and resulting size
1140          *       will be returned for each stripe. DLM lock on [0, EOF] is
1141          *       acquired only if there were no conflicting locks. */
1142         einfo.ei_type = LDLM_EXTENT;
1143         einfo.ei_mode = LCK_PR;
1144         einfo.ei_cb_bl = osc_extent_blocking_cb;
1145         einfo.ei_cb_cp = ldlm_completion_ast;
1146         einfo.ei_cb_gl = ll_glimpse_callback;
1147         einfo.ei_cbdata = inode;
1148
1149         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1150         oinfo.oi_lockh = &lockh;
1151         oinfo.oi_md = lli->lli_smd;
1152         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1153
1154         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1155         if (rc == -ENOENT)
1156                 RETURN(rc);
1157         if (rc != 0) {
1158                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1159                 RETURN(rc > 0 ? -EIO : rc);
1160         }
1161
1162         rc = ll_merge_lvb(inode);
1163
1164         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1165                i_size_read(inode), (unsigned long long)inode->i_blocks);
1166
1167         RETURN(rc);
1168 }
1169
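/* Take a DLM extent lock of the given @mode on @lsm for the range in
 * @policy and refresh i_size and timestamps from the merged LVB; a no-op
 * when locking is ignored for this file or mount. */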
1170 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1171                    struct lov_stripe_md *lsm, int mode,
1172                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1173                    int ast_flags)
1174 {
1175         struct ll_sb_info *sbi = ll_i2sbi(inode);
1176         struct ost_lvb lvb;
1177         struct ldlm_enqueue_info einfo = { 0 };
1178         struct obd_info oinfo = { { { 0 } } };
1179         int rc;
1180         ENTRY;
1181
1182         LASSERT(!lustre_handle_is_used(lockh));
1183         LASSERT(lsm != NULL);
1184
1185         /* XXX phil: can we do this?  won't it screw the file size up? */
1186         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1187             (sbi->ll_flags & LL_SBI_NOLCK))
1188                 RETURN(0);
1189
1190         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1191                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1192
1193         einfo.ei_type = LDLM_EXTENT;
1194         einfo.ei_mode = mode;
1195         einfo.ei_cb_bl = osc_extent_blocking_cb;
1196         einfo.ei_cb_cp = ldlm_completion_ast;
1197         einfo.ei_cb_gl = ll_glimpse_callback;
1198         einfo.ei_cbdata = inode;
1199
1200         oinfo.oi_policy = *policy;
1201         oinfo.oi_lockh = lockh;
1202         oinfo.oi_md = lsm;
1203         oinfo.oi_flags = ast_flags;
1204
1205         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1206         *policy = oinfo.oi_policy;
1207         if (rc > 0)
1208                 rc = -EIO;
1209
1210         ll_inode_size_lock(inode, 1);
1211         inode_init_lvb(inode, &lvb);
1212         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1213
1214         if (policy->l_extent.start == 0 &&
1215             policy->l_extent.end == OBD_OBJECT_EOF) {
1216                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1217                  * the kms under both a DLM lock and the
1218                  * ll_inode_size_lock().  If we don't get the
1219                  * ll_inode_size_lock() here we can match the DLM lock and
1220                  * reset i_size from the kms before the truncating path has
1221                  * updated the kms.  generic_file_write can then trust the
1222                  * stale i_size when doing appending writes and effectively
1223                  * cancel the result of the truncate.  Getting the
1224                  * ll_inode_size_lock() after the enqueue maintains the DLM
1225                  * -> ll_inode_size_lock() acquiring order. */
1226                 i_size_write(inode, lvb.lvb_size);
1227                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1228                        inode->i_ino, i_size_read(inode));
1229         }
1230
1231         if (rc == 0) {
1232                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1233                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1234                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1235         }
1236         ll_inode_size_unlock(inode, 1);
1237
1238         RETURN(rc);
1239 }
1240
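/* Release an extent lock taken by ll_extent_lock(); a no-op under the same
 * lock-ignoring conditions. */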
1241 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1242                      struct lov_stripe_md *lsm, int mode,
1243                      struct lustre_handle *lockh)
1244 {
1245         struct ll_sb_info *sbi = ll_i2sbi(inode);
1246         int rc;
1247         ENTRY;
1248
1249         /* XXX phil: can we do this?  won't it screw the file size up? */
1250         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1251             (sbi->ll_flags & LL_SBI_NOLCK))
1252                 RETURN(0);
1253
1254         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1255
1256         RETURN(rc);
1257 }
1258
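/* Mark the file as contended and record when, so that I/O within the
 * configured contention window can avoid taking client-side extent locks. */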
1259 static void ll_set_file_contended(struct inode *inode)
1260 {
1261         struct ll_inode_info *lli = ll_i2info(inode);
1262         cfs_time_t now = cfs_time_current();
1263
1264         spin_lock(&lli->lli_lock);
1265         lli->lli_contention_time = now;
1266         lli->lli_flags |= LLIF_CONTENDED;
1267         spin_unlock(&lli->lli_lock);
1268 }
1269
1270 void ll_clear_file_contended(struct inode *inode)
1271 {
1272         struct ll_inode_info *lli = ll_i2info(inode);
1273
1274         spin_lock(&lli->lli_lock);
1275         lli->lli_flags &= ~LLIF_CONTENDED;
1276         spin_unlock(&lli->lli_lock);
1277 }
1278
1279 static int ll_is_file_contended(struct file *file)
1280 {
1281         struct inode *inode = file->f_dentry->d_inode;
1282         struct ll_inode_info *lli = ll_i2info(inode);
1283         struct ll_sb_info *sbi = ll_i2sbi(inode);
1284         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1285         ENTRY;
1286
1287         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1288                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1289                        " osc connect flags = 0x"LPX64"\n",
1290                        sbi->ll_lco.lco_flags);
1291                 RETURN(0);
1292         }
1293         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1294                 RETURN(1);
1295         if (lli->lli_flags & LLIF_CONTENDED) {
1296                 cfs_time_t cur_time = cfs_time_current();
1297                 cfs_time_t retry_time;
1298
1299                 retry_time = cfs_time_add(
1300                         lli->lli_contention_time,
1301                         cfs_time_seconds(sbi->ll_contention_time));
1302                 if (cfs_time_after(cur_time, retry_time)) {
1303                         ll_clear_file_contended(inode);
1304                         RETURN(0);
1305                 }
1306                 RETURN(1);
1307         }
1308         RETURN(0);
1309 }
1310
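/* Take the lock-tree extent lock for a read or write of [start, end] unless
 * the file is contended (and the write is not an append); returns 1 if the
 * tree lock was taken, 0 if it was skipped, or a negative error. */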
1311 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1312                                  const char *buf, size_t count,
1313                                  loff_t start, loff_t end, int rw)
1314 {
1315         int append;
1316         int tree_locked = 0;
1317         int rc;
1318         struct inode * inode = file->f_dentry->d_inode;
1319         ENTRY;
1320
1321         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1322
1323         if (append || !ll_is_file_contended(file)) {
1324                 struct ll_lock_tree_node *node;
1325                 int ast_flags;
1326
1327                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1328                 if (file->f_flags & O_NONBLOCK)
1329                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1330                 node = ll_node_from_inode(inode, start, end,
1331                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1332                 if (IS_ERR(node)) {
1333                         rc = PTR_ERR(node);
1334                         GOTO(out, rc);
1335                 }
1336                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1337                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1338                 if (rc == 0)
1339                         tree_locked = 1;
1340                 else if (rc == -EUSERS)
1341                         ll_set_file_contended(inode);
1342                 else
1343                         GOTO(out, rc);
1344         }
1345         RETURN(tree_locked);
1346 out:
1347         return rc;
1348 }
1349
1350 /**
1351  * Checks if requested extent lock is compatible with a lock under a page.
1352  *
1353  * Checks if the lock under \a page is compatible with a read or write lock
1354  * (specified by \a rw) for an extent [\a start , \a end].
1355  *
1356  * \param page the page under which lock is considered
1357  * \param rw OBD_BRW_READ if requested for reading,
1358  *           OBD_BRW_WRITE if requested for writing
1359  * \param start start of the requested extent
1360  * \param end end of the requested extent
1361  * \param cookie transparent parameter for passing locking context
1362  *
1363  * \post result == 1, *cookie == context, appropriate lock is referenced or
1364  * \post result == 0
1365  *
1366  * \retval 1 owned lock is reused for the request
1367  * \retval 0 no lock reused for the request
1368  *
1369  * \see ll_release_short_lock
1370  */
1371 static int ll_reget_short_lock(struct page *page, int rw,
1372                                obd_off start, obd_off end,
1373                                void **cookie)
1374 {
1375         struct ll_async_page *llap;
1376         struct obd_export *exp;
1377         struct inode *inode = page->mapping->host;
1378
1379         ENTRY;
1380
1381         exp = ll_i2dtexp(inode);
1382         if (exp == NULL)
1383                 RETURN(0);
1384
1385         llap = llap_cast_private(page);
1386         if (llap == NULL)
1387                 RETURN(0);
1388
1389         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1390                                     &llap->llap_cookie, rw, start, end,
1391                                     cookie));
1392 }
1393
1394 /**
1395  * Releases a reference to a lock taken in a "fast" way.
1396  *
1397  * Releases a read or a write (specified by \a rw) lock
1398  * referenced by \a cookie.
1399  *
1400  * \param inode inode to which data belong
1401  * \param end end of the locked extent
1402  * \param rw OBD_BRW_READ if requested for reading,
1403  *           OBD_BRW_WRITE if requested for writing
1404  * \param cookie transparent parameter for passing locking context
1405  *
1406  * \post appropriate lock is dereferenced
1407  *
1408  * \see ll_reget_short_lock
1409  */
1410 static void ll_release_short_lock(struct inode *inode, obd_off end,
1411                                   void *cookie, int rw)
1412 {
1413         struct obd_export *exp;
1414         int rc;
1415
1416         exp = ll_i2dtexp(inode);
1417         if (exp == NULL)
1418                 return;
1419
1420         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1421                                     cookie, rw);
1422         if (rc < 0)
1423                 CERROR("unlock failed (%d)\n", rc);
1424 }
1425
1426 /**
1427  * Checks if requested extent lock is compatible
1428  * with a lock under a page in page cache.
1429  *
1430  * Checks if a lock under some \a page is compatible with a read or write lock
1431  * (specified by \a rw) for an extent [\a start , \a end].
1432  *
1433  * \param file the file under which lock is considered
1434  * \param rw OBD_BRW_READ if requested for reading,
1435  *           OBD_BRW_WRITE if requested for writing
1436  * \param ppos start of the requested extent
1437  * \param end end of the requested extent
1438  * \param cookie transparent parameter for passing locking context
1439  * \param buf userspace buffer for the data
1440  *
1441  * \post result == 1, *cookie == context, appropriate lock is referenced
1442  * \post result == 0
1443  *
1444  * \retval 1 owned lock is reused for the request
1445  * \retval 0 no lock reused for the request
1446  *
1447  * \see ll_file_put_fast_lock
1448  */
1449 static inline int ll_file_get_fast_lock(struct file *file,
1450                                         obd_off ppos, obd_off end,
1451                                         char *buf, void **cookie, int rw)
1452 {
1453         int rc = 0;
1454         struct page *page;
1455
1456         ENTRY;
1457
1458         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1459                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1460                                       ppos >> CFS_PAGE_SHIFT);
1461                 if (page) {
1462                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1463                                 rc = 1;
1464
1465                         unlock_page(page);
1466                         page_cache_release(page);
1467                 }
1468         }
1469
1470         RETURN(rc);
1471 }
1472
1473 /**
1474  * Releases a reference to a lock taken in a "fast" way.
1475  *
1476  * Releases a read or a write (specified by \a rw) lock
1477  * referenced by \a cookie.
1478  *
1479  * \param inode inode to which data belong
1480  * \param end end of the locked extent
1481  * \param rw OBD_BRW_READ if requested for reading,
1482  *           OBD_BRW_WRITE if requested for writing
1483  * \param cookie transparent parameter for passing locking context
1484  *
1485  * \post appropriate lock is dereferenced
1486  *
1487  * \see ll_file_get_fast_lock
1488  */
1489 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1490                                          void *cookie, int rw)
1491 {
1492         ll_release_short_lock(inode, end, cookie, rw);
1493 }
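
/*
 * Illustrative sketch (editorial addition, not part of the original code):
 * how the two "fast" lock helpers above pair up on a read path.  The helper
 * example_fast_read_region() is hypothetical; ll_file_get_fast_lock() and
 * ll_file_put_fast_lock() are the real helpers defined above.
 */
#if 0
static ssize_t example_fast_read_region(struct file *file, char *buf,
                                        obd_off start, obd_off end)
{
        struct inode *inode = file->f_dentry->d_inode;
        void *cookie;
        ssize_t rc = -EBUSY;

        /* try to reuse a lock already referenced under a cached page */
        if (ll_file_get_fast_lock(file, start, end, buf, &cookie,
                                  OBD_BRW_READ)) {
                /* ... perform the read for [start, end] here ... */
                ll_file_put_fast_lock(inode, end, cookie, OBD_BRW_READ);
                rc = 0;
        }
        return rc;
}
#endif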
1494
1495 enum ll_lock_style {
1496         LL_LOCK_STYLE_NOLOCK   = 0,
1497         LL_LOCK_STYLE_FASTLOCK = 1,
1498         LL_LOCK_STYLE_TREELOCK = 2
1499 };
1500
1501 /**
1502  * Takes a lock for the extent [\a ppos , \a end], either by reusing a
1503  * lock already referenced under a page cache page (a "fast" lock) or by
1504  * acquiring a new lock through the lock tree.
1505  *
1506  * A read or write lock is taken depending on \a rw.
1507  *
1508  * \param file file under which I/O is processed
1509  * \param rw OBD_BRW_READ if requested for reading,
1510  *           OBD_BRW_WRITE if requested for writing
1511  * \param ppos start of the requested extent
1512  * \param end end of the requested extent
1513  * \param cookie transparent parameter for passing locking context
1514  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1515  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1516  * \param buf userspace buffer for the data
1517  *
1518  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1519  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1520  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1521  *
1522  * \see ll_file_put_lock
1523  */
1524 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1525                                    obd_off end, char *buf, void **cookie,
1526                                    struct ll_lock_tree *tree, int rw)
1527 {
1528         int rc;
1529
1530         ENTRY;
1531
1532         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1533                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1534
1535         rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
1536         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1537         switch (rc) {
1538         case 1:
1539                 RETURN(LL_LOCK_STYLE_TREELOCK);
1540         case 0:
1541                 RETURN(LL_LOCK_STYLE_NOLOCK);
1542         }
1543
1544         /* an error happened if we reached this point, rc = -errno here */
1545         RETURN(rc);
1546 }
1547
1548 /**
1549  * Drops the lock taken by ll_file_get_lock.
1550  *
1551  * Releases a read or a write (specified by \a rw) lock
1552  * referenced by \a tree or \a cookie.
1553  *
1554  * \param inode inode to which data belong
1555  * \param end end of the locked extent
1556  * \param lock_style facility through which the lock was taken
1557  * \param rw OBD_BRW_READ if requested for reading,
1558  *           OBD_BRW_WRITE if requested for writing
1559  * \param cookie transparent parameter for passing locking context
1560  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1561  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1562  *
1563  * \post appropriate lock is dereferenced
1564  *
1565  * \see ll_file_get_lock
1566  */
1567 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1568                                     enum ll_lock_style lock_style,
1569                                     void *cookie, struct ll_lock_tree *tree,
1570                                     int rw)
1571
1572 {
1573         switch (lock_style) {
1574         case LL_LOCK_STYLE_TREELOCK:
1575                 ll_tree_unlock(tree);
1576                 break;
1577         case LL_LOCK_STYLE_FASTLOCK:
1578                 ll_file_put_fast_lock(inode, end, cookie, rw);
1579                 break;
1580         default:
1581                 CERROR("invalid locking style (%d)\n", lock_style);
1582         }
1583 }
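
/*
 * Illustrative sketch (editorial addition, not part of the original code):
 * how ll_file_get_lock() and ll_file_put_lock() above are meant to be
 * paired, as ll_file_read() below does.  example_locked_io() and its body
 * are hypothetical; the lock helpers and lock styles are the real ones
 * defined in this file.
 */
#if 0
static ssize_t example_locked_io(struct file *file, char *buf,
                                 obd_off start, obd_off end, int rw)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_lock_tree tree;
        void *cookie;
        int lock_style;
        ssize_t rc = 0;

        lock_style = ll_file_get_lock(file, start, end, buf, &cookie,
                                      &tree, rw);
        if (lock_style < 0)
                return lock_style;      /* -errno from the lock attempt */

        /* ... do the I/O for [start, end]; when LL_LOCK_STYLE_NOLOCK is
         * returned the lockless path (ll_file_lockless_io()) is used ... */

        if (lock_style != LL_LOCK_STYLE_NOLOCK)
                ll_file_put_lock(inode, end, lock_style, cookie, &tree, rw);
        return rc;
}
#endif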
1584
1585 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1586                             loff_t *ppos)
1587 {
1588         struct inode *inode = file->f_dentry->d_inode;
1589         struct ll_inode_info *lli = ll_i2info(inode);
1590         struct lov_stripe_md *lsm = lli->lli_smd;
1591         struct ll_sb_info *sbi = ll_i2sbi(inode);
1592         struct ll_lock_tree tree;
1593         struct ost_lvb lvb;
1594         struct ll_ra_read bead;
1595         int ra = 0;
1596         obd_off end;
1597         ssize_t retval, chunk, sum = 0;
1598         int lock_style;
1599         void *cookie;
1600
1601         __u64 kms;
1602         ENTRY;
1603         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1604                inode->i_ino, inode->i_generation, inode, count, *ppos);
1605         /* "If nbyte is 0, read() will return 0 and have no other results."
1606          *                      -- Single Unix Spec */
1607         if (count == 0)
1608                 RETURN(0);
1609
1610         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1611
1612         if (!lsm) {
1613                 /* A read on a file with no objects should return zero-filled
1614                  * buffers up to the file size (we can get a non-zero size with
1615                  * mknod + truncate, then opening the file for read; this seems
1616                  * to be a common pattern in the NFS case). Bug 6243 */
1617                 int notzeroed;
1618                 /* Since there are no objects on OSTs, we have nothing to get
1619                  * lock on and so we are forced to access inode->i_size
1620                  * unguarded */
1621
1622                 /* Read beyond end of file */
1623                 if (*ppos >= i_size_read(inode))
1624                         RETURN(0);
1625
1626                 if (count > i_size_read(inode) - *ppos)
1627                         count = i_size_read(inode) - *ppos;
1628                 /* Make sure to correctly adjust the file pos pointer for
1629                  * EFAULT case */
1630                 notzeroed = clear_user(buf, count);
1631                 count -= notzeroed;
1632                 *ppos += count;
1633                 if (!count)
1634                         RETURN(-EFAULT);
1635                 RETURN(count);
1636         }
1637 repeat:
1638         if (sbi->ll_max_rw_chunk != 0) {
1639                 /* first, determine the end of the current stripe */
1640                 end = *ppos;
1641                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1642
1643                 /* clamp the end if it extends beyond the request */
1644                 if (end > *ppos + count - 1)
1645                         end = *ppos + count - 1;
1646
1647                 /* and chunk shouldn't be too large even if striping is wide */
1648                 if (end - *ppos > sbi->ll_max_rw_chunk)
1649                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1650         } else {
1651                 end = *ppos + count - 1;
1652         }
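
        /* Editorial example (not part of the original source): with an
         * illustrative 1MB stripe size and ll_max_rw_chunk of 512KB, a read
         * at *ppos = 768KB for count = 2MB gets end = 1MB - 1 from the
         * stripe-end calculation above (within the request and less than
         * 512KB away), so this pass handles 256KB and the "repeat" loop
         * below continues from the start of the next stripe. */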
1653
1654         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1655                                       buf, &cookie, &tree, OBD_BRW_READ);
1656         if (lock_style < 0)
1657                 GOTO(out, retval = lock_style);
1658
1659         ll_inode_size_lock(inode, 1);
1660         /*
1661          * Consistency guarantees: following possibilities exist for the
1662          * relation between region being read and real file size at this
1663          * moment:
1664          *
1665          *  (A): the region is completely inside of the file;
1666          *
1667          *  (B-x): x bytes of region are inside of the file, the rest is
1668          *  outside;
1669          *
1670          *  (C): the region is completely outside of the file.
1671          *
1672          * This classification is stable under DLM lock acquired by
1673          * ll_tree_lock() above, because to change class, other client has to
1674          * take DLM lock conflicting with our lock. Also, any updates to
1675          * ->i_size by other threads on this client are serialized by
1676          * ll_inode_size_lock(). This guarantees that short reads are handled
1677          * correctly in the face of concurrent writes and truncates.
1678          */
1679         inode_init_lvb(inode, &lvb);
1680         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1681         kms = lvb.lvb_size;
1682         if (*ppos + count - 1 > kms) {
1683                 /* A glimpse is necessary to determine whether we return a
1684                  * short read (B) or some zeroes at the end of the buffer (C) */
1685                 ll_inode_size_unlock(inode, 1);
1686                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1687                 if (retval) {
1688                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1689                                 ll_file_put_lock(inode, end, lock_style,
1690                                                  cookie, &tree, OBD_BRW_READ);
1691                         goto out;
1692                 }
1693         } else {
1694                 /* region is within kms and, hence, within real file size (A).
1695                  * We need to increase i_size to cover the read region so that
1696                  * generic_file_read() will do its job, but that doesn't mean
1697                  * the kms size is _correct_, it is only the _minimum_ size.
1698                  * If someone does a stat they will get the correct size which
1699                  * will always be >= the kms value here.  b=11081 */
1700                 if (i_size_read(inode) < kms)
1701                         i_size_write(inode, kms);
1702                 ll_inode_size_unlock(inode, 1);
1703         }
1704
1705         chunk = end - *ppos + 1;
1706         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1707                inode->i_ino, chunk, *ppos, i_size_read(inode));
1708
1709         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1710                 /* turn off the kernel's read-ahead */
1711                 file->f_ra.ra_pages = 0;
1712
1713                 /* initialize read-ahead window once per syscall */
1714                 if (ra == 0) {
1715                         ra = 1;
1716                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1717                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1718                         ll_ra_read_in(file, &bead);
1719                 }
1720
1721                 /* BUG: 5972 */
1722                 file_accessed(file);
1723                 retval = generic_file_read(file, buf, chunk, ppos);
1724                 ll_file_put_lock(inode, end, lock_style, cookie, &tree,
1725                                  OBD_BRW_READ);
1726         } else {
1727                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1728         }
1729
1730         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1731
1732         if (retval > 0) {
1733                 buf += retval;
1734                 count -= retval;
1735                 sum += retval;
1736                 if (retval == chunk && count > 0)
1737                         goto repeat;
1738         }
1739
1740  out:
1741         if (ra != 0)
1742                 ll_ra_read_ex(file, &bead);
1743         retval = (sum > 0) ? sum : retval;
1744         RETURN(retval);
1745 }
1746
1747 /*
1748  * Write to a file (through the page cache).
1749  */
1750 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1751                              loff_t *ppos)
1752 {
1753         struct inode *inode = file->f_dentry->d_inode;
1754         struct ll_sb_info *sbi = ll_i2sbi(inode);
1755         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1756         struct ll_lock_tree tree;
1757         loff_t maxbytes = ll_file_maxbytes(inode);
1758         loff_t lock_start, lock_end, end;
1759         ssize_t retval, chunk, sum = 0;
1760         int tree_locked;
1761         ENTRY;
1762
1763         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1764                inode->i_ino, inode->i_generation, inode, count, *ppos);
1765
1766         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1767
1768         /* Required by POSIX, but surprisingly the VFS doesn't check this already */
1769         if (count == 0)
1770                 RETURN(0);
1771
1772         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1773          * called on the file, don't fail the below assertion (bug 2388). */
1774         if (file->f_flags & O_LOV_DELAY_CREATE &&
1775             ll_i2info(inode)->lli_smd == NULL)
1776                 RETURN(-EBADF);
1777
1778         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1779
1780         down(&ll_i2info(inode)->lli_write_sem);
1781
1782 repeat:
1783         chunk = 0; /* just to fix gcc's warning */
1784         end = *ppos + count - 1;
1785
1786         if (file->f_flags & O_APPEND) {
1787                 lock_start = 0;
1788                 lock_end = OBD_OBJECT_EOF;
1789         } else if (sbi->ll_max_rw_chunk != 0) {
1790                 /* first, determine the end of the current stripe */
1791                 end = *ppos;
1792                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1793                                 (obd_off *)&end);
1794
1795                 /* clamp the end if it extends beyond the request */
1796                 if (end > *ppos + count - 1)
1797                         end = *ppos + count - 1;
1798
1799                 /* and chunk shouldn't be too large even if striping is wide */
1800                 if (end - *ppos > sbi->ll_max_rw_chunk)
1801                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1802                 lock_start = *ppos;
1803                 lock_end = end;
1804         } else {
1805                 lock_start = *ppos;
1806                 lock_end = *ppos + count - 1;
1807         }
1808
1809         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1810                                             lock_start, lock_end, OBD_BRW_WRITE);
1811         if (tree_locked < 0)
1812                 GOTO(out, retval = tree_locked);
1813
1814         /* This is OK; generic_file_write will overwrite this under i_sem if
1815          * it races with a local truncate; it just makes our maxbytes checking
1816          * easier.  The i_size value gets updated in ll_extent_lock() as a
1817          * consequence of the [0,EOF] extent lock we requested above. */
1818         if (file->f_flags & O_APPEND) {
1819                 *ppos = i_size_read(inode);
1820                 end = *ppos + count - 1;
1821         }
1822
1823         if (*ppos >= maxbytes) {
1824                 send_sig(SIGXFSZ, current, 0);
1825                 GOTO(out_unlock, retval = -EFBIG);
1826         }
1827         if (end > maxbytes - 1)
1828                 end = maxbytes - 1;
1829
1830         /* generic_file_write handles O_APPEND after getting i_mutex */
1831         chunk = end - *ppos + 1;
1832         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1833                inode->i_ino, chunk, *ppos);
1834         if (tree_locked)
1835                 retval = generic_file_write(file, buf, chunk, ppos);
1836         else
1837                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1838                                              ppos, WRITE);
1839         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1840
1841 out_unlock:
1842         if (tree_locked)
1843                 ll_tree_unlock(&tree);
1844
1845 out:
1846         if (retval > 0) {
1847                 buf += retval;
1848                 count -= retval;
1849                 sum += retval;
1850                 if (retval == chunk && count > 0)
1851                         goto repeat;
1852         }
1853
1854         up(&ll_i2info(inode)->lli_write_sem);
1855
1856         retval = (sum > 0) ? sum : retval;
1857         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1858                            retval > 0 ? retval : 0);
1859         RETURN(retval);
1860 }
1861
1862 /*
1863  * Send file content (through the page cache) to a target using the read actor helper
1864  */
1865 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1866                                 read_actor_t actor, void *target)
1867 {
1868         struct inode *inode = in_file->f_dentry->d_inode;
1869         struct ll_inode_info *lli = ll_i2info(inode);
1870         struct lov_stripe_md *lsm = lli->lli_smd;
1871         struct ll_lock_tree tree;
1872         struct ll_lock_tree_node *node;
1873         struct ost_lvb lvb;
1874         struct ll_ra_read bead;
1875         int rc;
1876         ssize_t retval;
1877         __u64 kms;
1878         ENTRY;
1879         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1880                inode->i_ino, inode->i_generation, inode, count, *ppos);
1881
1882         /* "If nbyte is 0, read() will return 0 and have no other results."
1883          *                      -- Single Unix Spec */
1884         if (count == 0)
1885                 RETURN(0);
1886
1887         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1888         /* turn off the kernel's read-ahead */
1889         in_file->f_ra.ra_pages = 0;
1890
1891         /* File with no objects, nothing to lock */
1892         if (!lsm)
1893                 RETURN(generic_file_sendfile(in_file, ppos,count,actor,target));
1894
1895         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1896         if (IS_ERR(node))
1897                 RETURN(PTR_ERR(node));
1898
1899         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1900         rc = ll_tree_lock(&tree, node, NULL, count,
1901                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1902         if (rc != 0)
1903                 RETURN(rc);
1904
1905         ll_clear_file_contended(inode);
1906         ll_inode_size_lock(inode, 1);
1907         /*
1908          * Consistency guarantees: following possibilities exist for the
1909          * relation between region being read and real file size at this
1910          * moment:
1911          *
1912          *  (A): the region is completely inside of the file;
1913          *
1914          *  (B-x): x bytes of region are inside of the file, the rest is
1915          *  outside;
1916          *
1917          *  (C): the region is completely outside of the file.
1918          *
1919          * This classification is stable under DLM lock acquired by
1920          * ll_tree_lock() above, because to change class, other client has to
1921          * take DLM lock conflicting with our lock. Also, any updates to
1922          * ->i_size by other threads on this client are serialized by
1923          * ll_inode_size_lock(). This guarantees that short reads are handled
1924          * correctly in the face of concurrent writes and truncates.
1925          */
1926         inode_init_lvb(inode, &lvb);
1927         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1928         kms = lvb.lvb_size;
1929         if (*ppos + count - 1 > kms) {
1930                 /* A glimpse is necessary to determine whether we return a
1931                  * short read (B) or some zeroes at the end of the buffer (C) */
1932                 ll_inode_size_unlock(inode, 1);
1933                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1934                 if (retval)
1935                         goto out;
1936         } else {
1937                 /* region is within kms and, hence, within real file size (A) */
1938                 i_size_write(inode, kms);
1939                 ll_inode_size_unlock(inode, 1);
1940         }
1941
1942         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1943                inode->i_ino, count, *ppos, i_size_read(inode));
1944
1945         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1946         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1947         ll_ra_read_in(in_file, &bead);
1948         /* BUG: 5972 */
1949         file_accessed(in_file);
1950         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1951         ll_ra_read_ex(in_file, &bead);
1952
1953  out:
1954         ll_tree_unlock(&tree);
1955         RETURN(retval);
1956 }
1957
1958 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1959                                unsigned long arg)
1960 {
1961         struct ll_inode_info *lli = ll_i2info(inode);
1962         struct obd_export *exp = ll_i2dtexp(inode);
1963         struct ll_recreate_obj ucreatp;
1964         struct obd_trans_info oti = { 0 };
1965         struct obdo *oa = NULL;
1966         int lsm_size;
1967         int rc = 0;
1968         struct lov_stripe_md *lsm, *lsm2;
1969         ENTRY;
1970
1971         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1972                 RETURN(-EPERM);
1973
1974         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1975                             sizeof(struct ll_recreate_obj));
1976         if (rc) {
1977                 RETURN(-EFAULT);
1978         }
1979         OBDO_ALLOC(oa);
1980         if (oa == NULL)
1981                 RETURN(-ENOMEM);
1982
1983         down(&lli->lli_size_sem);
1984         lsm = lli->lli_smd;
1985         if (lsm == NULL)
1986                 GOTO(out, rc = -ENOENT);
1987         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1988                    (lsm->lsm_stripe_count));
1989
1990         OBD_ALLOC(lsm2, lsm_size);
1991         if (lsm2 == NULL)
1992                 GOTO(out, rc = -ENOMEM);
1993
1994         oa->o_id = ucreatp.lrc_id;
1995         oa->o_gr = ucreatp.lrc_group;
1996         oa->o_nlink = ucreatp.lrc_ost_idx;
1997         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1998         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1999         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2000                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2001
2002         memcpy(lsm2, lsm, lsm_size);
2003         rc = obd_create(exp, oa, &lsm2, &oti);
2004
2005         OBD_FREE(lsm2, lsm_size);
2006         GOTO(out, rc);
2007 out:
2008         up(&lli->lli_size_sem);
2009         OBDO_FREE(oa);
2010         return rc;
2011 }
2012
2013 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2014                              int flags, struct lov_user_md *lum, int lum_size)
2015 {
2016         struct ll_inode_info *lli = ll_i2info(inode);
2017         struct lov_stripe_md *lsm;
2018         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2019         int rc = 0;
2020         ENTRY;
2021
2022         down(&lli->lli_size_sem);
2023         lsm = lli->lli_smd;
2024         if (lsm) {
2025                 up(&lli->lli_size_sem);
2026                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2027                        inode->i_ino);
2028                 RETURN(-EEXIST);
2029         }
2030
2031         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2032         if (rc)
2033                 GOTO(out, rc);
2034         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2035                 GOTO(out_req_free, rc = -ENOENT);
2036         rc = oit.d.lustre.it_status;
2037         if (rc < 0)
2038                 GOTO(out_req_free, rc);
2039
2040         ll_release_openhandle(file->f_dentry, &oit);
2041
2042  out:
2043         up(&lli->lli_size_sem);
2044         ll_intent_release(&oit);
2045         RETURN(rc);
2046 out_req_free:
2047         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2048         goto out;
2049 }
2050
2051 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
2052                              struct lov_mds_md **lmmp, int *lmm_size,
2053                              struct ptlrpc_request **request)
2054 {
2055         struct ll_sb_info *sbi = ll_i2sbi(inode);
2056         struct mdt_body  *body;
2057         struct lov_mds_md *lmm = NULL;
2058         struct ptlrpc_request *req = NULL;
2059         struct obd_capa *oc;
2060         int rc, lmmsize;
2061
2062         rc = ll_get_max_mdsize(sbi, &lmmsize);
2063         if (rc)
2064                 RETURN(rc);
2065
2066         oc = ll_mdscapa_get(inode);
2067         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2068                              oc, filename, strlen(filename) + 1,
2069                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2070                              ll_i2suppgid(inode), &req);
2071         capa_put(oc);
2072         if (rc < 0) {
2073                 CDEBUG(D_INFO, "md_getattr_name failed "
2074                        "on %s: rc %d\n", filename, rc);
2075                 GOTO(out, rc);
2076         }
2077
2078         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2079         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2080
2081         lmmsize = body->eadatasize;
2082
2083         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2084                         lmmsize == 0) {
2085                 GOTO(out, rc = -ENODATA);
2086         }
2087
2088         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2089         LASSERT(lmm != NULL);
2090
2091         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
2092             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) &&
2093             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
2094                 GOTO(out, rc = -EPROTO);
2095         }
2096
2097         /*
2098          * This is coming from the MDS, so is probably in
2099          * little endian.  We convert it to host endian before
2100          * passing it to userspace.
2101          */
2102         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
2103                 /* if the function was called for a directory, avoid
2104                  * swabbing non-existent lsm objects */
2105                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
2106                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
2107                         if (S_ISREG(body->mode))
2108                                 lustre_swab_lov_user_md_objects(
2109                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
2110                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
2111                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
2112                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
2113                         if (S_ISREG(body->mode))
2114                                 lustre_swab_lov_user_md_objects(
2115                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
2116                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
2117                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
2118                         lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2119                 }
2120         }
2121
2122         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2123                 struct lov_stripe_md *lsm;
2124                 struct lov_user_md_join *lmj;
2125                 int lmj_size, i, aindex = 0;
2126
2127                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2128                 if (rc < 0)
2129                         GOTO(out, rc = -ENOMEM);
2130                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2131                 if (rc)
2132                         GOTO(out_free_memmd, rc);
2133
2134                 lmj_size = sizeof(struct lov_user_md_join) +
2135                            lsm->lsm_stripe_count *
2136                            sizeof(struct lov_user_ost_data_join);
2137                 OBD_ALLOC(lmj, lmj_size);
2138                 if (!lmj)
2139                         GOTO(out_free_memmd, rc = -ENOMEM);
2140
2141                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2142                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2143                         struct lov_extent *lex =
2144                                 &lsm->lsm_array->lai_ext_array[aindex];
2145
2146                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2147                                 aindex ++;
2148                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2149                                         LPU64" len %d\n", aindex, i,
2150                                         lex->le_start, (int)lex->le_len);
2151                         lmj->lmm_objects[i].l_extent_start =
2152                                 lex->le_start;
2153
2154                         if ((int)lex->le_len == -1)
2155                                 lmj->lmm_objects[i].l_extent_end = -1;
2156                         else
2157                                 lmj->lmm_objects[i].l_extent_end =
2158                                         lex->le_start + lex->le_len;
2159                         lmj->lmm_objects[i].l_object_id =
2160                                 lsm->lsm_oinfo[i]->loi_id;
2161                         lmj->lmm_objects[i].l_object_gr =
2162                                 lsm->lsm_oinfo[i]->loi_gr;
2163                         lmj->lmm_objects[i].l_ost_gen =
2164                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2165                         lmj->lmm_objects[i].l_ost_idx =
2166                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2167                 }
2168                 lmm = (struct lov_mds_md *)lmj;
2169                 lmmsize = lmj_size;
2170 out_free_memmd:
2171                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2172         }
2173 out:
2174         *lmmp = lmm;
2175         *lmm_size = lmmsize;
2176         *request = req;
2177         return rc;
2178 }
2179
2180 static int ll_lov_setea(struct inode *inode, struct file *file,
2181                             unsigned long arg)
2182 {
2183         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2184         struct lov_user_md  *lump;
2185         int lum_size = sizeof(struct lov_user_md) +
2186                        sizeof(struct lov_user_ost_data);
2187         int rc;
2188         ENTRY;
2189
2190         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2191                 RETURN(-EPERM);
2192
2193         OBD_ALLOC(lump, lum_size);
2194         if (lump == NULL) {
2195                 RETURN(-ENOMEM);
2196         }
2197         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2198         if (rc) {
2199                 OBD_FREE(lump, lum_size);
2200                 RETURN(-EFAULT);
2201         }
2202
2203         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2204
2205         OBD_FREE(lump, lum_size);
2206         RETURN(rc);
2207 }
2208
2209 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2210                             unsigned long arg)
2211 {
2212         struct lov_user_md_v3 lumv3;
2213         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
2214         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
2215         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
2216         int lum_size;
2217         int rc;
2218         int flags = FMODE_WRITE;
2219         ENTRY;
2220
2221         /* first try with v1 which is smaller than v3 */
2222         lum_size = sizeof(struct lov_user_md_v1);
2223         rc = copy_from_user(lumv1, lumv1p, lum_size);
2224         if (rc)
2225                 RETURN(-EFAULT);
2226
2227         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
2228                 lum_size = sizeof(struct lov_user_md_v3);
2229                 rc = copy_from_user(&lumv3, lumv3p, lum_size);
2230                 if (rc)
2231                         RETURN(-EFAULT);
2232         }
2233
2234         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
2235         if (rc == 0) {
2236                  put_user(0, &lumv1p->lmm_stripe_count);
2237                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2238                                     0, ll_i2info(inode)->lli_smd,
2239                                     (void *)arg);
2240         }
2241         RETURN(rc);
2242 }
2243
2244 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2245 {
2246         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2247
2248         if (!lsm)
2249                 RETURN(-ENODATA);
2250
2251         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2252                             (void *)arg);
2253 }
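
/*
 * Editorial usage sketch (not part of the original source): how an
 * application typically reaches the two ioctl handlers above from user
 * space.  The field values and open flags are illustrative; the ioctl
 * names and the lov_user_md_v1 fields are taken from the Lustre user
 * interface headers.
 *
 *      struct lov_user_md_v1 lum = { 0 };
 *      int fd = open(path, O_CREAT | O_WRONLY | O_LOV_DELAY_CREATE, 0644);
 *
 *      lum.lmm_magic = LOV_USER_MAGIC_V1;
 *      lum.lmm_stripe_size = 1 << 20;          (stripe size in bytes)
 *      lum.lmm_stripe_count = 4;               (number of OST objects)
 *      lum.lmm_stripe_offset = -1;             (let the MDS pick the first OST)
 *      ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum);  (handled by ll_lov_setstripe())
 *
 * A later LL_IOC_LOV_GETSTRIPE on the same fd (with a buffer large enough
 * to hold the per-object lmm_objects array) is handled by ll_lov_getstripe().
 */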
2254
2255 static int ll_get_grouplock(struct inode *inode, struct file *file,
2256                             unsigned long arg)
2257 {
2258         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2259         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2260                                                     .end = OBD_OBJECT_EOF}};
2261         struct lustre_handle lockh = { 0 };
2262         struct ll_inode_info *lli = ll_i2info(inode);
2263         struct lov_stripe_md *lsm = lli->lli_smd;
2264         int flags = 0, rc;
2265         ENTRY;
2266
2267         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2268                 RETURN(-EINVAL);
2269         }
2270
2271         policy.l_extent.gid = arg;
2272         if (file->f_flags & O_NONBLOCK)
2273                 flags = LDLM_FL_BLOCK_NOWAIT;
2274
2275         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2276         if (rc)
2277                 RETURN(rc);
2278
2279         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2280         fd->fd_gid = arg;
2281         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2282
2283         RETURN(0);
2284 }
2285
2286 static int ll_put_grouplock(struct inode *inode, struct file *file,
2287                             unsigned long arg)
2288 {
2289         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2290         struct ll_inode_info *lli = ll_i2info(inode);
2291         struct lov_stripe_md *lsm = lli->lli_smd;
2292         int rc;
2293         ENTRY;
2294
2295         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2296                 /* Ugh, it's already unlocked. */
2297                 RETURN(-EINVAL);
2298         }
2299
2300         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2301                 RETURN(-EINVAL);
2302
2303         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2304
2305         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2306         if (rc)
2307                 RETURN(rc);
2308
2309         fd->fd_gid = 0;
2310         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2311
2312         RETURN(0);
2313 }
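
/*
 * Editorial usage sketch (not part of the original source): the group lock
 * is taken and dropped from user space through the two ioctls handled
 * above.  The group id (1234 here) is arbitrary, but every cooperating
 * process must pass the same value; while the group lock is held the file
 * descriptor also ignores per-I/O extent locking (LL_FILE_IGNORE_LOCK).
 *
 *      int gid = 1234;
 *      ioctl(fd, LL_IOC_GROUP_LOCK, gid);      (ll_get_grouplock())
 *      ... I/O by the cooperating processes ...
 *      ioctl(fd, LL_IOC_GROUP_UNLOCK, gid);    (ll_put_grouplock())
 */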
2314
2315 #if LUSTRE_FIX >= 50
2316 static int join_sanity_check(struct inode *head, struct inode *tail)
2317 {
2318         ENTRY;
2319         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2320                 CERROR("server does not support join\n");
2321                 RETURN(-EINVAL);
2322         }
2323         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2324                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2325                        head->i_ino, tail->i_ino);
2326                 RETURN(-EINVAL);
2327         }
2328         if (head->i_ino == tail->i_ino) {
2329                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2330                 RETURN(-EINVAL);
2331         }
2332         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2333                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
2334                 RETURN(-EINVAL);
2335         }
2336         RETURN(0);
2337 }
2338
2339 static int join_file(struct inode *head_inode, struct file *head_filp,
2340                      struct file *tail_filp)
2341 {
2342         struct dentry *tail_dentry = tail_filp->f_dentry;
2343         struct lookup_intent oit = {.it_op = IT_OPEN,
2344                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2345         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2346                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2347
2348         struct lustre_handle lockh;
2349         struct md_op_data *op_data;
2350         int    rc;
2351         loff_t data;
2352         ENTRY;
2353
2354         tail_dentry = tail_filp->f_dentry;
2355
2356         data = i_size_read(head_inode);
2357         op_data = ll_prep_md_op_data(NULL, head_inode,
2358                                      tail_dentry->d_parent->d_inode,
2359                                      tail_dentry->d_name.name,
2360                                      tail_dentry->d_name.len, 0,
2361                                      LUSTRE_OPC_ANY, &data);
2362         if (IS_ERR(op_data))
2363                 RETURN(PTR_ERR(op_data));
2364
2365         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit,
2366                          op_data, &lockh, NULL, 0, NULL, 0);
2367
2368         ll_finish_md_op_data(op_data);
2369         if (rc < 0)
2370                 GOTO(out, rc);
2371
2372         rc = oit.d.lustre.it_status;
2373
2374         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2375                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2376                 ptlrpc_req_finished((struct ptlrpc_request *)
2377                                     oit.d.lustre.it_data);
2378                 GOTO(out, rc);
2379         }
2380
2381         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2382                                            * away */
2383                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2384                 oit.d.lustre.it_lock_mode = 0;
2385         }
2386         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2387         it_clear_disposition(&oit, DISP_ENQ_COMPLETE);
2388         ll_release_openhandle(head_filp->f_dentry, &oit);
2389 out:
2390         ll_intent_release(&oit);
2391         RETURN(rc);
2392 }
2393
2394 static int ll_file_join(struct inode *head, struct file *filp,
2395                         char *filename_tail)
2396 {
2397         struct inode *tail = NULL, *first = NULL, *second = NULL;
2398         struct dentry *tail_dentry;
2399         struct file *tail_filp, *first_filp, *second_filp;
2400         struct ll_lock_tree first_tree, second_tree;
2401         struct ll_lock_tree_node *first_node, *second_node;
2402         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2403         int rc = 0, cleanup_phase = 0;
2404         ENTRY;
2405
2406         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2407                head->i_ino, head->i_generation, head, filename_tail);
2408
2409         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2410         if (IS_ERR(tail_filp)) {
2411                 CERROR("Cannot open tail file %s\n", filename_tail);
2412                 rc = PTR_ERR(tail_filp);
2413                 GOTO(cleanup, rc);
2414         }
2415         tail = igrab(tail_filp->f_dentry->d_inode);
2416
2417         tlli = ll_i2info(tail);
2418         tail_dentry = tail_filp->f_dentry;
2419         LASSERT(tail_dentry);
2420         cleanup_phase = 1;
2421
2422         /* reorder the inodes to take locks in a consistent order */
2423         first = head->i_ino > tail->i_ino ? head : tail;
2424         second = head->i_ino > tail->i_ino ? tail : head;
2425         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2426         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2427
2428         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2429                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2430         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2431         if (IS_ERR(first_node)){
2432                 rc = PTR_ERR(first_node);
2433                 GOTO(cleanup, rc);
2434         }
2435         first_tree.lt_fd = first_filp->private_data;
2436         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2437         if (rc != 0)
2438                 GOTO(cleanup, rc);
2439         cleanup_phase = 2;
2440
2441         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2442         if (IS_ERR(second_node)){
2443                 rc = PTR_ERR(second_node);
2444                 GOTO(cleanup, rc);
2445         }
2446         second_tree.lt_fd = second_filp->private_data;
2447         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2448         if (rc != 0)
2449                 GOTO(cleanup, rc);
2450         cleanup_phase = 3;
2451
2452         rc = join_sanity_check(head, tail);
2453         if (rc)
2454                 GOTO(cleanup, rc);
2455
2456         rc = join_file(head, filp, tail_filp);
2457         if (rc)
2458                 GOTO(cleanup, rc);
2459 cleanup:
2460         switch (cleanup_phase) {
2461         case 3:
2462                 ll_tree_unlock(&second_tree);
2463                 obd_cancel_unused(ll_i2dtexp(second),
2464                                   ll_i2info(second)->lli_smd, 0, NULL);
2465         case 2:
2466                 ll_tree_unlock(&first_tree);
2467                 obd_cancel_unused(ll_i2dtexp(first),
2468                                   ll_i2info(first)->lli_smd, 0, NULL);
2469         case 1:
2470                 filp_close(tail_filp, 0);
2471                 if (tail)
2472                         iput(tail);
2473                 if (head && rc == 0) {
2474                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2475                                        &hlli->lli_smd);
2476                         hlli->lli_smd = NULL;
2477                 }
2478         case 0:
2479                 break;
2480         default:
2481                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2482                 LBUG();
2483         }
2484         RETURN(rc);
2485 }
2486 #endif /* LUSTRE_FIX >= 50 */
2487
2488 /**
2489  * Close inode open handle
2490  *
2491  * \param dentry [in]     dentry which contains the inode
2492  * \param it     [in,out] intent which contains open info and result
2493  *
2494  * \retval 0     success
2495  * \retval <0    failure
2496  */
2497 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2498 {
2499         struct inode *inode = dentry->d_inode;
2500         struct obd_client_handle *och;
2501         int rc;
2502         ENTRY;
2503
2504         LASSERT(inode);
2505
2506         /* Root? Do nothing. */
2507         if (dentry->d_inode->i_sb->s_root == dentry)
2508                 RETURN(0);
2509
2510         /* No open handle to close? Move away */
2511         if (!it_disposition(it, DISP_OPEN_OPEN))
2512                 RETURN(0);
2513
2514         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2515
2516         OBD_ALLOC(och, sizeof(*och));
2517         if (!och)
2518                 GOTO(out, rc = -ENOMEM);
2519
2520         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2521                     ll_i2info(inode), it, och);
2522
2523         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2524                                        inode, och);
2525  out:
2526         /* this one is in place of ll_file_open */
2527         if (it_disposition(it, DISP_ENQ_OPEN_REF))
2528                 ptlrpc_req_finished(it->d.lustre.it_data);
2529         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2530         RETURN(rc);
2531 }
2532
2533 /**
2534  * Get the size of the inode for which the FIEMAP mapping is requested,
2535  * make the FIEMAP get_info call, and return the result.
2536  */
2537 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2538               int num_bytes)
2539 {
2540         struct obd_export *exp = ll_i2dtexp(inode);
2541         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2542         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2543         int vallen = num_bytes;
2544         int rc;
2545         ENTRY;
2546
2547         /* If the stripe_count > 1 and the application does not understand
2548          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2549          */
2550         if (lsm->lsm_stripe_count > 1 &&
2551             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2552                 return -EOPNOTSUPP;
2553
2554         fm_key.oa.o_id = lsm->lsm_object_id;
2555         fm_key.oa.o_gr = lsm->lsm_object_gr;
2556         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2557
2558         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
2559                         OBD_MD_FLSIZE);
2560
2561         /* If the file size is 0, there are no objects to map */
2562         if (fm_key.oa.o_size == 0) {
2563                 fiemap->fm_mapped_extents = 0;
2564                 RETURN(0);
2565         }
2566
2567         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2568
2569         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2570         if (rc)
2571                 CERROR("obd_get_info failed: rc = %d\n", rc);
2572
2573         RETURN(rc);
2574 }
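
/*
 * Editorial usage sketch (not part of the original source): how a caller
 * sizes the buffer that reaches ll_fiemap() above through the
 * EXT3_IOC_FIEMAP case in ll_file_ioctl() below.  The extent count of 32
 * is arbitrary, and fm_start/fm_length are assumed to follow the standard
 * fiemap layout; the other fm_* fields are the ones used by the handler.
 *
 *      int count = 32;
 *      size_t len = sizeof(struct ll_user_fiemap) +
 *                   count * sizeof(struct ll_fiemap_extent);
 *      struct ll_user_fiemap *fm = calloc(1, len);
 *
 *      fm->fm_length = ~0ULL;                    (map the whole file)
 *      fm->fm_extent_count = count;
 *      fm->fm_flags = FIEMAP_FLAG_DEVICE_ORDER;  (needed if stripe_count > 1)
 *      ioctl(fd, EXT3_IOC_FIEMAP, fm);
 *      ... fm->fm_mapped_extents extents are returned in fm->fm_extents[] ...
 */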
2575
2576 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2577                   unsigned long arg)
2578 {
2579         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2580         int flags;
2581         ENTRY;
2582
2583         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2584                inode->i_generation, inode, cmd);
2585         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2586
2587         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2588         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2589                 RETURN(-ENOTTY);
2590
2591         switch(cmd) {
2592         case LL_IOC_GETFLAGS:
2593                 /* Get the current value of the file flags */
2594                 return put_user(fd->fd_flags, (int *)arg);
2595         case LL_IOC_SETFLAGS:
2596         case LL_IOC_CLRFLAGS:
2597                 /* Set or clear specific file flags */
2598                 /* XXX This probably needs checks to ensure the flags are
2599                  *     not abused, and to handle any flag side effects.
2600                  */
2601                 if (get_user(flags, (int *) arg))
2602                         RETURN(-EFAULT);
2603
2604                 if (cmd == LL_IOC_SETFLAGS) {
2605                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2606                             !(file->f_flags & O_DIRECT)) {
2607                                 CERROR("%s: unable to disable locking on "
2608                                        "non-O_DIRECT file\n", current->comm);
2609                                 RETURN(-EINVAL);
2610                         }
2611
2612                         fd->fd_flags |= flags;
2613                 } else {
2614                         fd->fd_flags &= ~flags;
2615                 }
2616                 RETURN(0);
2617         case LL_IOC_LOV_SETSTRIPE:
2618                 RETURN(ll_lov_setstripe(inode, file, arg));
2619         case LL_IOC_LOV_SETEA:
2620                 RETURN(ll_lov_setea(inode, file, arg));
2621         case LL_IOC_LOV_GETSTRIPE:
2622                 RETURN(ll_lov_getstripe(inode, arg));
2623         case LL_IOC_RECREATE_OBJ:
2624                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2625         case EXT3_IOC_FIEMAP: {
2626                 struct ll_user_fiemap *fiemap_s;
2627                 size_t num_bytes, ret_bytes;
2628                 unsigned int extent_count;
2629                 int rc = 0;
2630
2631                 /* Get the extent count so we can calculate the size of the
2632                  * required fiemap buffer */
2633                 if (get_user(extent_count,
2634                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2635                         RETURN(-EFAULT);
2636                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2637                                                  sizeof(struct ll_fiemap_extent));
2638                 OBD_VMALLOC(fiemap_s, num_bytes);
2639                 if (fiemap_s == NULL)
2640                         RETURN(-ENOMEM);
2641
2642                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2643                                    sizeof(*fiemap_s)))
2644                         GOTO(error, rc = -EFAULT);
2645
2646                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2647                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2648                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2649                         if (copy_to_user((char *)arg, fiemap_s,
2650                                          sizeof(*fiemap_s)))
2651                                 GOTO(error, rc = -EFAULT);
2652
2653                         GOTO(error, rc = -EBADR);
2654                 }
2655
2656                 /* If fm_extent_count is non-zero, read the first extent, since
2657                  * it carries the end offset and device from a previous
2658                  * fiemap call. */
2659                 if (extent_count) {
2660                         if (copy_from_user(&fiemap_s->fm_extents[0],
2661                             (char __user *)arg + sizeof(*fiemap_s),
2662                             sizeof(struct ll_fiemap_extent)))
2663                                 GOTO(error, rc = -EFAULT);
2664                 }
2665
2666                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2667                         int rc;
2668
2669                         rc = filemap_fdatawrite(inode->i_mapping);
2670                         if (rc)
2671                                 GOTO(error, rc);
2672                 }
2673
2674                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2675                 if (rc)
2676                         GOTO(error, rc);
2677
2678                 ret_bytes = sizeof(struct ll_user_fiemap);
2679
2680                 if (extent_count != 0)
2681                         ret_bytes += (fiemap_s->fm_mapped_extents *
2682                                          sizeof(struct ll_fiemap_extent));
2683
2684                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2685                         rc = -EFAULT;
2686
2687 error:
2688                 OBD_VFREE(fiemap_s, num_bytes);
2689                 RETURN(rc);
2690         }
2691         case EXT3_IOC_GETFLAGS:
2692         case EXT3_IOC_SETFLAGS:
2693                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2694         case EXT3_IOC_GETVERSION_OLD:
2695         case EXT3_IOC_GETVERSION:
2696                 RETURN(put_user(inode->i_generation, (int *)arg));
2697         case LL_IOC_JOIN: {
2698 #if LUSTRE_FIX >= 50
2699                 /* Allow file join in beta builds to aid debugging */
2700                 char *ftail;
2701                 int rc;
2702
2703                 ftail = getname((const char *)arg);
2704                 if (IS_ERR(ftail))
2705                         RETURN(PTR_ERR(ftail));
2706                 rc = ll_file_join(inode, file, ftail);
2707                 putname(ftail);
2708                 RETURN(rc);
2709 #else
2710                 CWARN("file join is not supported in this version of Lustre\n");
2711                 RETURN(-ENOTTY);
2712 #endif
2713         }
2714         case LL_IOC_GROUP_LOCK:
2715                 RETURN(ll_get_grouplock(inode, file, arg));
2716         case LL_IOC_GROUP_UNLOCK:
2717                 RETURN(ll_put_grouplock(inode, file, arg));
2718         case IOC_OBD_STATFS:
2719                 RETURN(ll_obd_statfs(inode, (void *)arg));
2720
2721         /* We need to special case any other ioctls we want to handle,
2722          * to send them to the MDS/OST as appropriate and to properly
2723          * network encode the arg field.
2724         case EXT3_IOC_SETVERSION_OLD:
2725         case EXT3_IOC_SETVERSION:
2726         */
2727         case LL_IOC_FLUSHCTX:
2728                 RETURN(ll_flush_ctx(inode));
2729         default: {
2730                 int err;
2731
2732                 if (LLIOC_STOP ==
2733                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2734                         RETURN(err);
2735
2736                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2737                                      (void *)arg));
2738         }
2739         }
2740 }
2741
2742 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2743 {
2744         struct inode *inode = file->f_dentry->d_inode;
2745         struct ll_inode_info *lli = ll_i2info(inode);
2746         struct lov_stripe_md *lsm = lli->lli_smd;
2747         loff_t retval;
2748         ENTRY;
2749         retval = offset + ((origin == 2) ? i_size_read(inode) :
2750                            (origin == 1) ? file->f_pos : 0);
2751         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2752                inode->i_ino, inode->i_generation, inode, retval, retval,
2753                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2754         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2755
2756         if (origin == 2) { /* SEEK_END */
2757                 int nonblock = 0, rc;
2758
2759                 if (file->f_flags & O_NONBLOCK)
2760                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2761
2762                 if (lsm != NULL) {
2763                         rc = ll_glimpse_size(inode, nonblock);
2764                         if (rc != 0)
2765                                 RETURN(rc);
2766                 }
2767
2768                 ll_inode_size_lock(inode, 0);
2769                 offset += i_size_read(inode);
2770                 ll_inode_size_unlock(inode, 0);
2771         } else if (origin == 1) { /* SEEK_CUR */
2772                 offset += file->f_pos;
2773         }
2774
2775         retval = -EINVAL;
2776         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2777                 if (offset != file->f_pos) {
2778                         file->f_pos = offset;
2779                 }
2780                 retval = offset;
2781         }
2782
2783         RETURN(retval);
2784 }
2785
2786 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2787 {
2788         struct inode *inode = dentry->d_inode;
2789         struct ll_inode_info *lli = ll_i2info(inode);
2790         struct lov_stripe_md *lsm = lli->lli_smd;
2791         struct ptlrpc_request *req;
2792         struct obd_capa *oc;
2793         int rc, err;
2794         ENTRY;
2795         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2796                inode->i_generation, inode);
2797         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2798
2799         /* fsync's caller has already called _fdata{sync,write}, we want
2800          * that IO to finish before calling the osc and mdc sync methods */
2801         rc = filemap_fdatawait(inode->i_mapping);
2802
2803         /* catch async errors that were recorded back when async writeback
2804          * failed for pages in this mapping. */
2805         err = lli->lli_async_rc;
2806         lli->lli_async_rc = 0;
2807         if (rc == 0)
2808                 rc = err;
2809         if (lsm) {
2810                 err = lov_test_and_clear_async_rc(lsm);
2811                 if (rc == 0)
2812                         rc = err;
2813         }
2814
2815         oc = ll_mdscapa_get(inode);
2816         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2817                       &req);
2818         capa_put(oc);
2819         if (!rc)
2820                 rc = err;
2821         if (!err)
2822                 ptlrpc_req_finished(req);
2823
2824         if (data && lsm) {
2825                 struct obdo *oa;
2826
2827                 OBDO_ALLOC(oa);
2828                 if (!oa)
2829                         RETURN(rc ? rc : -ENOMEM);
2830
2831                 oa->o_id = lsm->lsm_object_id;
2832                 oa->o_gr = lsm->lsm_object_gr;
2833                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2834                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2835                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2836                                            OBD_MD_FLGROUP);
2837
2838                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2839                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2840                                0, OBD_OBJECT_EOF, oc);
2841                 capa_put(oc);
2842                 if (!rc)
2843                         rc = err;
2844                 OBDO_FREE(oa);
2845         }
2846
2847         RETURN(rc);
2848 }
2849
2850 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2851 {
2852         struct inode *inode = file->f_dentry->d_inode;
2853         struct ll_sb_info *sbi = ll_i2sbi(inode);
2854         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2855                                            .ei_cb_cp =ldlm_flock_completion_ast,
2856                                            .ei_cbdata = file_lock };
2857         struct md_op_data *op_data;
2858         struct lustre_handle lockh = {0};
2859         ldlm_policy_data_t flock;
2860         int flags = 0;
2861         int rc;
2862         ENTRY;
2863
2864         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2865                inode->i_ino, file_lock);
2866
2867         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2868
2869         if (file_lock->fl_flags & FL_FLOCK) {
2870                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2871                 /* set missing params for flock() calls */
2872                 file_lock->fl_end = OFFSET_MAX;
2873                 file_lock->fl_pid = current->tgid;
2874         }
2875         flock.l_flock.pid = file_lock->fl_pid;
2876         flock.l_flock.start = file_lock->fl_start;
2877         flock.l_flock.end = file_lock->fl_end;
2878
2879         switch (file_lock->fl_type) {
2880         case F_RDLCK:
2881                 einfo.ei_mode = LCK_PR;
2882                 break;
2883         case F_UNLCK:
2884                 /* An unlock request may or may not have any relation to
2885                  * existing locks so we may not be able to pass a lock handle
2886                  * via a normal ldlm_lock_cancel() request. The request may even
2887                  * unlock a byte range in the middle of an existing lock. In
2888                  * order to process an unlock request we need all of the same
2889                  * information that is given with a normal read or write record
2890                  * lock request. To avoid creating another ldlm unlock (cancel)
2891                  * message we'll treat a LCK_NL flock request as an unlock. */
2892                 einfo.ei_mode = LCK_NL;
2893                 break;
2894         case F_WRLCK:
2895                 einfo.ei_mode = LCK_PW;
2896                 break;
2897         default:
2898                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2899                 LBUG();
2900         }
2901
2902         switch (cmd) {
2903         case F_SETLKW:
2904 #ifdef F_SETLKW64
2905         case F_SETLKW64:
2906 #endif
2907                 flags = 0;
2908                 break;
2909         case F_SETLK:
2910 #ifdef F_SETLK64
2911         case F_SETLK64:
2912 #endif
2913                 flags = LDLM_FL_BLOCK_NOWAIT;
2914                 break;
2915         case F_GETLK:
2916 #ifdef F_GETLK64
2917         case F_GETLK64:
2918 #endif
2919                 flags = LDLM_FL_TEST_LOCK;
2920                 /* Save the old mode so that if the mode in the lock changes we
2921                  * can decrement the appropriate reader or writer refcount. */
2922                 file_lock->fl_type = einfo.ei_mode;
2923                 break;
2924         default:
2925                 CERROR("unknown fcntl lock command: %d\n", cmd);
2926                 LBUG();
2927         }
2928
2929         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2930                                      LUSTRE_OPC_ANY, NULL);
2931         if (IS_ERR(op_data))
2932                 RETURN(PTR_ERR(op_data));
2933
2934         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2935                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2936                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2937
2938         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2939                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2940
2941         ll_finish_md_op_data(op_data);
2942
2943         if ((file_lock->fl_flags & FL_FLOCK) &&
2944             (rc == 0 || file_lock->fl_type == F_UNLCK))
2945                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2946 #ifdef HAVE_F_OP_FLOCK
2947         if ((file_lock->fl_flags & FL_POSIX) &&
2948             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2949             !(flags & LDLM_FL_TEST_LOCK))
2950                 posix_lock_file_wait(file, file_lock);
2951 #endif
2952
2953         RETURN(rc);
2954 }
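
/*
 * A small user-space sketch of the fcntl() usage that reaches
 * ll_file_flock() above, shown only for illustration.  F_WRLCK is
 * enqueued as LCK_PW, F_RDLCK as LCK_PR, and F_UNLCK is sent as LCK_NL;
 * F_SETLK adds LDLM_FL_BLOCK_NOWAIT while F_SETLKW waits for the lock.
 */
#if 0
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

static int lock_first_page(int fd)
{
        struct flock fl;
        int rc;

        memset(&fl, 0, sizeof(fl));
        fl.l_type = F_WRLCK;            /* enqueued as an LCK_PW flock lock */
        fl.l_whence = SEEK_SET;
        fl.l_start = 0;
        fl.l_len = 4096;

        rc = fcntl(fd, F_SETLKW, &fl);  /* flags = 0, block until granted */
        if (rc < 0)
                return rc;

        fl.l_type = F_UNLCK;            /* sent to the MDS as LCK_NL */
        return fcntl(fd, F_SETLK, &fl);
}
#endif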
2955
2956 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2957 {
2958         ENTRY;
2959
2960         RETURN(-ENOSYS);
2961 }
2962
2963 int ll_have_md_lock(struct inode *inode, __u64 bits)
2964 {
2965         struct lustre_handle lockh;
2966         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2967         struct lu_fid *fid;
2968         int flags;
2969         ENTRY;
2970
2971         if (!inode)
2972                RETURN(0);
2973
2974         fid = &ll_i2info(inode)->lli_fid;
2975         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2976
2977         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2978         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2979                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2980                 RETURN(1);
2981         }
2982         RETURN(0);
2983 }
2984
2985 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2986                             struct lustre_handle *lockh)
2987 {
2988         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2989         struct lu_fid *fid;
2990         ldlm_mode_t rc;
2991         int flags;
2992         ENTRY;
2993
2994         fid = &ll_i2info(inode)->lli_fid;
2995         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2996
2997         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2998         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2999                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
3000         RETURN(rc);
3001 }
3002
3003 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
3004         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
3005                               * and return success */
3006                 inode->i_nlink = 0;
3007                 /* This path cannot be hit for regular files unless in
3008                  * case of obscure races, so there is no need to validate
3009                  * size. */
3010                 if (!S_ISREG(inode->i_mode) &&
3011                     !S_ISDIR(inode->i_mode))
3012                         return 0;
3013         }
3014
3015         if (rc) {
3016                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
3017                 return -abs(rc);
3018
3019         }
3020
3021         return 0;
3022 }
3023
3024 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
3025 {
3026         struct inode *inode = dentry->d_inode;
3027         struct ptlrpc_request *req = NULL;
3028         struct ll_sb_info *sbi;
3029         struct obd_export *exp;
3030         int rc;
3031         ENTRY;
3032
3033         if (!inode) {
3034                 CERROR("REPORT THIS LINE TO PETER\n");
3035                 RETURN(0);
3036         }
3037         sbi = ll_i2sbi(inode);
3038
3039         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3040                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3041
3042         exp = ll_i2mdexp(inode);
3043
3044         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
3045                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3046                 struct md_op_data *op_data;
3047
3048                 /* Call getattr by fid, so do not provide name at all. */
3049                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
3050                                              dentry->d_inode, NULL, 0, 0,
3051                                              LUSTRE_OPC_ANY, NULL);
3052                 if (IS_ERR(op_data))
3053                         RETURN(PTR_ERR(op_data));
3054
3055                 oit.it_flags |= O_CHECK_STALE;
3056                 rc = md_intent_lock(exp, op_data, NULL, 0,
3057                                     /* we are not interested in name
3058                                        based lookup */
3059                                     &oit, 0, &req,
3060                                     ll_md_blocking_ast, 0);
3061                 ll_finish_md_op_data(op_data);
3062                 oit.it_flags &= ~O_CHECK_STALE;
3063                 if (rc < 0) {
3064                         rc = ll_inode_revalidate_fini(inode, rc);
3065                         GOTO (out, rc);
3066                 }
3067
3068                 rc = ll_revalidate_it_finish(req, &oit, dentry);
3069                 if (rc != 0) {
3070                         ll_intent_release(&oit);
3071                         GOTO(out, rc);
3072                 }
3073
3074                 /* Unlinked? Unhash the dentry so it is not picked up later
3075                    by do_lookup() -> ll_revalidate_it(). We cannot use
3076                    d_drop here, since that would break get_cwd on 2.6.
3077                    Bug 10503 */
3078                 if (!dentry->d_inode->i_nlink) {
3079                         spin_lock(&ll_lookup_lock);
3080                         spin_lock(&dcache_lock);
3081                         ll_drop_dentry(dentry);
3082                         spin_unlock(&dcache_lock);
3083                         spin_unlock(&ll_lookup_lock);
3084                 }
3085
3086                 ll_lookup_finish_locks(&oit, dentry);
3087         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
3088                                                      MDS_INODELOCK_LOOKUP)) {
3089                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3090                 obd_valid valid = OBD_MD_FLGETATTR;
3091                 struct obd_capa *oc;
3092                 int ealen = 0;
3093
3094                 if (S_ISREG(inode->i_mode)) {
3095                         rc = ll_get_max_mdsize(sbi, &ealen);
3096                         if (rc)
3097                                 RETURN(rc);
3098                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3099                 }
3100                 /* If OBD_CONNECT_ATTRFID is not supported, we cannot find
3101                  * a capa for this inode, because only the capas of
3102                  * directories are kept fresh. */
3103                 oc = ll_mdscapa_get(inode);
3104                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
3105                                 ealen, &req);
3106                 capa_put(oc);
3107                 if (rc) {
3108                         rc = ll_inode_revalidate_fini(inode, rc);
3109                         RETURN(rc);
3110                 }
3111
3112                 rc = ll_prep_inode(&inode, req, NULL);
3113                 if (rc)
3114                         GOTO(out, rc);
3115         }
3116
3117         /* if object not yet allocated, don't validate size */
3118         if (ll_i2info(inode)->lli_smd == NULL)
3119                 GOTO(out, rc = 0);
3120
3121         /* ll_glimpse_size will prefer locally cached writes if they extend
3122          * the file */
3123         rc = ll_glimpse_size(inode, 0);
3124         EXIT;
3125 out:
3126         ptlrpc_req_finished(req);
3127         return rc;
3128 }
3129
3130 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3131                   struct lookup_intent *it, struct kstat *stat)
3132 {
3133         struct inode *inode = de->d_inode;
3134         int res = 0;
3135
3136         res = ll_inode_revalidate_it(de, it);
3137         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3138
3139         if (res)
3140                 return res;
3141
3142         stat->dev = inode->i_sb->s_dev;
3143         stat->ino = inode->i_ino;
3144         stat->mode = inode->i_mode;
3145         stat->nlink = inode->i_nlink;
3146         stat->uid = inode->i_uid;
3147         stat->gid = inode->i_gid;
3148         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3149         stat->atime = inode->i_atime;
3150         stat->mtime = inode->i_mtime;
3151         stat->ctime = inode->i_ctime;
3152 #ifdef HAVE_INODE_BLKSIZE
3153         stat->blksize = inode->i_blksize;
3154 #else
3155         stat->blksize = 1 << inode->i_blkbits;
3156 #endif
3157
3158         ll_inode_size_lock(inode, 0);
3159         stat->size = i_size_read(inode);
3160         stat->blocks = inode->i_blocks;
3161         ll_inode_size_unlock(inode, 0);
3162
3163         return 0;
3164 }
3165 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3166 {
3167         struct lookup_intent it = { .it_op = IT_GETATTR };
3168
3169         return ll_getattr_it(mnt, de, &it, stat);
3170 }
3171
3172 static
3173 int lustre_check_acl(struct inode *inode, int mask)
3174 {
3175 #ifdef CONFIG_FS_POSIX_ACL
3176         struct ll_inode_info *lli = ll_i2info(inode);
3177         struct posix_acl *acl;
3178         int rc;
3179         ENTRY;
3180
3181         spin_lock(&lli->lli_lock);
3182         acl = posix_acl_dup(lli->lli_posix_acl);
3183         spin_unlock(&lli->lli_lock);
3184
3185         if (!acl)
3186                 RETURN(-EAGAIN);
3187
3188         rc = posix_acl_permission(inode, acl, mask);
3189         posix_acl_release(acl);
3190
3191         RETURN(rc);
3192 #else
3193         return -EAGAIN;
3194 #endif
3195 }
3196
3197 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3198 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3199 {
3200         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3201                inode->i_ino, inode->i_generation, inode, mask);
3202         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3203                 return lustre_check_remote_perm(inode, mask);
3204
3205         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3206         return generic_permission(inode, mask, lustre_check_acl);
3207 }
3208 #else
3209 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3210 {
3211         int mode = inode->i_mode;
3212         int rc;
3213
3214         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3215                inode->i_ino, inode->i_generation, inode, mask);
3216
3217         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3218                 return lustre_check_remote_perm(inode, mask);
3219
3220         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3221
3222         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3223             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3224                 return -EROFS;
3225         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3226                 return -EACCES;
3227         if (current->fsuid == inode->i_uid) {
3228                 mode >>= 6;
3229         } else if (1) {
3230                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3231                         goto check_groups;
3232                 rc = lustre_check_acl(inode, mask);
3233                 if (rc == -EAGAIN)
3234                         goto check_groups;
3235                 if (rc == -EACCES)
3236                         goto check_capabilities;
3237                 return rc;
3238         } else {
3239 check_groups:
3240                 if (in_group_p(inode->i_gid))
3241                         mode >>= 3;
3242         }
3243         if ((mode & mask & S_IRWXO) == mask)
3244                 return 0;
3245
3246 check_capabilities:
3247         if (!(mask & MAY_EXEC) ||
3248             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3249                 if (cfs_capable(CFS_CAP_DAC_OVERRIDE))
3250                         return 0;
3251
3252         if (cfs_capable(CFS_CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3253             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3254                 return 0;
3255
3256         return -EACCES;
3257 }
3258 #endif
3259
3260 /* -o localflock - only provides locally consistent flock locks */
3261 struct file_operations ll_file_operations = {
3262         .read           = ll_file_read,
3263         .write          = ll_file_write,
3264         .ioctl          = ll_file_ioctl,
3265         .open           = ll_file_open,
3266         .release        = ll_file_release,
3267         .mmap           = ll_file_mmap,
3268         .llseek         = ll_file_seek,
3269         .sendfile       = ll_file_sendfile,
3270         .fsync          = ll_fsync,
3271 };
3272
3273 struct file_operations ll_file_operations_flock = {
3274         .read           = ll_file_read,
3275         .write          = ll_file_write,
3276         .ioctl          = ll_file_ioctl,
3277         .open           = ll_file_open,
3278         .release        = ll_file_release,
3279         .mmap           = ll_file_mmap,
3280         .llseek         = ll_file_seek,
3281         .sendfile       = ll_file_sendfile,
3282         .fsync          = ll_fsync,
3283 #ifdef HAVE_F_OP_FLOCK
3284         .flock          = ll_file_flock,
3285 #endif
3286         .lock           = ll_file_flock
3287 };
3288
3289 /* These are for -o noflock - to return ENOSYS on flock calls */
3290 struct file_operations ll_file_operations_noflock = {
3291         .read           = ll_file_read,
3292         .write          = ll_file_write,
3293         .ioctl          = ll_file_ioctl,
3294         .open           = ll_file_open,
3295         .release        = ll_file_release,
3296         .mmap           = ll_file_mmap,
3297         .llseek         = ll_file_seek,
3298         .sendfile       = ll_file_sendfile,
3299         .fsync          = ll_fsync,
3300 #ifdef HAVE_F_OP_FLOCK
3301         .flock          = ll_file_noflock,
3302 #endif
3303         .lock           = ll_file_noflock
3304 };
3305
3306 struct inode_operations ll_file_inode_operations = {
3307 #ifdef HAVE_VFS_INTENT_PATCHES
3308         .setattr_raw    = ll_setattr_raw,
3309 #endif
3310         .setattr        = ll_setattr,
3311         .truncate       = ll_truncate,
3312         .getattr        = ll_getattr,
3313         .permission     = ll_inode_permission,
3314         .setxattr       = ll_setxattr,
3315         .getxattr       = ll_getxattr,
3316         .listxattr      = ll_listxattr,
3317         .removexattr    = ll_removexattr,
3318 };
3319
3320 /* dynamic ioctl number support routines */
3321 static struct llioc_ctl_data {
3322         struct rw_semaphore ioc_sem;
3323         struct list_head    ioc_head;
3324 } llioc = {
3325         __RWSEM_INITIALIZER(llioc.ioc_sem),
3326         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3327 };
3328
3329
3330 struct llioc_data {
3331         struct list_head        iocd_list;
3332         unsigned int            iocd_size;
3333         llioc_callback_t        iocd_cb;
3334         unsigned int            iocd_count;
3335         unsigned int            iocd_cmd[0];
3336 };
3337
3338 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3339 {
3340         unsigned int size;
3341         struct llioc_data *in_data = NULL;
3342         ENTRY;
3343
3344         if (cb == NULL || cmd == NULL ||
3345             count > LLIOC_MAX_CMD || count < 0)
3346                 RETURN(NULL);
3347
3348         size = sizeof(*in_data) + count * sizeof(unsigned int);
3349         OBD_ALLOC(in_data, size);
3350         if (in_data == NULL)
3351                 RETURN(NULL);
3352
3353         memset(in_data, 0, sizeof(*in_data));
3354         in_data->iocd_size = size;
3355         in_data->iocd_cb = cb;
3356         in_data->iocd_count = count;
3357         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3358
3359         down_write(&llioc.ioc_sem);
3360         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3361         up_write(&llioc.ioc_sem);
3362
3363         RETURN(in_data);
3364 }
3365
3366 void ll_iocontrol_unregister(void *magic)
3367 {
3368         struct llioc_data *tmp;
3369
3370         if (magic == NULL)
3371                 return;
3372
3373         down_write(&llioc.ioc_sem);
3374         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3375                 if (tmp == magic) {
3376                         unsigned int size = tmp->iocd_size;
3377
3378                         list_del(&tmp->iocd_list);
3379                         up_write(&llioc.ioc_sem);
3380
3381                         OBD_FREE(tmp, size);
3382                         return;
3383                 }
3384         }
3385         up_write(&llioc.ioc_sem);
3386
3387         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3388 }
3389
3390 EXPORT_SYMBOL(ll_iocontrol_register);
3391 EXPORT_SYMBOL(ll_iocontrol_unregister);
3392
3393 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3394                         unsigned int cmd, unsigned long arg, int *rcp)
3395 {
3396         enum llioc_iter ret = LLIOC_CONT;
3397         struct llioc_data *data;
3398         int rc = -EINVAL, i;
3399
3400         down_read(&llioc.ioc_sem);
3401         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3402                 for (i = 0; i < data->iocd_count; i++) {
3403                         if (cmd != data->iocd_cmd[i])
3404                                 continue;
3405
3406                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3407                         break;
3408                 }
3409
3410                 if (ret == LLIOC_STOP)
3411                         break;
3412         }
3413         up_read(&llioc.ioc_sem);
3414
3415         if (rcp)
3416                 *rcp = rc;
3417         return ret;
3418 }
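
/*
 * A minimal sketch of how an external module might use the dynamic ioctl
 * hooks above, shown only for illustration.  The callback signature is
 * inferred from the iocd_cb invocation in ll_iocontrol_call(); MY_IOC_CMD,
 * my_ioc_cb and the module init/exit names are hypothetical.
 */
#if 0
static enum llioc_iter my_ioc_cb(struct inode *inode, struct file *file,
                                 unsigned int cmd, unsigned long arg,
                                 void *magic, int *rcp)
{
        /* Handle the command and stop iteration so the default
         * obd_iocontrol() fallback in ll_file_ioctl() is skipped. */
        *rcp = 0;
        return LLIOC_STOP;
}

static unsigned int my_cmds[] = { MY_IOC_CMD };
static void *my_ioc_magic;

static int my_module_init(void)
{
        my_ioc_magic = ll_iocontrol_register(my_ioc_cb, 1, my_cmds);
        return my_ioc_magic == NULL ? -ENOMEM : 0;
}

static void my_module_exit(void)
{
        ll_iocontrol_unregister(my_ioc_magic);
}
#endif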