1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <lustre_mdc.h>
47 #include <linux/pagemap.h>
48 #include <linux/file.h>
49 #include "llite_internal.h"
50 #include <lustre/ll_fiemap.h>
51
52 /* also used by llite/special.c:ll_special_open() */
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
58         return fd;
59 }
60
61 static void ll_file_data_put(struct ll_file_data *fd)
62 {
63         if (fd != NULL)
64                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
65 }
66
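/* Pack the inode's current attributes, I/O epoch and the open file handle
 * into @op_data for an MDS close/DONE_WRITING request. */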
67 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
68                           struct lustre_handle *fh)
69 {
70         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
71         op_data->op_attr.ia_mode = inode->i_mode;
72         op_data->op_attr.ia_atime = inode->i_atime;
73         op_data->op_attr.ia_mtime = inode->i_mtime;
74         op_data->op_attr.ia_ctime = inode->i_ctime;
75         op_data->op_attr.ia_size = i_size_read(inode);
76         op_data->op_attr_blocks = inode->i_blocks;
77         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
78         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
79         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
80         op_data->op_capa1 = ll_mdscapa_get(inode);
81 }
82
83 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
84                              struct obd_client_handle *och)
85 {
86         ENTRY;
87
88         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
89                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
90
91         if (!(och->och_flags & FMODE_WRITE))
92                 goto out;
93
94         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
95             !S_ISREG(inode->i_mode))
96                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
97         else
98                 ll_epoch_close(inode, op_data, &och, 0);
99
100 out:
101         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
102         EXIT;
103 }
104
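/* Send the MDS close request for open handle @och on @inode.  Handles the
 * Size-on-MDS case where the MDS asks us to gather the size from the OSTs
 * first, and frees @och unless it has to wait for DONE_WRITING. */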
105 static int ll_close_inode_openhandle(struct obd_export *md_exp,
106                                      struct inode *inode,
107                                      struct obd_client_handle *och)
108 {
109         struct obd_export *exp = ll_i2mdexp(inode);
110         struct md_op_data *op_data;
111         struct ptlrpc_request *req = NULL;
112         struct obd_device *obd = class_exp2obd(exp);
113         int epoch_close = 1;
114         int seq_end = 0, rc;
115         ENTRY;
116
117         if (obd == NULL) {
118                 /*
119                  * XXX: in case of LMV, is this correct to access
120                  * ->exp_handle?
121                  */
122                 CERROR("Invalid MDC connection handle "LPX64"\n",
123                        ll_i2mdexp(inode)->exp_handle.h_cookie);
124                 GOTO(out, rc = 0);
125         }
126
127         /*
128          * here we check if this is forced umount. If so this is called on
129          * canceling "open lock" and we do not call md_close() in this case, as
130          * it will not be successful, as import is already deactivated.
131          */
132         if (obd->obd_force)
133                 GOTO(out, rc = 0);
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc != -EAGAIN)
143                 seq_end = 1;
144
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
148                 LASSERT(epoch_close);
149                 /* MDS has instructed us to obtain the Size-on-MDS attribute
150                  * from the OSTs and send a setattr back to the MDS. */
151                 rc = ll_sizeonmds_update(inode, och->och_mod,
152                                          &och->och_fh, op_data->op_ioepoch);
153                 if (rc) {
154                         CERROR("inode %lu mdc Size-on-MDS update failed: "
155                                "rc = %d\n", inode->i_ino, rc);
156                         rc = 0;
157                 }
158         } else if (rc) {
159                 CERROR("inode %lu mdc close failed: rc = %d\n",
160                        inode->i_ino, rc);
161         }
162         ll_finish_md_op_data(op_data);
163
164         if (rc == 0) {
165                 rc = ll_objects_destroy(req, inode);
166                 if (rc)
167                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
168                                inode->i_ino, rc);
169         }
170
171         EXIT;
172 out:
173
174         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
175             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
176                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
177         } else {
178                 if (seq_end)
179                         ptlrpc_close_replay_seq(req);
180                 md_clear_open_replay_data(md_exp, och);
181                 /* Free @och if it is not waiting for DONE_WRITING. */
182                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
183                 OBD_FREE_PTR(och);
184         }
185         if (req) /* This is close request */
186                 ptlrpc_req_finished(req);
187         return rc;
188 }
189
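/* Drop the cached MDS open handle of the given open mode (read/write/exec)
 * for @inode once no local file descriptors are using it any more. */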
190 int ll_md_real_close(struct inode *inode, int flags)
191 {
192         struct ll_inode_info *lli = ll_i2info(inode);
193         struct obd_client_handle **och_p;
194         struct obd_client_handle *och;
195         __u64 *och_usecount;
196         int rc = 0;
197         ENTRY;
198
199         if (flags & FMODE_WRITE) {
200                 och_p = &lli->lli_mds_write_och;
201                 och_usecount = &lli->lli_open_fd_write_count;
202         } else if (flags & FMODE_EXEC) {
203                 och_p = &lli->lli_mds_exec_och;
204                 och_usecount = &lli->lli_open_fd_exec_count;
205         } else {
206                 LASSERT(flags & FMODE_READ);
207                 och_p = &lli->lli_mds_read_och;
208                 och_usecount = &lli->lli_open_fd_read_count;
209         }
210
211         down(&lli->lli_och_sem);
212         if (*och_usecount) { /* There are still users of this handle, so
213                                 skip freeing it. */
214                 up(&lli->lli_och_sem);
215                 RETURN(0);
216         }
217         och = *och_p;
218         *och_p = NULL;
219         up(&lli->lli_och_sem);
220
221         if (och) { /* There might be a race and somebody may have freed
222                       this och already */
223                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
224                                                inode, och);
225         }
226
227         RETURN(rc);
228 }
229
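/* Per file descriptor close: drop a group lock if one is held, decrement the
 * open-mode use count and, unless a matching OPEN DLM lock allows us to keep
 * the handle cached, close the MDS open handle via ll_md_real_close(). */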
230 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
231                 struct file *file)
232 {
233         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
234         struct ll_inode_info *lli = ll_i2info(inode);
235         int rc = 0;
236         ENTRY;
237
238         /* clear group lock, if present */
239         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
240                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
241                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
242                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
243                                       &fd->fd_cwlockh);
244         }
245
246         /* Let's see if we have a good enough OPEN lock on the file so that
247            we can skip talking to the MDS */
248         if (file->f_dentry->d_inode) { /* Can this ever be false? */
249                 int lockmode;
250                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
251                 struct lustre_handle lockh;
252                 struct inode *inode = file->f_dentry->d_inode;
253                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
254
255                 down(&lli->lli_och_sem);
256                 if (fd->fd_omode & FMODE_WRITE) {
257                         lockmode = LCK_CW;
258                         LASSERT(lli->lli_open_fd_write_count);
259                         lli->lli_open_fd_write_count--;
260                 } else if (fd->fd_omode & FMODE_EXEC) {
261                         lockmode = LCK_PR;
262                         LASSERT(lli->lli_open_fd_exec_count);
263                         lli->lli_open_fd_exec_count--;
264                 } else {
265                         lockmode = LCK_CR;
266                         LASSERT(lli->lli_open_fd_read_count);
267                         lli->lli_open_fd_read_count--;
268                 }
269                 up(&lli->lli_och_sem);
270
271                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
272                                    LDLM_IBITS, &policy, lockmode,
273                                    &lockh)) {
274                         rc = ll_md_real_close(file->f_dentry->d_inode,
275                                               fd->fd_omode);
276                 }
277         } else {
278                 CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
279                        file, file->f_dentry, file->f_dentry->d_name.name);
280         }
281
282         LUSTRE_FPRIVATE(file) = NULL;
283         ll_file_data_put(fd);
284         ll_capa_close(inode);
285
286         RETURN(rc);
287 }
288
289 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
290
291 /* While this returns an error code, the caller (fput()) does not check it,
292  * so we need to make every effort to clean up all of our state here.  Also,
293  * applications rarely check close errors and even if an error is returned
294  * they will not re-try the close call.
295  */
296 int ll_file_release(struct inode *inode, struct file *file)
297 {
298         struct ll_file_data *fd;
299         struct ll_sb_info *sbi = ll_i2sbi(inode);
300         struct ll_inode_info *lli = ll_i2info(inode);
301         struct lov_stripe_md *lsm = lli->lli_smd;
302         int rc;
303         ENTRY;
304
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
327         /* The last ref on @file may not belong to the owner pid of statahead.
328          * Different processes can open the same dir; "ll_opendir_key" means:
329          * it is me who should stop the statahead thread. */
330         if (lli->lli_opendir_key == fd && lli->lli_opendir_pid != 0)
331                 ll_stop_statahead(inode, fd);
332
333         if (inode->i_sb->s_root == file->f_dentry) {
334                 LUSTRE_FPRIVATE(file) = NULL;
335                 ll_file_data_put(fd);
336                 RETURN(0);
337         }
338
339         if (lsm)
340                 lov_test_and_clear_async_rc(lsm);
341         lli->lli_async_rc = 0;
342
343         rc = ll_md_close(sbi->ll_md_exp, inode, file);
344         RETURN(rc);
345 }
346
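/* Send an OPEN intent to the MDS for @file (requesting an OPEN lock unless we
 * are only setting stripe parameters) and install the reply's inode and lock
 * data. */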
347 static int ll_intent_file_open(struct file *file, void *lmm,
348                                int lmmsize, struct lookup_intent *itp)
349 {
350         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
351         struct dentry *parent = file->f_dentry->d_parent;
352         const char *name = file->f_dentry->d_name.name;
353         const int len = file->f_dentry->d_name.len;
354         struct md_op_data *op_data;
355         struct ptlrpc_request *req;
356         int rc;
357         ENTRY;
358
359         if (!parent)
360                 RETURN(-ENOENT);
361
362         /* Usually we come here only for NFSD, and we want an open lock.
363            But we can also get here with pre-2.6.15 patchless kernels, and
364            in that case that lock is also ok */
365         /* We can also get here if there was a cached open handle in
366          * revalidate_it but it disappeared while we were getting from there
367          * to ll_file_open.  This means the file was closed and immediately
368          * opened again, which makes it a good candidate for the OPEN lock */
369         /* If lmmsize & lmm are not 0, we are just setting stripe info
370          * parameters. No need for the open lock */
371         if (!lmm && !lmmsize)
372                 itp->it_flags |= MDS_OPEN_LOCK;
373
374         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
375                                       file->f_dentry->d_inode, name, len,
376                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
377         if (IS_ERR(op_data))
378                 RETURN(PTR_ERR(op_data));
379
380         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
381                             0 /*unused */, &req, ll_md_blocking_ast, 0);
382         ll_finish_md_op_data(op_data);
383         if (rc == -ESTALE) {
384                 /* reason to keep its own exit path - don't flood the log
385                  * with -ESTALE error messages.
386                  */
387                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
388                      it_open_error(DISP_OPEN_OPEN, itp))
389                         GOTO(out, rc);
390                 ll_release_openhandle(file->f_dentry, itp);
391                 GOTO(out, rc);
392         }
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         if (itp->d.lustre.it_lock_mode)
401                 md_set_lock_data(sbi->ll_md_exp,
402                                  &itp->d.lustre.it_lock_handle,
403                                  file->f_dentry->d_inode);
404
405         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
406 out:
407         ptlrpc_req_finished(itp->d.lustre.it_data);
408         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
409         ll_intent_drop_lock(itp);
410
411         RETURN(rc);
412 }
413
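/* Fill a client open handle (@och) from the MDT reply carried by @it and
 * register it for open replay. */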
414 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
415                        struct lookup_intent *it, struct obd_client_handle *och)
416 {
417         struct ptlrpc_request *req = it->d.lustre.it_data;
418         struct mdt_body *body;
419
420         LASSERT(och);
421
422         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
423         LASSERT(body != NULL);                      /* reply already checked out */
424
425         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
426         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
427         och->och_fid = lli->lli_fid;
428         och->och_flags = it->it_flags;
429         lli->lli_ioepoch = body->ioepoch;
430
431         return md_set_open_replay_data(md_exp, och, req);
432 }
433
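/* Finish the client-side part of an open: fill the MDS open handle from the
 * intent reply when @och is given, then attach @fd to the file and set up
 * readahead state. */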
434 int ll_local_open(struct file *file, struct lookup_intent *it,
435                   struct ll_file_data *fd, struct obd_client_handle *och)
436 {
437         struct inode *inode = file->f_dentry->d_inode;
438         struct ll_inode_info *lli = ll_i2info(inode);
439         ENTRY;
440
441         LASSERT(!LUSTRE_FPRIVATE(file));
442
443         LASSERT(fd != NULL);
444
445         if (och) {
446                 struct ptlrpc_request *req = it->d.lustre.it_data;
447                 struct mdt_body *body;
448                 int rc;
449
450                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
451                 if (rc)
452                         RETURN(rc);
453
454                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
455                 if ((it->it_flags & FMODE_WRITE) &&
456                     (body->valid & OBD_MD_FLSIZE))
457                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
458                                lli->lli_ioepoch, PFID(&lli->lli_fid));
459         }
460
461         LUSTRE_FPRIVATE(file) = fd;
462         ll_readahead_init(inode, &fd->fd_ras);
463         fd->fd_omode = it->it_flags;
464         RETURN(0);
465 }
466
467 /* Open a file, and (for the very first open) create objects on the OSTs at
468  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
469  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
470  * lli_open_sem to ensure no other process will create objects, send the
471  * stripe MD to the MDS, or try to destroy the objects if that fails.
472  *
473  * If we already have the stripe MD locally then we don't request it in
474  * md_open(), by passing a lmm_size = 0.
475  *
476  * It is up to the application to ensure no other processes open this file
477  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
478  * used.  We might be able to avoid races of that sort by getting lli_open_sem
479  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
480  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
481  */
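/*
 * Illustration only (not compiled): a rough userspace sketch of the delayed
 * object creation described above, assuming the O_LOV_DELAY_CREATE flag,
 * the LL_IOC_LOV_SETSTRIPE ioctl and struct lov_user_md from
 * <lustre/lustre_user.h>; the path and stripe values are arbitrary examples.
 *
 *      #include <fcntl.h>
 *      #include <sys/ioctl.h>
 *      #include <lustre/lustre_user.h>
 *
 *      int fd = open("/mnt/lustre/file", O_CREAT | O_RDWR | O_LOV_DELAY_CREATE,
 *                    0644);                   // no OST objects created yet
 *      struct lov_user_md lum = {
 *              .lmm_magic         = LOV_USER_MAGIC,
 *              .lmm_pattern       = 0,        // default (RAID0) pattern
 *              .lmm_stripe_size   = 1 << 20,  // 1 MiB stripes
 *              .lmm_stripe_count  = 2,        // stripe over two OSTs
 *              .lmm_stripe_offset = -1,       // let the MDS pick the first OST
 *      };
 *      ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum); // objects are created here
 */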
482 int ll_file_open(struct inode *inode, struct file *file)
483 {
484         struct ll_inode_info *lli = ll_i2info(inode);
485         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
486                                           .it_flags = file->f_flags };
487         struct lov_stripe_md *lsm;
488         struct ptlrpc_request *req = NULL;
489         struct obd_client_handle **och_p;
490         __u64 *och_usecount;
491         struct ll_file_data *fd;
492         int rc = 0, opendir_set = 0;
493         ENTRY;
494
495         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
496                inode->i_generation, inode, file->f_flags);
497
498 #ifdef HAVE_VFS_INTENT_PATCHES
499         it = file->f_it;
500 #else
501         it = file->private_data; /* XXX: compat macro */
502         file->private_data = NULL; /* prevent ll_local_open assertion */
503 #endif
504
505         fd = ll_file_data_get();
506         if (fd == NULL)
507                 RETURN(-ENOMEM);
508
509         if (S_ISDIR(inode->i_mode)) {
510 again:
511                 spin_lock(&lli->lli_lock);
512                 if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) {
513                         LASSERT(lli->lli_sai == NULL);
514                         lli->lli_opendir_key = fd;
515                         lli->lli_opendir_pid = cfs_curproc_pid();
516                         opendir_set = 1;
517                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid() &&
518                                     lli->lli_opendir_key != NULL)) {
519                         /* Two cases for this:
520                          * (1) The same process opens the directory many times.
521                          * (2) The old process opened the directory and exited
522                          *     before its child processes.  Then a new process
523                          *     with the same pid opens the directory before the
524                          *     old process's children exit.
525                          * Reset statahead for such cases. */
526                         spin_unlock(&lli->lli_lock);
527                         CDEBUG(D_INFO, "Conflict statahead for %.*s "DFID
528                                " reset it.\n", file->f_dentry->d_name.len,
529                                file->f_dentry->d_name.name,
530                                PFID(&lli->lli_fid));
531                         ll_stop_statahead(inode, lli->lli_opendir_key);
532                         goto again;
533                 }
534                 spin_unlock(&lli->lli_lock);
535         }
536
537         if (inode->i_sb->s_root == file->f_dentry) {
538                 LUSTRE_FPRIVATE(file) = fd;
539                 RETURN(0);
540         }
541
542         if (!it || !it->d.lustre.it_disposition) {
543                 /* Convert f_flags into access mode. We cannot use file->f_mode,
544                  * because everything but the O_ACCMODE mask has been stripped
545                  * from it */
546                 if ((oit.it_flags + 1) & O_ACCMODE)
547                         oit.it_flags++;
548                 if (file->f_flags & O_TRUNC)
549                         oit.it_flags |= FMODE_WRITE;
550
551                 /* The kernel only calls f_op->open in dentry_open.  filp_open
552                  * calls dentry_open after open_namei has checked permissions.
553                  * Only nfsd_open calls dentry_open directly without checking
554                  * permissions, and because of that the code below is safe. */
555                 if (oit.it_flags & FMODE_WRITE)
556                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
557
558                 /* We do not want O_EXCL here, presumably we opened the file
559                  * already? XXX - NFS implications? */
560                 oit.it_flags &= ~O_EXCL;
561
562                 it = &oit;
563         }
564
565 restart:
566         /* Let's see if we have file open on MDS already. */
567         if (it->it_flags & FMODE_WRITE) {
568                 och_p = &lli->lli_mds_write_och;
569                 och_usecount = &lli->lli_open_fd_write_count;
570         } else if (it->it_flags & FMODE_EXEC) {
571                 och_p = &lli->lli_mds_exec_och;
572                 och_usecount = &lli->lli_open_fd_exec_count;
573         } else {
574                 och_p = &lli->lli_mds_read_och;
575                 och_usecount = &lli->lli_open_fd_read_count;
576         }
577
578         down(&lli->lli_och_sem);
579         if (*och_p) { /* Open handle is present */
580                 if (it_disposition(it, DISP_OPEN_OPEN)) {
581                         /* Well, there's an extra open request that we do not
582                            need; let's close it. This will decref the request. */
583                         rc = it_open_error(DISP_OPEN_OPEN, it);
584                         if (rc) {
585                                 up(&lli->lli_och_sem);
586                                 ll_file_data_put(fd);
587                                 GOTO(out_openerr, rc);
588                         }
589                         ll_release_openhandle(file->f_dentry, it);
590                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
591                                              LPROC_LL_OPEN);
592                 }
593                 (*och_usecount)++;
594
595                 rc = ll_local_open(file, it, fd, NULL);
596                 if (rc) {
597                         (*och_usecount)--;
598                         up(&lli->lli_och_sem);
599                         ll_file_data_put(fd);
600                         GOTO(out_openerr, rc);
601                 }
602         } else {
603                 LASSERT(*och_usecount == 0);
604                 if (!it->d.lustre.it_disposition) {
605                         /* We cannot just request a lock handle now; the new
606                            ELC code means that one of the other OPEN locks for
607                            this file could be cancelled, and since the blocking
608                            AST handler would attempt to grab och_sem as well,
609                            that would result in a deadlock */
610                         up(&lli->lli_och_sem);
611                         it->it_flags |= O_CHECK_STALE;
612                         rc = ll_intent_file_open(file, NULL, 0, it);
613                         it->it_flags &= ~O_CHECK_STALE;
614                         if (rc) {
615                                 ll_file_data_put(fd);
616                                 GOTO(out_openerr, rc);
617                         }
618
619                         /* Got some error? Release the request */
620                         if (it->d.lustre.it_status < 0) {
621                                 req = it->d.lustre.it_data;
622                                 ptlrpc_req_finished(req);
623                         }
624                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
625                                          &it->d.lustre.it_lock_handle,
626                                          file->f_dentry->d_inode);
627                         goto restart;
628                 }
629                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
630                 if (!*och_p) {
631                         ll_file_data_put(fd);
632                         GOTO(out_och_free, rc = -ENOMEM);
633                 }
634                 (*och_usecount)++;
635                 req = it->d.lustre.it_data;
636
637                 /* md_intent_lock() didn't get a request ref if there was an
638                  * open error, so don't do cleanup on the request here
639                  * (bug 3430) */
640                 /* XXX (green): Shouldn't we bail out on any error here, not
641                  * just an open error? */
642                 rc = it_open_error(DISP_OPEN_OPEN, it);
643                 if (rc) {
644                         ll_file_data_put(fd);
645                         GOTO(out_och_free, rc);
646                 }
647
648                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
649                 rc = ll_local_open(file, it, fd, *och_p);
650                 if (rc) {
651                         ll_file_data_put(fd);
652                         GOTO(out_och_free, rc);
653                 }
654         }
655         up(&lli->lli_och_sem);
656
657         /* Must do this outside the lli_och_sem lock to prevent a deadlock
658            where a different kind of OPEN lock for this same inode gets
659            cancelled by ldlm_cancel_lru */
660         if (!S_ISREG(inode->i_mode))
661                 GOTO(out, rc);
662
663         ll_capa_open(inode);
664
665         lsm = lli->lli_smd;
666         if (lsm == NULL) {
667                 if (file->f_flags & O_LOV_DELAY_CREATE ||
668                     !(file->f_mode & FMODE_WRITE)) {
669                         CDEBUG(D_INODE, "object creation was delayed\n");
670                         GOTO(out, rc);
671                 }
672         }
673         file->f_flags &= ~O_LOV_DELAY_CREATE;
674         GOTO(out, rc);
675 out:
676         ptlrpc_req_finished(req);
677         if (req)
678                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
679 out_och_free:
680         if (rc) {
681                 if (*och_p) {
682                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
683                         *och_p = NULL; /* OBD_FREE writes some magic there */
684                         (*och_usecount)--;
685                 }
686                 up(&lli->lli_och_sem);
687 out_openerr:
688                 if (opendir_set != 0)
689                         ll_stop_statahead(inode, fd);
690         }
691
692         return rc;
693 }
694
695 /* Fills the obdo with the attributes for the inode defined by lsm */
696 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
697 {
698         struct ptlrpc_request_set *set;
699         struct ll_inode_info *lli = ll_i2info(inode);
700         struct lov_stripe_md *lsm = lli->lli_smd;
701
702         struct obd_info oinfo = { { { 0 } } };
703         int rc;
704         ENTRY;
705
706         LASSERT(lsm != NULL);
707
708         oinfo.oi_md = lsm;
709         oinfo.oi_oa = obdo;
710         oinfo.oi_oa->o_id = lsm->lsm_object_id;
711         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
712         oinfo.oi_oa->o_mode = S_IFREG;
713         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
714                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
715                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
716                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
717                                OBD_MD_FLGROUP;
718         oinfo.oi_capa = ll_mdscapa_get(inode);
719
720         set = ptlrpc_prep_set();
721         if (set == NULL) {
722                 CERROR("can't allocate ptlrpc set\n");
723                 rc = -ENOMEM;
724         } else {
725                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
726                 if (rc == 0)
727                         rc = ptlrpc_set_wait(set);
728                 ptlrpc_set_destroy(set);
729         }
730         capa_put(oinfo.oi_capa);
731         if (rc)
732                 RETURN(rc);
733
734         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
735                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
736                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
737
738         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
739         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
740                lli->lli_smd->lsm_object_id, i_size_read(inode),
741                (unsigned long long)inode->i_blocks,
742                (unsigned long)ll_inode_blksize(inode));
743         RETURN(0);
744 }
745
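/* Map an extent DLM lock back to the index of the stripe it covers inside the
 * file's striping.  Returns the stripe index, or a negative errno if the lock
 * resource does not match any object of this file. */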
746 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
747 {
748         struct ll_inode_info *lli = ll_i2info(inode);
749         struct lov_stripe_md *lsm = lli->lli_smd;
750         struct obd_export *exp = ll_i2dtexp(inode);
751         struct {
752                 char name[16];
753                 struct ldlm_lock *lock;
754         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
755         __u32 stripe, vallen = sizeof(stripe);
756         struct lov_oinfo *loinfo;
757         int rc;
758         ENTRY;
759
760         if (lsm->lsm_stripe_count == 1)
761                 GOTO(check, stripe = 0);
762
763         /* get our offset in the lov */
764         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
765         if (rc != 0) {
766                 CERROR("obd_get_info: rc = %d\n", rc);
767                 RETURN(rc);
768         }
769         LASSERT(stripe < lsm->lsm_stripe_count);
770
771 check:
772         loinfo = lsm->lsm_oinfo[stripe];
773         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
774                             &lock->l_resource->lr_name)){
775                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
776                            loinfo->loi_id, loinfo->loi_gr);
777                 RETURN(-ELDLM_NO_LOCK_DATA);
778         }
779
780         RETURN(stripe);
781 }
782
783 /* Get extra page reference to ensure it is not going away */
784 void ll_pin_extent_cb(void *data)
785 {
786         struct page *page = data;
787
788         page_cache_get(page);
789
790         return;
791 }
792
793 /* Flush the page from the page cache for an extent as it is cancelled.
794  * The page to remove is delivered as @data.
795  *
796  * No one can dirty the extent until we've finished our work and they cannot
797  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
798  * but other kernel actors could have pages locked.
799  *
800  * If @discard is set, there is no need to write the page if it is dirty.
801  *
802  * Called with the DLM lock held. */
803 int ll_page_removal_cb(void *data, int discard)
804 {
805         int rc;
806         struct page *page = data;
807         struct address_space *mapping;
808
809         ENTRY;
810
811         /* We already have a page reference from ll_pin_extent_cb() */
812         lock_page(page);
813
814         /* Already truncated by somebody */
815         if (!page->mapping)
816                 GOTO(out, rc = 0);
817         mapping = page->mapping;
818
819         ll_teardown_mmaps(mapping,
820                           (__u64)page->index << PAGE_CACHE_SHIFT,
821                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
822                                                               ~PAGE_CACHE_MASK);
823         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
824
825         if (!discard && clear_page_dirty_for_io(page)) {
826                 LASSERT(page->mapping);
827                 rc = ll_call_writepage(page->mapping->host, page);
828                 /* either waiting for io to complete or reacquiring
829                  * the lock that the failed writepage released */
830                 lock_page(page);
831                 wait_on_page_writeback(page);
832                 if (rc != 0) {
833                         CERROR("writepage inode %lu(%p) of page %p "
834                                "failed: %d\n", mapping->host->i_ino,
835                                mapping->host, page, rc);
836                         if (rc == -ENOSPC)
837                                 set_bit(AS_ENOSPC, &mapping->flags);
838                         else
839                                 set_bit(AS_EIO, &mapping->flags);
840                 }
841
842         }
843         if (page->mapping != NULL) {
844                 struct ll_async_page *llap = llap_cast_private(page);
845                 /* checking again to account for writeback's lock_page() */
846                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
847                 if (llap)
848                         ll_ra_accounting(llap, page->mapping);
849                 ll_truncate_complete_page(page);
850         }
851         EXIT;
852 out:
853         LASSERT(!PageWriteback(page));
854         unlock_page(page);
855         page_cache_release(page);
856
857         return 0;
858 }
859
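/* Cancellation callback for an extent lock: recompute the known minimum size
 * (KMS) of the affected stripe now that the lock is going away, then queue a
 * DONE_WRITING check for the inode. */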
860 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
861                              void *data, int flag)
862 {
863         struct inode *inode;
864         struct ll_inode_info *lli;
865         struct lov_stripe_md *lsm;
866         int stripe;
867         __u64 kms;
868
869         ENTRY;
870
871         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
872                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
873                 LBUG();
874         }
875
876         inode = ll_inode_from_lock(lock);
877         if (inode == NULL)
878                 RETURN(0);
879         lli = ll_i2info(inode);
880         if (lli == NULL)
881                 GOTO(iput, 0);
882         if (lli->lli_smd == NULL)
883                 GOTO(iput, 0);
884         lsm = lli->lli_smd;
885
886         stripe = ll_lock_to_stripe_offset(inode, lock);
887         if (stripe < 0)
888                 GOTO(iput, 0);
889
890         lov_stripe_lock(lsm);
891         lock_res_and_lock(lock);
892         kms = ldlm_extent_shift_kms(lock,
893                                     lsm->lsm_oinfo[stripe]->loi_kms);
894
895         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
896                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
897                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
898         lsm->lsm_oinfo[stripe]->loi_kms = kms;
899         unlock_res_and_lock(lock);
900         lov_stripe_unlock(lsm);
901         ll_queue_done_writing(inode, 0);
902         EXIT;
903 iput:
904         iput(inode);
905
906         return 0;
907 }
908
909 #if 0
910 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
911 {
912         /* XXX ALLOCATE - 160 bytes */
913         struct inode *inode = ll_inode_from_lock(lock);
914         struct ll_inode_info *lli = ll_i2info(inode);
915         struct lustre_handle lockh = { 0 };
916         struct ost_lvb *lvb;
917         int stripe;
918         ENTRY;
919
920         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
921                      LDLM_FL_BLOCK_CONV)) {
922                 LBUG(); /* not expecting any blocked async locks yet */
923                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
924                            "lock, returning");
925                 ldlm_lock_dump(D_OTHER, lock, 0);
926                 ldlm_reprocess_all(lock->l_resource);
927                 RETURN(0);
928         }
929
930         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
931
932         stripe = ll_lock_to_stripe_offset(inode, lock);
933         if (stripe < 0)
934                 goto iput;
935
936         if (lock->l_lvb_len) {
937                 struct lov_stripe_md *lsm = lli->lli_smd;
938                 __u64 kms;
939                 lvb = lock->l_lvb_data;
940                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
941
942                 lock_res_and_lock(lock);
943                 ll_inode_size_lock(inode, 1);
944                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
945                 kms = ldlm_extent_shift_kms(NULL, kms);
946                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
947                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
948                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
949                 lsm->lsm_oinfo[stripe].loi_kms = kms;
950                 ll_inode_size_unlock(inode, 1);
951                 unlock_res_and_lock(lock);
952         }
953
954 iput:
955         iput(inode);
956         wake_up(&lock->l_waitq);
957
958         ldlm_lock2handle(lock, &lockh);
959         ldlm_lock_decref(&lockh, LCK_PR);
960         RETURN(0);
961 }
962 #endif
963
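/* Glimpse callback: another client wants to know this file's size, so reply
 * with the KMS of the stripe covered by @lock plus our cached timestamps. */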
964 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
965 {
966         struct ptlrpc_request *req = reqp;
967         struct inode *inode = ll_inode_from_lock(lock);
968         struct ll_inode_info *lli;
969         struct lov_stripe_md *lsm;
970         struct ost_lvb *lvb;
971         int rc, stripe;
972         ENTRY;
973
974         if (inode == NULL)
975                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
976         lli = ll_i2info(inode);
977         if (lli == NULL)
978                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
979         lsm = lli->lli_smd;
980         if (lsm == NULL)
981                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
982
983         /* First, find out which stripe index this lock corresponds to. */
984         stripe = ll_lock_to_stripe_offset(inode, lock);
985         if (stripe < 0)
986                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
987
988         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
989         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
990                              sizeof(*lvb));
991         rc = req_capsule_server_pack(&req->rq_pill);
992         if (rc) {
993                 CERROR("lustre_pack_reply: %d\n", rc);
994                 GOTO(iput, rc);
995         }
996
997         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
998         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
999         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1000         lvb->lvb_atime = LTIME_S(inode->i_atime);
1001         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1002
1003         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1004                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1005                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1006                    lvb->lvb_atime, lvb->lvb_ctime);
1007  iput:
1008         iput(inode);
1009
1010  out:
1011         /* These errors are normal races, so we don't want to fill the console
1012          * with messages by calling ptlrpc_error() */
1013         if (rc == -ELDLM_NO_LOCK_DATA)
1014                 lustre_pack_reply(req, 1, NULL, NULL);
1015
1016         req->rq_status = rc;
1017         return rc;
1018 }
1019
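/* Merge the per-stripe size/blocks/timestamps cached by the LOV/OSC layers
 * into the VFS inode, under ll_inode_size_lock(). */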
1020 static int ll_merge_lvb(struct inode *inode)
1021 {
1022         struct ll_inode_info *lli = ll_i2info(inode);
1023         struct ll_sb_info *sbi = ll_i2sbi(inode);
1024         struct ost_lvb lvb;
1025         int rc;
1026
1027         ENTRY;
1028
1029         ll_inode_size_lock(inode, 1);
1030         inode_init_lvb(inode, &lvb);
1031         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1032         i_size_write(inode, lvb.lvb_size);
1033         inode->i_blocks = lvb.lvb_blocks;
1034
1035         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1036         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1037         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1038         ll_inode_size_unlock(inode, 1);
1039
1040         RETURN(rc);
1041 }
1042
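/* Determine the file size from locally cached extent locks only: returns 0 if
 * matching PR locks were already held, -ENODATA if not, or a negative errno
 * on failure. */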
1043 int ll_local_size(struct inode *inode)
1044 {
1045         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1046         struct ll_inode_info *lli = ll_i2info(inode);
1047         struct ll_sb_info *sbi = ll_i2sbi(inode);
1048         struct lustre_handle lockh = { 0 };
1049         int flags = 0;
1050         int rc;
1051         ENTRY;
1052
1053         if (lli->lli_smd->lsm_stripe_count == 0)
1054                 RETURN(0);
1055
1056         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1057                        &policy, LCK_PR, &flags, inode, &lockh);
1058         if (rc < 0)
1059                 RETURN(rc);
1060         else if (rc == 0)
1061                 RETURN(-ENODATA);
1062
1063         rc = ll_merge_lvb(inode);
1064         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1065         RETURN(rc);
1066 }
1067
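/* Glimpse the object attributes for an arbitrary @lsm (one that need not be
 * backed by a local inode) and fill the result into @st. */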
1068 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1069                      lstat_t *st)
1070 {
1071         struct lustre_handle lockh = { 0 };
1072         struct ldlm_enqueue_info einfo = { 0 };
1073         struct obd_info oinfo = { { { 0 } } };
1074         struct ost_lvb lvb;
1075         int rc;
1076
1077         ENTRY;
1078
1079         einfo.ei_type = LDLM_EXTENT;
1080         einfo.ei_mode = LCK_PR;
1081         einfo.ei_cb_bl = osc_extent_blocking_cb;
1082         einfo.ei_cb_cp = ldlm_completion_ast;
1083         einfo.ei_cb_gl = ll_glimpse_callback;
1084         einfo.ei_cbdata = NULL;
1085
1086         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1087         oinfo.oi_lockh = &lockh;
1088         oinfo.oi_md = lsm;
1089         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1090
1091         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1092         if (rc == -ENOENT)
1093                 RETURN(rc);
1094         if (rc != 0) {
1095                 CERROR("obd_enqueue returned rc %d, "
1096                        "returning -EIO\n", rc);
1097                 RETURN(rc > 0 ? -EIO : rc);
1098         }
1099
1100         lov_stripe_lock(lsm);
1101         memset(&lvb, 0, sizeof(lvb));
1102         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1103         st->st_size = lvb.lvb_size;
1104         st->st_blocks = lvb.lvb_blocks;
1105         st->st_mtime = lvb.lvb_mtime;
1106         st->st_atime = lvb.lvb_atime;
1107         st->st_ctime = lvb.lvb_ctime;
1108         lov_stripe_unlock(lsm);
1109
1110         RETURN(rc);
1111 }
1112
1113 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1114  * file (because it prefers KMS over RSS when larger) */
1115 int ll_glimpse_size(struct inode *inode, int ast_flags)
1116 {
1117         struct ll_inode_info *lli = ll_i2info(inode);
1118         struct ll_sb_info *sbi = ll_i2sbi(inode);
1119         struct lustre_handle lockh = { 0 };
1120         struct ldlm_enqueue_info einfo = { 0 };
1121         struct obd_info oinfo = { { { 0 } } };
1122         int rc;
1123         ENTRY;
1124
1125         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1126                 RETURN(0);
1127
1128         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1129
1130         if (!lli->lli_smd) {
1131                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1132                 RETURN(0);
1133         }
1134
1135         /* NOTE: this looks like a DLM lock request, but it may not be one.
1136          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse request
1137          *       that won't revoke any conflicting DLM locks held.  Instead,
1138          *       ll_glimpse_callback() will be called on each client
1139          *       holding a DLM lock against this file, and the resulting size
1140          *       will be returned for each stripe.  The DLM lock on [0, EOF] is
1141          *       acquired only if there were no conflicting locks. */
1142         einfo.ei_type = LDLM_EXTENT;
1143         einfo.ei_mode = LCK_PR;
1144         einfo.ei_cb_bl = osc_extent_blocking_cb;
1145         einfo.ei_cb_cp = ldlm_completion_ast;
1146         einfo.ei_cb_gl = ll_glimpse_callback;
1147         einfo.ei_cbdata = inode;
1148
1149         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1150         oinfo.oi_lockh = &lockh;
1151         oinfo.oi_md = lli->lli_smd;
1152         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1153
1154         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1155         if (rc == -ENOENT)
1156                 RETURN(rc);
1157         if (rc != 0) {
1158                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1159                 RETURN(rc > 0 ? -EIO : rc);
1160         }
1161
1162         rc = ll_merge_lvb(inode);
1163
1164         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1165                i_size_read(inode), (unsigned long long)inode->i_blocks);
1166
1167         RETURN(rc);
1168 }
1169
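/* Take a client-side extent DLM lock on @lsm with the given mode and policy,
 * and refresh the inode's size and timestamps from the merged per-stripe LVB
 * while holding ll_inode_size_lock(). */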
1170 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1171                    struct lov_stripe_md *lsm, int mode,
1172                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1173                    int ast_flags)
1174 {
1175         struct ll_sb_info *sbi = ll_i2sbi(inode);
1176         struct ost_lvb lvb;
1177         struct ldlm_enqueue_info einfo = { 0 };
1178         struct obd_info oinfo = { { { 0 } } };
1179         int rc;
1180         ENTRY;
1181
1182         LASSERT(!lustre_handle_is_used(lockh));
1183         LASSERT(lsm != NULL);
1184
1185         /* don't drop the mmapped file to LRU */
1186         if (mapping_mapped(inode->i_mapping))
1187                 ast_flags |= LDLM_FL_NO_LRU;
1188
1189         /* XXX phil: can we do this?  won't it screw the file size up? */
1190         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1191             (sbi->ll_flags & LL_SBI_NOLCK))
1192                 RETURN(0);
1193
1194         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1195                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1196
1197         einfo.ei_type = LDLM_EXTENT;
1198         einfo.ei_mode = mode;
1199         einfo.ei_cb_bl = osc_extent_blocking_cb;
1200         einfo.ei_cb_cp = ldlm_completion_ast;
1201         einfo.ei_cb_gl = ll_glimpse_callback;
1202         einfo.ei_cbdata = inode;
1203
1204         oinfo.oi_policy = *policy;
1205         oinfo.oi_lockh = lockh;
1206         oinfo.oi_md = lsm;
1207         oinfo.oi_flags = ast_flags;
1208
1209         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1210         *policy = oinfo.oi_policy;
1211         if (rc > 0)
1212                 rc = -EIO;
1213
1214         ll_inode_size_lock(inode, 1);
1215         inode_init_lvb(inode, &lvb);
1216         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1217
1218         if (policy->l_extent.start == 0 &&
1219             policy->l_extent.end == OBD_OBJECT_EOF) {
1220                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1221                  * the kms under both a DLM lock and the
1222                  * ll_inode_size_lock().  If we don't get the
1223                  * ll_inode_size_lock() here we can match the DLM lock and
1224                  * reset i_size from the kms before the truncating path has
1225                  * updated the kms.  generic_file_write can then trust the
1226                  * stale i_size when doing appending writes and effectively
1227                  * cancel the result of the truncate.  Getting the
1228                  * ll_inode_size_lock() after the enqueue maintains the DLM
1229                  * -> ll_inode_size_lock() acquiring order. */
1230                 i_size_write(inode, lvb.lvb_size);
1231                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1232                        inode->i_ino, i_size_read(inode));
1233         }
1234
1235         if (rc == 0) {
1236                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1237                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1238                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1239         }
1240         ll_inode_size_unlock(inode, 1);
1241
1242         RETURN(rc);
1243 }
1244
1245 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1246                      struct lov_stripe_md *lsm, int mode,
1247                      struct lustre_handle *lockh)
1248 {
1249         struct ll_sb_info *sbi = ll_i2sbi(inode);
1250         int rc;
1251         ENTRY;
1252
1253         /* XXX phil: can we do this?  won't it screw the file size up? */
1254         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1255             (sbi->ll_flags & LL_SBI_NOLCK))
1256                 RETURN(0);
1257
1258         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1259
1260         RETURN(rc);
1261 }
1262
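/* Contention tracking: remember when an extent lock request was denied due to
 * contention so that subsequent I/O on this inode can go lockless (using
 * server-side locks) until the contention window expires. */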
1263 static void ll_set_file_contended(struct inode *inode)
1264 {
1265         struct ll_inode_info *lli = ll_i2info(inode);
1266         cfs_time_t now = cfs_time_current();
1267
1268         spin_lock(&lli->lli_lock);
1269         lli->lli_contention_time = now;
1270         lli->lli_flags |= LLIF_CONTENDED;
1271         spin_unlock(&lli->lli_lock);
1272 }
1273
1274 void ll_clear_file_contended(struct inode *inode)
1275 {
1276         struct ll_inode_info *lli = ll_i2info(inode);
1277
1278         spin_lock(&lli->lli_lock);
1279         lli->lli_flags &= ~LLIF_CONTENDED;
1280         spin_unlock(&lli->lli_lock);
1281 }
1282
1283 static int ll_is_file_contended(struct file *file)
1284 {
1285         struct inode *inode = file->f_dentry->d_inode;
1286         struct ll_inode_info *lli = ll_i2info(inode);
1287         struct ll_sb_info *sbi = ll_i2sbi(inode);
1288         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1289         ENTRY;
1290
1291         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1292                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1293                        " osc connect flags = 0x"LPX64"\n",
1294                        sbi->ll_lco.lco_flags);
1295                 RETURN(0);
1296         }
1297         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1298                 RETURN(1);
1299         if (lli->lli_flags & LLIF_CONTENDED) {
1300                 cfs_time_t cur_time = cfs_time_current();
1301                 cfs_time_t retry_time;
1302
1303                 retry_time = cfs_time_add(
1304                         lli->lli_contention_time,
1305                         cfs_time_seconds(sbi->ll_contention_time));
1306                 if (cfs_time_after(cur_time, retry_time)) {
1307                         ll_clear_file_contended(inode);
1308                         RETURN(0);
1309                 }
1310                 RETURN(1);
1311         }
1312         RETURN(0);
1313 }
1314
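/* Take the client-side extent locks needed for a read or write of
 * [start, end].  Appending writes always lock; otherwise, if the file is
 * marked contended or the server denies the lock due to contention, no tree
 * lock is taken so the I/O can proceed locklessly.  Returns 1 if the tree
 * lock was acquired, 0 if not, or a negative errno on error. */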
1315 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1316                                  const char *buf, size_t count,
1317                                  loff_t start, loff_t end, int rw)
1318 {
1319         int append;
1320         int tree_locked = 0;
1321         int rc;
1322         struct inode * inode = file->f_dentry->d_inode;
1323         ENTRY;
1324
1325         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1326
1327         if (append || !ll_is_file_contended(file)) {
1328                 struct ll_lock_tree_node *node;
1329                 int ast_flags;
1330
1331                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1332                 if (file->f_flags & O_NONBLOCK)
1333                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1334                 node = ll_node_from_inode(inode, start, end,
1335                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1336                 if (IS_ERR(node)) {
1337                         rc = PTR_ERR(node);
1338                         GOTO(out, rc);
1339                 }
1340                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1341                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1342                 if (rc == 0)
1343                         tree_locked = 1;
1344                 else if (rc == -EUSERS)
1345                         ll_set_file_contended(inode);
1346                 else
1347                         GOTO(out, rc);
1348         }
1349         RETURN(tree_locked);
1350 out:
1351         return rc;
1352 }
1353
1354 /**
1355  * Checks if requested extent lock is compatible with a lock under a page.
1356  *
1357  * Checks if the lock under \a page is compatible with a read or write lock
1358  * (specified by \a rw) for an extent [\a start , \a end].
1359  *
1360  * \param page the page under which lock is considered
1361  * \param rw OBD_BRW_READ if requested for reading,
1362  *           OBD_BRW_WRITE if requested for writing
1363  * \param start start of the requested extent
1364  * \param end end of the requested extent
1365  * \param cookie transparent parameter for passing locking context
1366  *
1367  * \post result == 1, *cookie == context, appropriate lock is referenced or
1368  * \post result == 0
1369  *
1370  * \retval 1 owned lock is reused for the request
1371  * \retval 0 no lock reused for the request
1372  *
1373  * \see ll_release_short_lock
1374  */
1375 static int ll_reget_short_lock(struct page *page, int rw,
1376                                obd_off start, obd_off end,
1377                                void **cookie)
1378 {
1379         struct ll_async_page *llap;
1380         struct obd_export *exp;
1381         struct inode *inode = page->mapping->host;
1382
1383         ENTRY;
1384
1385         exp = ll_i2dtexp(inode);
1386         if (exp == NULL)
1387                 RETURN(0);
1388
1389         llap = llap_cast_private(page);
1390         if (llap == NULL)
1391                 RETURN(0);
1392
1393         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1394                                     &llap->llap_cookie, rw, start, end,
1395                                     cookie));
1396 }
1397
1398 /**
1399  * Releases a reference to a lock taken in a "fast" way.
1400  *
1401  * Releases a read or a write (specified by \a rw) lock
1402  * referenced by \a cookie.
1403  *
1404  * \param inode inode to which data belong
1405  * \param end end of the locked extent
1406  * \param rw OBD_BRW_READ if requested for reading,
1407  *           OBD_BRW_WRITE if requested for writing
1408  * \param cookie transparent parameter for passing locking context
1409  *
1410  * \post appropriate lock is dereferenced
1411  *
1412  * \see ll_reget_short_lock
1413  */
1414 static void ll_release_short_lock(struct inode *inode, obd_off end,
1415                                   void *cookie, int rw)
1416 {
1417         struct obd_export *exp;
1418         int rc;
1419
1420         exp = ll_i2dtexp(inode);
1421         if (exp == NULL)
1422                 return;
1423
1424         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1425                                     cookie, rw);
1426         if (rc < 0)
1427                 CERROR("unlock failed (%d)\n", rc);
1428 }
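
/*
 * Illustrative pairing of the two helpers above, modelled on
 * ll_file_get_fast_lock()/ll_file_put_fast_lock() below; "pos", "end" and the
 * calling context are assumptions of this sketch, not part of the real callers:
 *
 *      struct page *page;
 *      void *cookie;
 *
 *      page = find_lock_page(inode->i_mapping, pos >> CFS_PAGE_SHIFT);
 *      if (page != NULL) {
 *              int held = ll_reget_short_lock(page, OBD_BRW_READ, pos, end,
 *                                             &cookie);
 *              unlock_page(page);
 *              page_cache_release(page);
 *              if (held) {
 *                      ... do the read under the referenced lock ...
 *                      ll_release_short_lock(inode, end, cookie, OBD_BRW_READ);
 *              }
 *      }
 */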
1429
1430 /**
1431  * Checks if requested extent lock is compatible
1432  * with a lock under a page in page cache.
1433  *
1434  * Checks if a lock under some \a page is compatible with a read or write lock
1435  * (specified by \a rw) for an extent [\a ppos , \a end].
1436  *
1437  * \param file the file under which lock is considered
1438  * \param rw OBD_BRW_READ if requested for reading,
1439  *           OBD_BRW_WRITE if requested for writing
1440  * \param ppos start of the requested extent
1441  * \param end end of the requested extent
1442  * \param cookie transparent parameter for passing locking context
1443  * \param buf userspace buffer for the data
1444  *
1445  * \post result == 1, *cookie == context, appropriate lock is referenced
1446  * \post result == 0
1447  *
1448  * \retval 1 owned lock is reused for the request
1449  * \retval 0 no lock reused for the request
1450  *
1451  * \see ll_file_put_fast_lock
1452  */
1453 static inline int ll_file_get_fast_lock(struct file *file,
1454                                         obd_off ppos, obd_off end,
1455                                         char *buf, void **cookie, int rw)
1456 {
1457         int rc = 0;
1458         struct page *page;
1459
1460         ENTRY;
1461
1462         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1463                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1464                                       ppos >> CFS_PAGE_SHIFT);
1465                 if (page) {
1466                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1467                                 rc = 1;
1468
1469                         unlock_page(page);
1470                         page_cache_release(page);
1471                 }
1472         }
1473
1474         RETURN(rc);
1475 }
1476
1477 /**
1478  * Releases a reference to a lock taken in a "fast" way.
1479  *
1480  * Releases a read or a write (specified by \a rw) lock
1481  * referenced by \a cookie.
1482  *
1483  * \param inode inode to which data belong
1484  * \param end end of the locked extent
1485  * \param rw OBD_BRW_READ if requested for reading,
1486  *           OBD_BRW_WRITE if requested for writing
1487  * \param cookie transparent parameter for passing locking context
1488  *
1489  * \post appropriate lock is dereferenced
1490  *
1491  * \see ll_file_get_fast_lock
1492  */
1493 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1494                                          void *cookie, int rw)
1495 {
1496         ll_release_short_lock(inode, end, cookie, rw);
1497 }
1498
1499 enum ll_lock_style {
1500         LL_LOCK_STYLE_NOLOCK   = 0,
1501         LL_LOCK_STYLE_FASTLOCK = 1,
1502         LL_LOCK_STYLE_TREELOCK = 2
1503 };
1504
1505 /**
1506  * Acquires a lock for the requested extent, preferring a "fast" lock.
1507  *
1508  * First tries to reuse a lock already referenced under a cached page (fast
1509  * lock) for a read or write (specified by \a rw) on the extent
1510  * [\a ppos , \a end]; if that is not possible, falls back to a tree lock.
1511  *
1512  * \param file file under which I/O is processed
1513  * \param rw OBD_BRW_READ if requested for reading,
1514  *           OBD_BRW_WRITE if requested for writing
1515  * \param ppos start of the requested extent
1516  * \param end end of the requested extent
1517  * \param cookie transparent parameter for passing locking context
1518  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1519  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1520  * \param buf userspace buffer for the data
1521  *
1522  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1523  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1524  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1525  *
1526  * \see ll_file_put_lock
1527  */
1528 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1529                                    obd_off end, char *buf, void **cookie,
1530                                    struct ll_lock_tree *tree, int rw)
1531 {
1532         int rc;
1533
1534         ENTRY;
1535
1536         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1537                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1538
1539         rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
1540         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1541         switch (rc) {
1542         case 1:
1543                 RETURN(LL_LOCK_STYLE_TREELOCK);
1544         case 0:
1545                 RETURN(LL_LOCK_STYLE_NOLOCK);
1546         }
1547
1548         /* an error happened if we reached this point, rc = -errno here */
1549         RETURN(rc);
1550 }
1551
1552 /**
1553  * Drops the lock taken by ll_file_get_lock.
1554  *
1555  * Releases a read or a write (specified by \a rw) lock
1556  * referenced by \a tree or \a cookie.
1557  *
1558  * \param inode inode to which data belong
1559  * \param end end of the locked extent
1560  * \param lockstyle facility through which the lock was taken
1561  * \param rw OBD_BRW_READ if requested for reading,
1562  *           OBD_BRW_WRITE if requested for writing
1563  * \param cookie transparent parameter for passing locking context
1564  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1565  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1566  *
1567  * \post appropriate lock is dereferenced
1568  *
1569  * \see ll_file_get_lock
1570  */
1571 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1572                                     enum ll_lock_style lock_style,
1573                                     void *cookie, struct ll_lock_tree *tree,
1574                                     int rw)
1575
1576 {
1577         switch (lock_style) {
1578         case LL_LOCK_STYLE_TREELOCK:
1579                 ll_tree_unlock(tree);
1580                 break;
1581         case LL_LOCK_STYLE_FASTLOCK:
1582                 ll_file_put_fast_lock(inode, end, cookie, rw);
1583                 break;
1584         default:
1585                 CERROR("invalid locking style (%d)\n", lock_style);
1586         }
1587 }
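
/*
 * Illustrative sketch of how ll_file_get_lock()/ll_file_put_lock() bracket an
 * I/O request, modelled on ll_file_read() below (extent, buffer and error
 * handling are simplified and assumed for the example):
 *
 *      struct ll_lock_tree tree;
 *      void *cookie;
 *      int style;
 *
 *      style = ll_file_get_lock(file, pos, end, buf, &cookie, &tree,
 *                               OBD_BRW_READ);
 *      if (style < 0)
 *              return style;                      (negative errno)
 *
 *      ... perform the read; LL_LOCK_STYLE_NOLOCK means lockless I/O ...
 *
 *      if (style != LL_LOCK_STYLE_NOLOCK)
 *              ll_file_put_lock(inode, end, style, cookie, &tree, OBD_BRW_READ);
 */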
1588
1589 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1590                             loff_t *ppos)
1591 {
1592         struct inode *inode = file->f_dentry->d_inode;
1593         struct ll_inode_info *lli = ll_i2info(inode);
1594         struct lov_stripe_md *lsm = lli->lli_smd;
1595         struct ll_sb_info *sbi = ll_i2sbi(inode);
1596         struct ll_lock_tree tree;
1597         struct ost_lvb lvb;
1598         struct ll_ra_read bead;
1599         int ra = 0;
1600         obd_off end;
1601         ssize_t retval, chunk, sum = 0;
1602         int lock_style;
1603         void *cookie;
1604
1605         __u64 kms;
1606         ENTRY;
1607         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1608                inode->i_ino, inode->i_generation, inode, count, *ppos);
1609         /* "If nbyte is 0, read() will return 0 and have no other results."
1610          *                      -- Single Unix Spec */
1611         if (count == 0)
1612                 RETURN(0);
1613
1614         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1615
1616         if (!lsm) {
1617                 /* A read on a file with no objects should return zero-filled
1618                  * buffers up to the file size (non-zero sizes are possible
1619                  * via mknod + truncate followed by opening the file for read;
1620                  * this seems to be a common pattern for NFS). Bug 6243 */
1621                 int notzeroed;
1622                 /* Since there are no objects on OSTs, we have nothing to get
1623                  * lock on and so we are forced to access inode->i_size
1624                  * unguarded */
1625
1626                 /* Read beyond end of file */
1627                 if (*ppos >= i_size_read(inode))
1628                         RETURN(0);
1629
1630                 if (count > i_size_read(inode) - *ppos)
1631                         count = i_size_read(inode) - *ppos;
1632                 /* Make sure to correctly adjust the file pos pointer for
1633                  * EFAULT case */
1634                 notzeroed = clear_user(buf, count);
1635                 count -= notzeroed;
1636                 *ppos += count;
1637                 if (!count)
1638                         RETURN(-EFAULT);
1639                 RETURN(count);
1640         }
1641 repeat:
1642         if (sbi->ll_max_rw_chunk != 0) {
1643                 /* first, determine the end of the current stripe */
1644                 end = *ppos;
1645                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1646
1647                 /* clamp the end if it is beyond the end of the request */
1648                 if (end > *ppos + count - 1)
1649                         end = *ppos + count - 1;
1650
1651                 /* and chunk shouldn't be too large even if striping is wide */
1652                 if (end - *ppos > sbi->ll_max_rw_chunk)
1653                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1654         } else {
1655                 end = *ppos + count - 1;
1656         }
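
        /*
         * Worked example with illustrative numbers: for *ppos = 1MB,
         * count = 10MB, a stripe ending at 4MB - 1 and ll_max_rw_chunk = 2MB,
         * the stripe end lies inside the request, but the 2MB chunk cap still
         * applies, so end = *ppos + 2MB - 1 and only that chunk is read before
         * looping back to "repeat:" for the rest of the request.
         */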
1657
1658         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1659                                       buf, &cookie, &tree, OBD_BRW_READ);
1660         if (lock_style < 0)
1661                 GOTO(out, retval = lock_style);
1662
1663         ll_inode_size_lock(inode, 1);
1664         /*
1665          * Consistency guarantees: following possibilities exist for the
1666          * relation between region being read and real file size at this
1667          * moment:
1668          *
1669          *  (A): the region is completely inside of the file;
1670          *
1671          *  (B-x): x bytes of region are inside of the file, the rest is
1672          *  outside;
1673          *
1674          *  (C): the region is completely outside of the file.
1675          *
1676          * This classification is stable under the DLM lock acquired by
1677          * ll_tree_lock() above, because to change class another client would
1678          * have to take a DLM lock conflicting with ours. Also, any updates to
1679          * ->i_size by other threads on this client are serialized by
1680          * ll_inode_size_lock(). This guarantees that short reads are handled
1681          * correctly in the face of concurrent writes and truncates.
1682          */
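        /*
         * Worked example (illustrative numbers): say the cached i_size is 1MB,
         * kms returned by obd_merge_lvb() is 4MB because another client
         * appended, and the request covers [3MB, 5MB).  The request end
         * exceeds kms, so we glimpse for the real size: 5MB or more puts the
         * region fully inside the file (A); exactly 4MB leaves 1MB of the
         * region inside (B-1MB) and the read comes up short; anything at or
         * below 3MB puts the region entirely beyond EOF (C).
         */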
1683         inode_init_lvb(inode, &lvb);
1684         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1685         kms = lvb.lvb_size;
1686         if (*ppos + count - 1 > kms) {
1687                 /* A glimpse is necessary to determine whether we return a
1688                  * short read (B) or some zeroes at the end of the buffer (C) */
1689                 ll_inode_size_unlock(inode, 1);
1690                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1691                 if (retval) {
1692                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1693                                 ll_file_put_lock(inode, end, lock_style,
1694                                                  cookie, &tree, OBD_BRW_READ);
1695                         goto out;
1696                 }
1697         } else {
1698                 /* region is within kms and, hence, within real file size (A).
1699                  * We need to increase i_size to cover the read region so that
1700                  * generic_file_read() will do its job, but that doesn't mean
1701                  * the kms size is _correct_, it is only the _minimum_ size.
1702                  * If someone does a stat they will get the correct size which
1703                  * will always be >= the kms value here.  b=11081 */
1704                 if (i_size_read(inode) < kms)
1705                         i_size_write(inode, kms);
1706                 ll_inode_size_unlock(inode, 1);
1707         }
1708
1709         chunk = end - *ppos + 1;
1710         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1711                inode->i_ino, chunk, *ppos, i_size_read(inode));
1712
1713         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1714                 /* turn off the kernel's read-ahead */
1715                 file->f_ra.ra_pages = 0;
1716
1717                 /* initialize read-ahead window once per syscall */
1718                 if (ra == 0) {
1719                         ra = 1;
1720                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1721                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1722                         ll_ra_read_in(file, &bead);
1723                 }
1724
1725                 /* BUG: 5972 */
1726                 file_accessed(file);
1727                 retval = generic_file_read(file, buf, chunk, ppos);
1728                 ll_file_put_lock(inode, end, lock_style, cookie, &tree,
1729                                  OBD_BRW_READ);
1730         } else {
1731                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1732         }
1733
1734         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1735
1736         if (retval > 0) {
1737                 buf += retval;
1738                 count -= retval;
1739                 sum += retval;
1740                 if (retval == chunk && count > 0)
1741                         goto repeat;
1742         }
1743
1744  out:
1745         if (ra != 0)
1746                 ll_ra_read_ex(file, &bead);
1747         retval = (sum > 0) ? sum : retval;
1748         RETURN(retval);
1749 }
1750
1751 /*
1752  * Write to a file (through the page cache).
1753  */
1754 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1755                              loff_t *ppos)
1756 {
1757         struct inode *inode = file->f_dentry->d_inode;
1758         struct ll_sb_info *sbi = ll_i2sbi(inode);
1759         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1760         struct ll_lock_tree tree;
1761         loff_t maxbytes = ll_file_maxbytes(inode);
1762         loff_t lock_start, lock_end, end;
1763         ssize_t retval, chunk, sum = 0;
1764         int tree_locked;
1765         ENTRY;
1766
1767         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1768                inode->i_ino, inode->i_generation, inode, count, *ppos);
1769
1770         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1771
1772         /* Required by POSIX; surprisingly the VFS doesn't check this already */
1773         if (count == 0)
1774                 RETURN(0);
1775
1776         /* If the file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl was
1777          * never called, bail out rather than trip the assertion below (bug 2388). */
1778         if (file->f_flags & O_LOV_DELAY_CREATE &&
1779             ll_i2info(inode)->lli_smd == NULL)
1780                 RETURN(-EBADF);
1781
1782         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1783
1784         down(&ll_i2info(inode)->lli_write_sem);
1785
1786 repeat:
1787         chunk = 0; /* just to fix gcc's warning */
1788         end = *ppos + count - 1;
1789
1790         if (file->f_flags & O_APPEND) {
1791                 lock_start = 0;
1792                 lock_end = OBD_OBJECT_EOF;
1793         } else if (sbi->ll_max_rw_chunk != 0) {
1794                 /* first, determine the end of the current stripe */
1795                 end = *ppos;
1796                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1797                                 (obd_off *)&end);
1798
1799                 /* clamp the end if it is beyond the end of the request */
1800                 if (end > *ppos + count - 1)
1801                         end = *ppos + count - 1;
1802
1803                 /* and chunk shouldn't be too large even if striping is wide */
1804                 if (end - *ppos > sbi->ll_max_rw_chunk)
1805                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1806                 lock_start = *ppos;
1807                 lock_end = end;
1808         } else {
1809                 lock_start = *ppos;
1810                 lock_end = *ppos + count - 1;
1811         }
1812
1813         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1814                                             lock_start, lock_end, OBD_BRW_WRITE);
1815         if (tree_locked < 0)
1816                 GOTO(out, retval = tree_locked);
1817
1818         /* This is OK: generic_file_write() will overwrite *ppos under i_sem
1819          * if it races with a local truncate; it just makes our maxbytes
1820          * checking easier.  The i_size value gets updated in ll_extent_lock()
1821          * as a consequence of the [0,EOF] extent lock we requested above. */
1822         if (file->f_flags & O_APPEND) {
1823                 *ppos = i_size_read(inode);
1824                 end = *ppos + count - 1;
1825         }
1826
1827         if (*ppos >= maxbytes) {
1828                 send_sig(SIGXFSZ, current, 0);
1829                 GOTO(out_unlock, retval = -EFBIG);
1830         }
1831         if (end > maxbytes - 1)
1832                 end = maxbytes - 1;
1833
1834         /* generic_file_write handles O_APPEND after getting i_mutex */
1835         chunk = end - *ppos + 1;
1836         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1837                inode->i_ino, chunk, *ppos);
1838         if (tree_locked)
1839                 retval = generic_file_write(file, buf, chunk, ppos);
1840         else
1841                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1842                                              ppos, WRITE);
1843         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1844
1845 out_unlock:
1846         if (tree_locked)
1847                 ll_tree_unlock(&tree);
1848
1849 out:
1850         if (retval > 0) {
1851                 buf += retval;
1852                 count -= retval;
1853                 sum += retval;
1854                 if (retval == chunk && count > 0)
1855                         goto repeat;
1856         }
1857
1858         up(&ll_i2info(inode)->lli_write_sem);
1859
1860         retval = (sum > 0) ? sum : retval;
1861         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1862                            retval > 0 ? retval : 0);
1863         RETURN(retval);
1864 }
1865
1866 /*
1867  * Send file content (through the page cache) to a target using the read actor
1868  */
1869 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1870                                 read_actor_t actor, void *target)
1871 {
1872         struct inode *inode = in_file->f_dentry->d_inode;
1873         struct ll_inode_info *lli = ll_i2info(inode);
1874         struct lov_stripe_md *lsm = lli->lli_smd;
1875         struct ll_lock_tree tree;
1876         struct ll_lock_tree_node *node;
1877         struct ost_lvb lvb;
1878         struct ll_ra_read bead;
1879         int rc;
1880         ssize_t retval;
1881         __u64 kms;
1882         ENTRY;
1883         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1884                inode->i_ino, inode->i_generation, inode, count, *ppos);
1885
1886         /* "If nbyte is 0, read() will return 0 and have no other results."
1887          *                      -- Single Unix Spec */
1888         if (count == 0)
1889                 RETURN(0);
1890
1891         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1892         /* turn off the kernel's read-ahead */
1893         in_file->f_ra.ra_pages = 0;
1894
1895         /* File with no objects, nothing to lock */
1896         if (!lsm)
1897                 RETURN(generic_file_sendfile(in_file, ppos,count,actor,target));
1898
1899         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1900         if (IS_ERR(node))
1901                 RETURN(PTR_ERR(node));
1902
1903         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1904         rc = ll_tree_lock(&tree, node, NULL, count,
1905                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1906         if (rc != 0)
1907                 RETURN(rc);
1908
1909         ll_clear_file_contended(inode);
1910         ll_inode_size_lock(inode, 1);
1911         /*
1912          * Consistency guarantees: following possibilities exist for the
1913          * relation between region being read and real file size at this
1914          * moment:
1915          *
1916          *  (A): the region is completely inside of the file;
1917          *
1918          *  (B-x): x bytes of region are inside of the file, the rest is
1919          *  outside;
1920          *
1921          *  (C): the region is completely outside of the file.
1922          *
1923          * This classification is stable under the DLM lock acquired by
1924          * ll_tree_lock() above, because to change class another client would
1925          * have to take a DLM lock conflicting with ours. Also, any updates to
1926          * ->i_size by other threads on this client are serialized by
1927          * ll_inode_size_lock(). This guarantees that short reads are handled
1928          * correctly in the face of concurrent writes and truncates.
1929          */
1930         inode_init_lvb(inode, &lvb);
1931         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1932         kms = lvb.lvb_size;
1933         if (*ppos + count - 1 > kms) {
1934                 /* A glimpse is necessary to determine whether we return a
1935                  * short read (B) or some zeroes at the end of the buffer (C) */
1936                 ll_inode_size_unlock(inode, 1);
1937                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1938                 if (retval)
1939                         goto out;
1940         } else {
1941                 /* region is within kms and, hence, within real file size (A) */
1942                 i_size_write(inode, kms);
1943                 ll_inode_size_unlock(inode, 1);
1944         }
1945
1946         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1947                inode->i_ino, count, *ppos, i_size_read(inode));
1948
1949         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1950         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1951         ll_ra_read_in(in_file, &bead);
1952         /* BUG: 5972 */
1953         file_accessed(in_file);
1954         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1955         ll_ra_read_ex(in_file, &bead);
1956
1957  out:
1958         ll_tree_unlock(&tree);
1959         RETURN(retval);
1960 }
1961
1962 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1963                                unsigned long arg)
1964 {
1965         struct ll_inode_info *lli = ll_i2info(inode);
1966         struct obd_export *exp = ll_i2dtexp(inode);
1967         struct ll_recreate_obj ucreatp;
1968         struct obd_trans_info oti = { 0 };
1969         struct obdo *oa = NULL;
1970         int lsm_size;
1971         int rc = 0;
1972         struct lov_stripe_md *lsm, *lsm2;
1973         ENTRY;
1974
1975         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1976                 RETURN(-EPERM);
1977
1978         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1979                             sizeof(struct ll_recreate_obj));
1980         if (rc) {
1981                 RETURN(-EFAULT);
1982         }
1983         OBDO_ALLOC(oa);
1984         if (oa == NULL)
1985                 RETURN(-ENOMEM);
1986
1987         down(&lli->lli_size_sem);
1988         lsm = lli->lli_smd;
1989         if (lsm == NULL)
1990                 GOTO(out, rc = -ENOENT);
1991         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1992                    (lsm->lsm_stripe_count));
1993
1994         OBD_ALLOC(lsm2, lsm_size);
1995         if (lsm2 == NULL)
1996                 GOTO(out, rc = -ENOMEM);
1997
1998         oa->o_id = ucreatp.lrc_id;
1999         oa->o_gr = ucreatp.lrc_group;
2000         oa->o_nlink = ucreatp.lrc_ost_idx;
2001         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2002         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
2003         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2004                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2005
2006         memcpy(lsm2, lsm, lsm_size);
2007         rc = obd_create(exp, oa, &lsm2, &oti);
2008
2009         OBD_FREE(lsm2, lsm_size);
2010         GOTO(out, rc);
2011 out:
2012         up(&lli->lli_size_sem);
2013         OBDO_FREE(oa);
2014         return rc;
2015 }
2016
2017 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2018                              int flags, struct lov_user_md *lum, int lum_size)
2019 {
2020         struct ll_inode_info *lli = ll_i2info(inode);
2021         struct lov_stripe_md *lsm;
2022         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2023         int rc = 0;
2024         ENTRY;
2025
2026         down(&lli->lli_size_sem);
2027         lsm = lli->lli_smd;
2028         if (lsm) {
2029                 up(&lli->lli_size_sem);
2030                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2031                        inode->i_ino);
2032                 RETURN(-EEXIST);
2033         }
2034
2035         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2036         if (rc)
2037                 GOTO(out, rc);
2038         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2039                 GOTO(out_req_free, rc = -ENOENT);
2040         rc = oit.d.lustre.it_status;
2041         if (rc < 0)
2042                 GOTO(out_req_free, rc);
2043
2044         ll_release_openhandle(file->f_dentry, &oit);
2045
2046  out:
2047         up(&lli->lli_size_sem);
2048         ll_intent_release(&oit);
2049         RETURN(rc);
2050 out_req_free:
2051         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2052         goto out;
2053 }
2054
2055 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
2056                              struct lov_mds_md **lmmp, int *lmm_size,
2057                              struct ptlrpc_request **request)
2058 {
2059         struct ll_sb_info *sbi = ll_i2sbi(inode);
2060         struct mdt_body  *body;
2061         struct lov_mds_md *lmm = NULL;
2062         struct ptlrpc_request *req = NULL;
2063         struct obd_capa *oc;
2064         int rc, lmmsize;
2065
2066         rc = ll_get_max_mdsize(sbi, &lmmsize);
2067         if (rc)
2068                 RETURN(rc);
2069
2070         oc = ll_mdscapa_get(inode);
2071         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2072                              oc, filename, strlen(filename) + 1,
2073                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2074                              ll_i2suppgid(inode), &req);
2075         capa_put(oc);
2076         if (rc < 0) {
2077                 CDEBUG(D_INFO, "md_getattr_name failed "
2078                        "on %s: rc %d\n", filename, rc);
2079                 GOTO(out, rc);
2080         }
2081
2082         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2083         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2084
2085         lmmsize = body->eadatasize;
2086
2087         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2088             lmmsize == 0) {
2089                 GOTO(out, rc = -ENODATA);
2090         }
2091
2092         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2093         LASSERT(lmm != NULL);
2094
2095         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
2096             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) &&
2097             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
2098                 GOTO(out, rc = -EPROTO);
2099         }
2100
2101         /*
2102          * This is coming from the MDS, so is probably in
2103          * little endian.  We convert it to host endian before
2104          * passing it to userspace.
2105          */
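        /*
         * On a little-endian host cpu_to_le32() is the identity, so the check
         * below is false and no swabbing is done; on a big-endian host it is
         * true and each recognized magic value is byte-swapped accordingly.
         */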
2106         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
2107                 /* if the function was called for a directory, we should
2108                  * avoid swabbing non-existent lsm objects */
2109                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
2110                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
2111                         if (S_ISREG(body->mode))
2112                                 lustre_swab_lov_user_md_objects(
2113                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
2114                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
2115                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
2116                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
2117                         if (S_ISREG(body->mode))
2118                                 lustre_swab_lov_user_md_objects(
2119                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
2120                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
2121                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
2122                         lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2123                 }
2124         }
2125
2126         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2127                 struct lov_stripe_md *lsm;
2128                 struct lov_user_md_join *lmj;
2129                 int lmj_size, i, aindex = 0;
2130
2131                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2132                 if (rc < 0)
2133                         GOTO(out, rc = -ENOMEM);
2134                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2135                 if (rc)
2136                         GOTO(out_free_memmd, rc);
2137
2138                 lmj_size = sizeof(struct lov_user_md_join) +
2139                            lsm->lsm_stripe_count *
2140                            sizeof(struct lov_user_ost_data_join);
2141                 OBD_ALLOC(lmj, lmj_size);
2142                 if (!lmj)
2143                         GOTO(out_free_memmd, rc = -ENOMEM);
2144
2145                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2146                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2147                         struct lov_extent *lex =
2148                                 &lsm->lsm_array->lai_ext_array[aindex];
2149
2150                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2151                                 aindex++;
2152                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2153                                         LPU64" len %d\n", aindex, i,
2154                                         lex->le_start, (int)lex->le_len);
2155                         lmj->lmm_objects[i].l_extent_start =
2156                                 lex->le_start;
2157
2158                         if ((int)lex->le_len == -1)
2159                                 lmj->lmm_objects[i].l_extent_end = -1;
2160                         else
2161                                 lmj->lmm_objects[i].l_extent_end =
2162                                         lex->le_start + lex->le_len;
2163                         lmj->lmm_objects[i].l_object_id =
2164                                 lsm->lsm_oinfo[i]->loi_id;
2165                         lmj->lmm_objects[i].l_object_gr =
2166                                 lsm->lsm_oinfo[i]->loi_gr;
2167                         lmj->lmm_objects[i].l_ost_gen =
2168                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2169                         lmj->lmm_objects[i].l_ost_idx =
2170                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2171                 }
2172                 lmm = (struct lov_mds_md *)lmj;
2173                 lmmsize = lmj_size;
2174 out_free_memmd:
2175                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2176         }
2177 out:
2178         *lmmp = lmm;
2179         *lmm_size = lmmsize;
2180         *request = req;
2181         return rc;
2182 }
2183
2184 static int ll_lov_setea(struct inode *inode, struct file *file,
2185                             unsigned long arg)
2186 {
2187         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2188         struct lov_user_md  *lump;
2189         int lum_size = sizeof(struct lov_user_md) +
2190                        sizeof(struct lov_user_ost_data);
2191         int rc;
2192         ENTRY;
2193
2194         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2195                 RETURN(-EPERM);
2196
2197         OBD_ALLOC(lump, lum_size);
2198         if (lump == NULL) {
2199                 RETURN(-ENOMEM);
2200         }
2201         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2202         if (rc) {
2203                 OBD_FREE(lump, lum_size);
2204                 RETURN(-EFAULT);
2205         }
2206
2207         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2208
2209         OBD_FREE(lump, lum_size);
2210         RETURN(rc);
2211 }
2212
2213 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2214                             unsigned long arg)
2215 {
2216         struct lov_user_md_v3 lumv3;
2217         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
2218         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
2219         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
2220         int lum_size;
2221         int rc;
2222         int flags = FMODE_WRITE;
2223         ENTRY;
2224
2225         /* first try with v1 which is smaller than v3 */
2226         lum_size = sizeof(struct lov_user_md_v1);
2227         rc = copy_from_user(lumv1, lumv1p, lum_size);
2228         if (rc)
2229                 RETURN(-EFAULT);
2230
2231         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
2232                 lum_size = sizeof(struct lov_user_md_v3);
2233                 rc = copy_from_user(&lumv3, lumv3p, lum_size);
2234                 if (rc)
2235                         RETURN(-EFAULT);
2236         }
2237
2238         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
2239         if (rc == 0) {
2240                  put_user(0, &lumv1p->lmm_stripe_count);
2241                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2242                                     0, ll_i2info(inode)->lli_smd,
2243                                     (void *)arg);
2244         }
2245         RETURN(rc);
2246 }
2247
2248 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2249 {
2250         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2251
2252         if (!lsm)
2253                 RETURN(-ENODATA);
2254
2255         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2256                             (void *)arg);
2257 }
2258
2259 static int ll_get_grouplock(struct inode *inode, struct file *file,
2260                             unsigned long arg)
2261 {
2262         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2263         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2264                                                     .end = OBD_OBJECT_EOF}};
2265         struct lustre_handle lockh = { 0 };
2266         struct ll_inode_info *lli = ll_i2info(inode);
2267         struct lov_stripe_md *lsm = lli->lli_smd;
2268         int flags = 0, rc;
2269         ENTRY;
2270
2271         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2272                 RETURN(-EINVAL);
2273         }
2274
2275         policy.l_extent.gid = arg;
2276         if (file->f_flags & O_NONBLOCK)
2277                 flags = LDLM_FL_BLOCK_NOWAIT;
2278
2279         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2280         if (rc)
2281                 RETURN(rc);
2282
2283         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2284         fd->fd_gid = arg;
2285         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2286
2287         RETURN(0);
2288 }
2289
2290 static int ll_put_grouplock(struct inode *inode, struct file *file,
2291                             unsigned long arg)
2292 {
2293         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2294         struct ll_inode_info *lli = ll_i2info(inode);
2295         struct lov_stripe_md *lsm = lli->lli_smd;
2296         int rc;
2297         ENTRY;
2298
2299         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2300                 /* Ugh, it's already unlocked. */
2301                 RETURN(-EINVAL);
2302         }
2303
2304         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2305                 RETURN(-EINVAL);
2306
2307         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2308
2309         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2310         if (rc)
2311                 RETURN(rc);
2312
2313         fd->fd_gid = 0;
2314         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2315
2316         RETURN(0);
2317 }
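
/*
 * Illustrative application-side sketch of the group lock interface handled
 * above; the path and gid value are assumptions of the example.  The ioctl
 * argument is the group id itself, as dispatched by ll_file_ioctl() below:
 *
 *      int fd = open("/mnt/lustre/shared_file", O_RDWR);
 *      int gid = 1234;                    (an id agreed on by the group)
 *
 *      ioctl(fd, LL_IOC_GROUP_LOCK, gid);      dispatched to ll_get_grouplock()
 *      ... cooperating processes using the same gid run concurrently ...
 *      ioctl(fd, LL_IOC_GROUP_UNLOCK, gid);    dispatched to ll_put_grouplock()
 */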
2318
2319 #if LUSTRE_FIX >= 50
2320 static int join_sanity_check(struct inode *head, struct inode *tail)
2321 {
2322         ENTRY;
2323         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2324                 CERROR("server does not support join\n");
2325                 RETURN(-EINVAL);
2326         }
2327         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2328                 CERROR("head ino %lu and tail ino %lu must be regular\n",
2329                        head->i_ino, tail->i_ino);
2330                 RETURN(-EINVAL);
2331         }
2332         if (head->i_ino == tail->i_ino) {
2333                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2334                 RETURN(-EINVAL);
2335         }
2336         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2337                 CERROR("hsize %llu must be a multiple of 64K\n", i_size_read(head));
2338                 RETURN(-EINVAL);
2339         }
2340         RETURN(0);
2341 }
2342
2343 static int join_file(struct inode *head_inode, struct file *head_filp,
2344                      struct file *tail_filp)
2345 {
2346         struct dentry *tail_dentry = tail_filp->f_dentry;
2347         struct lookup_intent oit = {.it_op = IT_OPEN,
2348                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2349         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2350                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2351
2352         struct lustre_handle lockh;
2353         struct md_op_data *op_data;
2354         int    rc;
2355         loff_t data;
2356         ENTRY;
2357
2358         tail_dentry = tail_filp->f_dentry;
2359
2360         data = i_size_read(head_inode);
2361         op_data = ll_prep_md_op_data(NULL, head_inode,
2362                                      tail_dentry->d_parent->d_inode,
2363                                      tail_dentry->d_name.name,
2364                                      tail_dentry->d_name.len, 0,
2365                                      LUSTRE_OPC_ANY, &data);
2366         if (IS_ERR(op_data))
2367                 RETURN(PTR_ERR(op_data));
2368
2369         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit,
2370                          op_data, &lockh, NULL, 0, NULL, 0);
2371
2372         ll_finish_md_op_data(op_data);
2373         if (rc < 0)
2374                 GOTO(out, rc);
2375
2376         rc = oit.d.lustre.it_status;
2377
2378         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2379                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2380                 ptlrpc_req_finished((struct ptlrpc_request *)
2381                                     oit.d.lustre.it_data);
2382                 GOTO(out, rc);
2383         }
2384
2385         if (oit.d.lustre.it_lock_mode) { /* If we got a lock, release it
2386                                            * right away */
2387                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2388                 oit.d.lustre.it_lock_mode = 0;
2389         }
2390         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2391         it_clear_disposition(&oit, DISP_ENQ_COMPLETE);
2392         ll_release_openhandle(head_filp->f_dentry, &oit);
2393 out:
2394         ll_intent_release(&oit);
2395         RETURN(rc);
2396 }
2397
2398 static int ll_file_join(struct inode *head, struct file *filp,
2399                         char *filename_tail)
2400 {
2401         struct inode *tail = NULL, *first = NULL, *second = NULL;
2402         struct dentry *tail_dentry;
2403         struct file *tail_filp, *first_filp, *second_filp;
2404         struct ll_lock_tree first_tree, second_tree;
2405         struct ll_lock_tree_node *first_node, *second_node;
2406         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2407         int rc = 0, cleanup_phase = 0;
2408         ENTRY;
2409
2410         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2411                head->i_ino, head->i_generation, head, filename_tail);
2412
2413         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2414         if (IS_ERR(tail_filp)) {
2415                 CERROR("Cannot open tail file %s\n", filename_tail);
2416                 rc = PTR_ERR(tail_filp);
2417                 GOTO(cleanup, rc);
2418         }
2419         tail = igrab(tail_filp->f_dentry->d_inode);
2420
2421         tlli = ll_i2info(tail);
2422         tail_dentry = tail_filp->f_dentry;
2423         LASSERT(tail_dentry);
2424         cleanup_phase = 1;
2425
2426         /* reorder the inodes to establish a consistent lock ordering */
2427         first = head->i_ino > tail->i_ino ? head : tail;
2428         second = head->i_ino > tail->i_ino ? tail : head;
2429         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2430         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2431
2432         CDEBUG(D_INFO, "reorder objects from %lu:%lu to %lu:%lu\n",
2433                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2434         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2435         if (IS_ERR(first_node)){
2436                 rc = PTR_ERR(first_node);
2437                 GOTO(cleanup, rc);
2438         }
2439         first_tree.lt_fd = first_filp->private_data;
2440         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2441         if (rc != 0)
2442                 GOTO(cleanup, rc);
2443         cleanup_phase = 2;
2444
2445         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2446         if (IS_ERR(second_node)){
2447                 rc = PTR_ERR(second_node);
2448                 GOTO(cleanup, rc);
2449         }
2450         second_tree.lt_fd = second_filp->private_data;
2451         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2452         if (rc != 0)
2453                 GOTO(cleanup, rc);
2454         cleanup_phase = 3;
2455
2456         rc = join_sanity_check(head, tail);
2457         if (rc)
2458                 GOTO(cleanup, rc);
2459
2460         rc = join_file(head, filp, tail_filp);
2461         if (rc)
2462                 GOTO(cleanup, rc);
2463 cleanup:
2464         switch (cleanup_phase) {
2465         case 3:
2466                 ll_tree_unlock(&second_tree);
2467                 obd_cancel_unused(ll_i2dtexp(second),
2468                                   ll_i2info(second)->lli_smd, 0, NULL);
2469         case 2:
2470                 ll_tree_unlock(&first_tree);
2471                 obd_cancel_unused(ll_i2dtexp(first),
2472                                   ll_i2info(first)->lli_smd, 0, NULL);
2473         case 1:
2474                 filp_close(tail_filp, 0);
2475                 if (tail)
2476                         iput(tail);
2477                 if (head && rc == 0) {
2478                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2479                                        &hlli->lli_smd);
2480                         hlli->lli_smd = NULL;
2481                 }
2482         case 0:
2483                 break;
2484         default:
2485                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2486                 LBUG();
2487         }
2488         RETURN(rc);
2489 }
2490 #endif /* LUSTRE_FIX >= 50 */
2491
2492 /**
2493  * Close inode open handle
2494  *
2495  * \param dentry [in]     dentry which contains the inode
2496  * \param it     [in,out] intent which contains open info and result
2497  *
2498  * \retval 0     success
2499  * \retval <0    failure
2500  */
2501 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2502 {
2503         struct inode *inode = dentry->d_inode;
2504         struct obd_client_handle *och;
2505         int rc;
2506         ENTRY;
2507
2508         LASSERT(inode);
2509
2510         /* Root ? Do nothing. */
2511         if (dentry->d_inode->i_sb->s_root == dentry)
2512                 RETURN(0);
2513
2514         /* No open handle to close? Move away */
2515         if (!it_disposition(it, DISP_OPEN_OPEN))
2516                 RETURN(0);
2517
2518         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2519
2520         OBD_ALLOC(och, sizeof(*och));
2521         if (!och)
2522                 GOTO(out, rc = -ENOMEM);
2523
2524         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2525                     ll_i2info(inode), it, och);
2526
2527         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2528                                        inode, och);
2529  out:
2530         /* this one is in place of ll_file_open */
2531         if (it_disposition(it, DISP_ENQ_OPEN_REF))
2532                 ptlrpc_req_finished(it->d.lustre.it_data);
2533         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2534         RETURN(rc);
2535 }
2536
2537 /**
2538  * Get the size of the inode for which the FIEMAP mapping is requested,
2539  * make the FIEMAP get_info call, and return the result.
2540  */
2541 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2542               int num_bytes)
2543 {
2544         struct obd_export *exp = ll_i2dtexp(inode);
2545         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2546         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2547         int vallen = num_bytes;
2548         int rc;
2549         ENTRY;
2550
2551         /* If the stripe_count > 1 and the application does not understand
2552          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2553          */
2554         if (lsm->lsm_stripe_count > 1 &&
2555             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2556                 return -EOPNOTSUPP;
2557
2558         fm_key.oa.o_id = lsm->lsm_object_id;
2559         fm_key.oa.o_gr = lsm->lsm_object_gr;
2560         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2561
2562         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
2563                         OBD_MD_FLSIZE);
2564
2565         /* If the file size is 0, there are no objects to map */
2566         if (fm_key.oa.o_size == 0) {
2567                 fiemap->fm_mapped_extents = 0;
2568                 RETURN(0);
2569         }
2570
2571         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2572
2573         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2574         if (rc)
2575                 CERROR("obd_get_info failed: rc = %d\n", rc);
2576
2577         RETURN(rc);
2578 }
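
/*
 * Illustrative userspace sketch of driving the EXT3_IOC_FIEMAP ioctl handled
 * in ll_file_ioctl() below.  The buffer is sized the same way the handler
 * sizes it (header plus fm_extent_count extents); the flag choice follows the
 * stripe_count > 1 requirement documented in ll_fiemap() above, and the exact
 * values are assumptions of the example:
 *
 *      int count = 32;
 *      size_t len = sizeof(struct ll_user_fiemap) +
 *                   count * sizeof(struct ll_fiemap_extent);
 *      struct ll_user_fiemap *fm = calloc(1, len);
 *
 *      fm->fm_extent_count = count;
 *      fm->fm_flags = FIEMAP_FLAG_DEVICE_ORDER;
 *      if (ioctl(fd, EXT3_IOC_FIEMAP, fm) == 0)
 *              ... fm->fm_mapped_extents extents are in fm->fm_extents[] ...
 */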
2579
2580 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2581                   unsigned long arg)
2582 {
2583         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2584         int flags;
2585         ENTRY;
2586
2587         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2588                inode->i_generation, inode, cmd);
2589         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2590
2591         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2592         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2593                 RETURN(-ENOTTY);
2594
2595         switch(cmd) {
2596         case LL_IOC_GETFLAGS:
2597                 /* Get the current value of the file flags */
2598                 return put_user(fd->fd_flags, (int *)arg);
2599         case LL_IOC_SETFLAGS:
2600         case LL_IOC_CLRFLAGS:
2601                 /* Set or clear specific file flags */
2602                 /* XXX This probably needs checks to ensure the flags are
2603                  *     not abused, and to handle any flag side effects.
2604                  */
2605                 if (get_user(flags, (int *) arg))
2606                         RETURN(-EFAULT);
2607
2608                 if (cmd == LL_IOC_SETFLAGS) {
2609                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2610                             !(file->f_flags & O_DIRECT)) {
2611                                 CERROR("%s: unable to disable locking on "
2612                                        "non-O_DIRECT file\n", current->comm);
2613                                 RETURN(-EINVAL);
2614                         }
2615
2616                         fd->fd_flags |= flags;
2617                 } else {
2618                         fd->fd_flags &= ~flags;
2619                 }
2620                 RETURN(0);
2621         case LL_IOC_LOV_SETSTRIPE:
2622                 RETURN(ll_lov_setstripe(inode, file, arg));
2623         case LL_IOC_LOV_SETEA:
2624                 RETURN(ll_lov_setea(inode, file, arg));
2625         case LL_IOC_LOV_GETSTRIPE:
2626                 RETURN(ll_lov_getstripe(inode, arg));
2627         case LL_IOC_RECREATE_OBJ:
2628                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2629         case EXT3_IOC_FIEMAP: {
2630                 struct ll_user_fiemap *fiemap_s;
2631                 size_t num_bytes, ret_bytes;
2632                 unsigned int extent_count;
2633                 int rc = 0;
2634
2635                 /* Get the extent count so we can calculate the size of
2636                  * required fiemap buffer */
2637                 if (get_user(extent_count,
2638                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2639                         RETURN(-EFAULT);
2640                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2641                                                  sizeof(struct ll_fiemap_extent));
2642                 OBD_VMALLOC(fiemap_s, num_bytes);
2643                 if (fiemap_s == NULL)
2644                         RETURN(-ENOMEM);
2645
2646                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2647                                    sizeof(*fiemap_s)))
2648                         GOTO(error, rc = -EFAULT);
2649
2650                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2651                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2652                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2653                         if (copy_to_user((char *)arg, fiemap_s,
2654                                          sizeof(*fiemap_s)))
2655                                 GOTO(error, rc = -EFAULT);
2656
2657                         GOTO(error, rc = -EBADR);
2658                 }
2659
2660                 /* If fm_extent_count is non-zero, read the first extent since
2661                  * it is used to calculate end_offset and device from previous
2662                  * fiemap call. */
2663                 if (extent_count) {
2664                         if (copy_from_user(&fiemap_s->fm_extents[0],
2665                             (char __user *)arg + sizeof(*fiemap_s),
2666                             sizeof(struct ll_fiemap_extent)))
2667                                 GOTO(error, rc = -EFAULT);
2668                 }
2669
2670                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2671                         int rc;
2672
2673                         rc = filemap_fdatawrite(inode->i_mapping);
2674                         if (rc)
2675                                 GOTO(error, rc);
2676                 }
2677
2678                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2679                 if (rc)
2680                         GOTO(error, rc);
2681
2682                 ret_bytes = sizeof(struct ll_user_fiemap);
2683
2684                 if (extent_count != 0)
2685                         ret_bytes += (fiemap_s->fm_mapped_extents *
2686                                          sizeof(struct ll_fiemap_extent));
2687
2688                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2689                         rc = -EFAULT;
2690
2691 error:
2692                 OBD_VFREE(fiemap_s, num_bytes);
2693                 RETURN(rc);
2694         }
2695         case EXT3_IOC_GETFLAGS:
2696         case EXT3_IOC_SETFLAGS:
2697                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2698         case EXT3_IOC_GETVERSION_OLD:
2699         case EXT3_IOC_GETVERSION:
2700                 RETURN(put_user(inode->i_generation, (int *)arg));
2701         case LL_IOC_JOIN: {
2702 #if LUSTRE_FIX >= 50
2703                 /* Allow file join in beta builds to aid debugging */
2704                 char *ftail;
2705                 int rc;
2706
2707                 ftail = getname((const char *)arg);
2708                 if (IS_ERR(ftail))
2709                         RETURN(PTR_ERR(ftail));
2710                 rc = ll_file_join(inode, file, ftail);
2711                 putname(ftail);
2712                 RETURN(rc);
2713 #else
2714                 CWARN("file join is not supported in this version of Lustre\n");
2715                 RETURN(-ENOTTY);
2716 #endif
2717         }
2718         case LL_IOC_GROUP_LOCK:
2719                 RETURN(ll_get_grouplock(inode, file, arg));
2720         case LL_IOC_GROUP_UNLOCK:
2721                 RETURN(ll_put_grouplock(inode, file, arg));
2722         case IOC_OBD_STATFS:
2723                 RETURN(ll_obd_statfs(inode, (void *)arg));
2724
2725         /* We need to special case any other ioctls we want to handle,
2726          * to send them to the MDS/OST as appropriate and to properly
2727          * network encode the arg field.
2728         case EXT3_IOC_SETVERSION_OLD:
2729         case EXT3_IOC_SETVERSION:
2730         */
2731         case LL_IOC_FLUSHCTX:
2732                 RETURN(ll_flush_ctx(inode));
2733         default: {
2734                 int err;
2735
2736                 if (LLIOC_STOP ==
2737                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2738                         RETURN(err);
2739
2740                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2741                                      (void *)arg));
2742         }
2743         }
2744 }
2745
2746 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2747 {
2748         struct inode *inode = file->f_dentry->d_inode;
2749         struct ll_inode_info *lli = ll_i2info(inode);
2750         struct lov_stripe_md *lsm = lli->lli_smd;
2751         loff_t retval;
2752         ENTRY;
2753         retval = offset + ((origin == 2) ? i_size_read(inode) :
2754                            (origin == 1) ? file->f_pos : 0);
2755         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2756                inode->i_ino, inode->i_generation, inode, retval, retval,
2757                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2758         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2759
2760         if (origin == 2) { /* SEEK_END */
2761                 int nonblock = 0, rc;
2762
2763                 if (file->f_flags & O_NONBLOCK)
2764                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2765
2766                 if (lsm != NULL) {
2767                         rc = ll_glimpse_size(inode, nonblock);
2768                         if (rc != 0)
2769                                 RETURN(rc);
2770                 }
2771
2772                 ll_inode_size_lock(inode, 0);
2773                 offset += i_size_read(inode);
2774                 ll_inode_size_unlock(inode, 0);
2775         } else if (origin == 1) { /* SEEK_CUR */
2776                 offset += file->f_pos;
2777         }
2778
2779         retval = -EINVAL;
2780         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2781                 if (offset != file->f_pos) {
2782                         file->f_pos = offset;
2783                 }
2784                 retval = offset;
2785         }
2786
2787         RETURN(retval);
2788 }
2789
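/*
 * fsync a Lustre file: wait for dirty pages already under writeback, fold
 * in any asynchronous write errors recorded for the inode or its stripes,
 * sync the metadata on the MDS via md_sync(), and, when @data is set and
 * the file has stripe metadata, flush the OST objects with obd_sync().
 */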
2790 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2791 {
2792         struct inode *inode = dentry->d_inode;
2793         struct ll_inode_info *lli = ll_i2info(inode);
2794         struct lov_stripe_md *lsm = lli->lli_smd;
2795         struct ptlrpc_request *req;
2796         struct obd_capa *oc;
2797         int rc, err;
2798         ENTRY;
2799         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2800                inode->i_generation, inode);
2801         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2802
2803         /* fsync's caller has already started writeback; wait for that
2804          * IO to finish before calling the osc and mdc sync methods */
2805         rc = filemap_fdatawait(inode->i_mapping);
2806
2807         /* catch async errors that were recorded back when async writeback
2808          * failed for pages in this mapping. */
2809         err = lli->lli_async_rc;
2810         lli->lli_async_rc = 0;
2811         if (rc == 0)
2812                 rc = err;
2813         if (lsm) {
2814                 err = lov_test_and_clear_async_rc(lsm);
2815                 if (rc == 0)
2816                         rc = err;
2817         }
2818
2819         oc = ll_mdscapa_get(inode);
2820         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2821                       &req);
2822         capa_put(oc);
2823         if (!rc)
2824                 rc = err;
2825         if (!err)
2826                 ptlrpc_req_finished(req);
2827
2828         if (data && lsm) {
2829                 struct obdo *oa;
2830
2831                 OBDO_ALLOC(oa);
2832                 if (!oa)
2833                         RETURN(rc ? rc : -ENOMEM);
2834
2835                 oa->o_id = lsm->lsm_object_id;
2836                 oa->o_gr = lsm->lsm_object_gr;
2837                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2838                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2839                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2840                                            OBD_MD_FLGROUP);
2841
2842                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2843                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2844                                0, OBD_OBJECT_EOF, oc);
2845                 capa_put(oc);
2846                 if (!rc)
2847                         rc = err;
2848                 OBDO_FREE(oa);
2849         }
2850
2851         RETURN(rc);
2852 }
2853
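/*
 * Handle fcntl()/flock() advisory locking by translating the VFS lock
 * request into an LDLM flock enqueue on the MDS.  F_UNLCK is sent as an
 * LCK_NL enqueue (see the comment below), F_GETLK uses LDLM_FL_TEST_LOCK,
 * and non-blocking requests add LDLM_FL_BLOCK_NOWAIT.  Successful results
 * are mirrored into the local flock/posix lock tables so the VFS view
 * stays consistent with the cluster-wide state.
 */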
2854 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2855 {
2856         struct inode *inode = file->f_dentry->d_inode;
2857         struct ll_sb_info *sbi = ll_i2sbi(inode);
2858         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2859                                            .ei_cb_cp =ldlm_flock_completion_ast,
2860                                            .ei_cbdata = file_lock };
2861         struct md_op_data *op_data;
2862         struct lustre_handle lockh = {0};
2863         ldlm_policy_data_t flock;
2864         int flags = 0;
2865         int rc;
2866         ENTRY;
2867
2868         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2869                inode->i_ino, file_lock);
2870
2871         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2872
2873         if (file_lock->fl_flags & FL_FLOCK) {
2874                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2875                 /* set missing params for flock() calls */
2876                 file_lock->fl_end = OFFSET_MAX;
2877                 file_lock->fl_pid = current->tgid;
2878         }
2879         flock.l_flock.pid = file_lock->fl_pid;
2880         flock.l_flock.start = file_lock->fl_start;
2881         flock.l_flock.end = file_lock->fl_end;
2882
2883         switch (file_lock->fl_type) {
2884         case F_RDLCK:
2885                 einfo.ei_mode = LCK_PR;
2886                 break;
2887         case F_UNLCK:
2888                 /* An unlock request may or may not have any relation to
2889                  * existing locks so we may not be able to pass a lock handle
2890                  * via a normal ldlm_lock_cancel() request. The request may even
2891                  * unlock a byte range in the middle of an existing lock. In
2892                  * order to process an unlock request we need all of the same
2893                  * information that is given with a normal read or write record
2894                  * lock request. To avoid creating another ldlm unlock (cancel)
2895                  * message we'll treat a LCK_NL flock request as an unlock. */
2896                 einfo.ei_mode = LCK_NL;
2897                 break;
2898         case F_WRLCK:
2899                 einfo.ei_mode = LCK_PW;
2900                 break;
2901         default:
2902                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2903                 LBUG();
2904         }
2905
2906         switch (cmd) {
2907         case F_SETLKW:
2908 #ifdef F_SETLKW64
2909         case F_SETLKW64:
2910 #endif
2911                 flags = 0;
2912                 break;
2913         case F_SETLK:
2914 #ifdef F_SETLK64
2915         case F_SETLK64:
2916 #endif
2917                 flags = LDLM_FL_BLOCK_NOWAIT;
2918                 break;
2919         case F_GETLK:
2920 #ifdef F_GETLK64
2921         case F_GETLK64:
2922 #endif
2923                 flags = LDLM_FL_TEST_LOCK;
2924                 /* Save the old mode so that if the mode in the lock changes we
2925                  * can decrement the appropriate reader or writer refcount. */
2926                 file_lock->fl_type = einfo.ei_mode;
2927                 break;
2928         default:
2929                 CERROR("unknown fcntl lock command: %d\n", cmd);
2930                 LBUG();
2931         }
2932
2933         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2934                                      LUSTRE_OPC_ANY, NULL);
2935         if (IS_ERR(op_data))
2936                 RETURN(PTR_ERR(op_data));
2937
2938         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2939                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2940                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2941
2942         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2943                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2944
2945         ll_finish_md_op_data(op_data);
2946
2947         if ((file_lock->fl_flags & FL_FLOCK) &&
2948             (rc == 0 || file_lock->fl_type == F_UNLCK))
2949                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2950 #ifdef HAVE_F_OP_FLOCK
2951         if ((file_lock->fl_flags & FL_POSIX) &&
2952             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2953             !(flags & LDLM_FL_TEST_LOCK))
2954                 posix_lock_file_wait(file, file_lock);
2955 #endif
2956
2957         RETURN(rc);
2958 }
2959
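/* Used for the -o noflock mount option: refuse all flock/posix lock calls. */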
2960 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2961 {
2962         ENTRY;
2963
2964         RETURN(-ENOSYS);
2965 }
2966
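/*
 * Test, without taking a reference, whether this client already holds an
 * MDS inodebits lock covering @bits in any of CR/CW/PR/PW mode.
 */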
2967 int ll_have_md_lock(struct inode *inode, __u64 bits)
2968 {
2969         struct lustre_handle lockh;
2970         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2971         struct lu_fid *fid;
2972         int flags;
2973         ENTRY;
2974
2975         if (!inode)
2976                RETURN(0);
2977
2978         fid = &ll_i2info(inode)->lli_fid;
2979         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2980
2981         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2982         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2983                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2984                 RETURN(1);
2985         }
2986         RETURN(0);
2987 }
2988
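/*
 * Like ll_have_md_lock(), but without LDLM_FL_TEST_LOCK: a matching MDS
 * inodebits lock is referenced and its handle returned in @lockh, so the
 * caller must drop that reference when it is finished with the lock.
 */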
2989 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2990                             struct lustre_handle *lockh)
2991 {
2992         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2993         struct lu_fid *fid;
2994         ldlm_mode_t rc;
2995         int flags;
2996         ENTRY;
2997
2998         fid = &ll_i2info(inode)->lli_fid;
2999         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
3000
3001         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
3002         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
3003                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
3004         RETURN(rc);
3005 }
3006
3007 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
3008         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
3009                               * and return success */
3010                 inode->i_nlink = 0;
3011                 /* This path cannot be hit for regular files except in
3012                  * obscure races, so there is no need to validate
3013                  * size. */
3014                 if (!S_ISREG(inode->i_mode) &&
3015                     !S_ISDIR(inode->i_mode))
3016                         return 0;
3017         }
3018
3019         if (rc) {
3020                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
3021                 return -abs(rc);
3022
3023         }
3024
3025         return 0;
3026 }
3027
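/*
 * Revalidate inode attributes from the MDS.  When the server supports
 * OBD_CONNECT_ATTRFID an IT_GETATTR intent lock is taken by FID (no name
 * based lookup); otherwise a plain md_getattr() is issued unless a
 * LOOKUP/UPDATE inodebits lock is already cached.  If the file has stripe
 * metadata its size is then refreshed from the OSTs via ll_glimpse_size().
 */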
3028 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
3029 {
3030         struct inode *inode = dentry->d_inode;
3031         struct ptlrpc_request *req = NULL;
3032         struct ll_sb_info *sbi;
3033         struct obd_export *exp;
3034         int rc;
3035         ENTRY;
3036
3037         if (!inode) {
3038                 CERROR("REPORT THIS LINE TO PETER\n");
3039                 RETURN(0);
3040         }
3041         sbi = ll_i2sbi(inode);
3042
3043         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3044                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3045
3046         exp = ll_i2mdexp(inode);
3047
3048         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
3049                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3050                 struct md_op_data *op_data;
3051
3052                 /* Call getattr by fid, so do not provide name at all. */
3053                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
3054                                              dentry->d_inode, NULL, 0, 0,
3055                                              LUSTRE_OPC_ANY, NULL);
3056                 if (IS_ERR(op_data))
3057                         RETURN(PTR_ERR(op_data));
3058
3059                 oit.it_flags |= O_CHECK_STALE;
3060                 rc = md_intent_lock(exp, op_data, NULL, 0,
3061                                     /* we are not interested in name
3062                                        based lookup */
3063                                     &oit, 0, &req,
3064                                     ll_md_blocking_ast, 0);
3065                 ll_finish_md_op_data(op_data);
3066                 oit.it_flags &= ~O_CHECK_STALE;
3067                 if (rc < 0) {
3068                         rc = ll_inode_revalidate_fini(inode, rc);
3069                         GOTO(out, rc);
3070                 }
3071
3072                 rc = ll_revalidate_it_finish(req, &oit, dentry);
3073                 if (rc != 0) {
3074                         ll_intent_release(&oit);
3075                         GOTO(out, rc);
3076                 }
3077
3078                 /* Unlinked? Unhash dentry, so it is not picked up later by
3079                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3080                    here to preserve get_cwd functionality on 2.6.
3081                    Bug 10503 */
3082                 if (!dentry->d_inode->i_nlink) {
3083                         spin_lock(&ll_lookup_lock);
3084                         spin_lock(&dcache_lock);
3085                         ll_drop_dentry(dentry);
3086                         spin_unlock(&dcache_lock);
3087                         spin_unlock(&ll_lookup_lock);
3088                 }
3089
3090                 ll_lookup_finish_locks(&oit, dentry);
3091         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
3092                                                      MDS_INODELOCK_LOOKUP)) {
3093                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3094                 obd_valid valid = OBD_MD_FLGETATTR;
3095                 struct obd_capa *oc;
3096                 int ealen = 0;
3097
3098                 if (S_ISREG(inode->i_mode)) {
3099                         rc = ll_get_max_mdsize(sbi, &ealen);
3100                         if (rc)
3101                                 RETURN(rc);
3102                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3103                 }
3104                 /* When OBD_CONNECT_ATTRFID is not supported we cannot
3105                  * find a capa for this inode, because we only keep the
3106                  * capas of directories fresh. */
3107                 oc = ll_mdscapa_get(inode);
3108                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
3109                                 ealen, &req);
3110                 capa_put(oc);
3111                 if (rc) {
3112                         rc = ll_inode_revalidate_fini(inode, rc);
3113                         RETURN(rc);
3114                 }
3115
3116                 rc = ll_prep_inode(&inode, req, NULL);
3117                 if (rc)
3118                         GOTO(out, rc);
3119         }
3120
3121         /* if object not yet allocated, don't validate size */
3122         if (ll_i2info(inode)->lli_smd == NULL)
3123                 GOTO(out, rc = 0);
3124
3125         /* ll_glimpse_size will prefer locally cached writes if they extend
3126          * the file */
3127         rc = ll_glimpse_size(inode, 0);
3128         EXIT;
3129 out:
3130         ptlrpc_req_finished(req);
3131         return rc;
3132 }
3133
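/*
 * getattr: revalidate attributes with the MDS (and glimpse OST sizes)
 * through ll_inode_revalidate_it(), then fill the kstat from the inode;
 * size and blocks are sampled under the inode size lock.
 */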
3134 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3135                   struct lookup_intent *it, struct kstat *stat)
3136 {
3137         struct inode *inode = de->d_inode;
3138         int res = 0;
3139
3140         res = ll_inode_revalidate_it(de, it);
3141         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3142
3143         if (res)
3144                 return res;
3145
3146         stat->dev = inode->i_sb->s_dev;
3147         stat->ino = inode->i_ino;
3148         stat->mode = inode->i_mode;
3149         stat->nlink = inode->i_nlink;
3150         stat->uid = inode->i_uid;
3151         stat->gid = inode->i_gid;
3152         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3153         stat->atime = inode->i_atime;
3154         stat->mtime = inode->i_mtime;
3155         stat->ctime = inode->i_ctime;
3156 #ifdef HAVE_INODE_BLKSIZE
3157         stat->blksize = inode->i_blksize;
3158 #else
3159         stat->blksize = 1 << inode->i_blkbits;
3160 #endif
3161
3162         ll_inode_size_lock(inode, 0);
3163         stat->size = i_size_read(inode);
3164         stat->blocks = inode->i_blocks;
3165         ll_inode_size_unlock(inode, 0);
3166
3167         return 0;
3168 }
3169 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3170 {
3171         struct lookup_intent it = { .it_op = IT_GETATTR };
3172
3173         return ll_getattr_it(mnt, de, &it, stat);
3174 }
3175
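/*
 * POSIX ACL check, used as the check_acl callback of generic_permission()
 * on newer kernels and called directly from the open-coded permission
 * check below on older ones.  The ACL cached in the inode is consulted;
 * -EAGAIN means "no ACL here, fall back to the ordinary mode bits".
 */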
3176 static
3177 int lustre_check_acl(struct inode *inode, int mask)
3178 {
3179 #ifdef CONFIG_FS_POSIX_ACL
3180         struct ll_inode_info *lli = ll_i2info(inode);
3181         struct posix_acl *acl;
3182         int rc;
3183         ENTRY;
3184
3185         spin_lock(&lli->lli_lock);
3186         acl = posix_acl_dup(lli->lli_posix_acl);
3187         spin_unlock(&lli->lli_lock);
3188
3189         if (!acl)
3190                 RETURN(-EAGAIN);
3191
3192         rc = posix_acl_permission(inode, acl, mask);
3193         posix_acl_release(acl);
3194
3195         RETURN(rc);
3196 #else
3197         return -EAGAIN;
3198 #endif
3199 }
3200
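/*
 * ->permission(): on 2.6.10+ kernels generic_permission() does the work
 * with lustre_check_acl() plugged in; older kernels get the open-coded
 * equivalent below, which consults the ACL before falling back to group
 * and capability checks.  Remote clients (LL_SBI_RMT_CLIENT) defer to
 * lustre_check_remote_perm() instead.
 */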
3201 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3202 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3203 {
3204         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3205                inode->i_ino, inode->i_generation, inode, mask);
3206         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3207                 return lustre_check_remote_perm(inode, mask);
3208
3209         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3210         return generic_permission(inode, mask, lustre_check_acl);
3211 }
3212 #else
3213 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3214 {
3215         int mode = inode->i_mode;
3216         int rc;
3217
3218         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3219                inode->i_ino, inode->i_generation, inode, mask);
3220
3221         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3222                 return lustre_check_remote_perm(inode, mask);
3223
3224         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3225
3226         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3227             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3228                 return -EROFS;
3229         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3230                 return -EACCES;
3231         if (current->fsuid == inode->i_uid) {
3232                 mode >>= 6;
3233         } else if (1) {
3234                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3235                         goto check_groups;
3236                 rc = lustre_check_acl(inode, mask);
3237                 if (rc == -EAGAIN)
3238                         goto check_groups;
3239                 if (rc == -EACCES)
3240                         goto check_capabilities;
3241                 return rc;
3242         } else {
3243 check_groups:
3244                 if (in_group_p(inode->i_gid))
3245                         mode >>= 3;
3246         }
3247         if ((mode & mask & S_IRWXO) == mask)
3248                 return 0;
3249
3250 check_capabilities:
3251         if (!(mask & MAY_EXEC) ||
3252             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3253                 if (cfs_capable(CFS_CAP_DAC_OVERRIDE))
3254                         return 0;
3255
3256         if (cfs_capable(CFS_CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3257             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3258                 return 0;
3259
3260         return -EACCES;
3261 }
3262 #endif
3263
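/*
 * Three file_operations tables are provided and selected according to the
 * flock mount options: the table below (used for -o localflock) leaves
 * ->flock/->lock unset, ll_file_operations_flock routes them to
 * ll_file_flock(), and ll_file_operations_noflock returns -ENOSYS through
 * ll_file_noflock().
 */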
3264 /* -o localflock - only provides locally consistent flock locks */
3265 struct file_operations ll_file_operations = {
3266         .read           = ll_file_read,
3267         .write          = ll_file_write,
3268         .ioctl          = ll_file_ioctl,
3269         .open           = ll_file_open,
3270         .release        = ll_file_release,
3271         .mmap           = ll_file_mmap,
3272         .llseek         = ll_file_seek,
3273         .sendfile       = ll_file_sendfile,
3274         .fsync          = ll_fsync,
3275 };
3276
3277 struct file_operations ll_file_operations_flock = {
3278         .read           = ll_file_read,
3279         .write          = ll_file_write,
3280         .ioctl          = ll_file_ioctl,
3281         .open           = ll_file_open,
3282         .release        = ll_file_release,
3283         .mmap           = ll_file_mmap,
3284         .llseek         = ll_file_seek,
3285         .sendfile       = ll_file_sendfile,
3286         .fsync          = ll_fsync,
3287 #ifdef HAVE_F_OP_FLOCK
3288         .flock          = ll_file_flock,
3289 #endif
3290         .lock           = ll_file_flock
3291 };
3292
3293 /* These are for -o noflock - to return ENOSYS on flock calls */
3294 struct file_operations ll_file_operations_noflock = {
3295         .read           = ll_file_read,
3296         .write          = ll_file_write,
3297         .ioctl          = ll_file_ioctl,
3298         .open           = ll_file_open,
3299         .release        = ll_file_release,
3300         .mmap           = ll_file_mmap,
3301         .llseek         = ll_file_seek,
3302         .sendfile       = ll_file_sendfile,
3303         .fsync          = ll_fsync,
3304 #ifdef HAVE_F_OP_FLOCK
3305         .flock          = ll_file_noflock,
3306 #endif
3307         .lock           = ll_file_noflock
3308 };
3309
3310 struct inode_operations ll_file_inode_operations = {
3311 #ifdef HAVE_VFS_INTENT_PATCHES
3312         .setattr_raw    = ll_setattr_raw,
3313 #endif
3314         .setattr        = ll_setattr,
3315         .truncate       = ll_truncate,
3316         .getattr        = ll_getattr,
3317         .permission     = ll_inode_permission,
3318         .setxattr       = ll_setxattr,
3319         .getxattr       = ll_getxattr,
3320         .listxattr      = ll_listxattr,
3321         .removexattr    = ll_removexattr,
3322 };
3323
3324 /* dynamic ioctl number support routines */
3325 static struct llioc_ctl_data {
3326         struct rw_semaphore ioc_sem;
3327         struct list_head    ioc_head;
3328 } llioc = {
3329         __RWSEM_INITIALIZER(llioc.ioc_sem),
3330         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3331 };
3332
3333
3334 struct llioc_data {
3335         struct list_head        iocd_list;
3336         unsigned int            iocd_size;
3337         llioc_callback_t        iocd_cb;
3338         unsigned int            iocd_count;
3339         unsigned int            iocd_cmd[0];
3340 };
3341
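/*
 * Register a table of ioctl numbers handled outside of llite proper (for
 * example by a separately loaded module).  ll_file_ioctl() falls through
 * to ll_iocontrol_call() for commands it does not recognize, which walks
 * the registered tables and invokes the matching callback.  The opaque
 * cookie returned here must be passed to ll_iocontrol_unregister().
 *
 * A minimal usage sketch (my_cb, MY_IOC_CMD and the module glue are made
 * up; the callback signature is inferred from ll_iocontrol_call() below):
 *
 *      static enum llioc_iter my_cb(struct inode *inode, struct file *file,
 *                                   unsigned int cmd, unsigned long arg,
 *                                   void *magic, int *rcp)
 *      {
 *              *rcp = 0;
 *              return LLIOC_STOP;
 *      }
 *
 *      static unsigned int cmds[] = { MY_IOC_CMD };
 *      static void *cookie;
 *
 *      cookie = ll_iocontrol_register(my_cb, 1, cmds);  (at module init)
 *      ll_iocontrol_unregister(cookie);                 (at module exit)
 */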
3342 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3343 {
3344         unsigned int size;
3345         struct llioc_data *in_data = NULL;
3346         ENTRY;
3347
3348         if (cb == NULL || cmd == NULL ||
3349             count > LLIOC_MAX_CMD || count < 0)
3350                 RETURN(NULL);
3351
3352         size = sizeof(*in_data) + count * sizeof(unsigned int);
3353         OBD_ALLOC(in_data, size);
3354         if (in_data == NULL)
3355                 RETURN(NULL);
3356
3357         memset(in_data, 0, sizeof(*in_data));
3358         in_data->iocd_size = size;
3359         in_data->iocd_cb = cb;
3360         in_data->iocd_count = count;
3361         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3362
3363         down_write(&llioc.ioc_sem);
3364         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3365         up_write(&llioc.ioc_sem);
3366
3367         RETURN(in_data);
3368 }
3369
3370 void ll_iocontrol_unregister(void *magic)
3371 {
3372         struct llioc_data *tmp;
3373
3374         if (magic == NULL)
3375                 return;
3376
3377         down_write(&llioc.ioc_sem);
3378         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3379                 if (tmp == magic) {
3380                         unsigned int size = tmp->iocd_size;
3381
3382                         list_del(&tmp->iocd_list);
3383                         up_write(&llioc.ioc_sem);
3384
3385                         OBD_FREE(tmp, size);
3386                         return;
3387                 }
3388         }
3389         up_write(&llioc.ioc_sem);
3390
3391         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3392 }
3393
3394 EXPORT_SYMBOL(ll_iocontrol_register);
3395 EXPORT_SYMBOL(ll_iocontrol_unregister);
3396
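/*
 * Dispatch an ioctl to the dynamically registered handlers.  The first
 * callback that returns LLIOC_STOP ends the walk; the result code set by
 * the callback is passed back through @rcp (-EINVAL if no registered
 * command matched).
 */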
3397 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3398                         unsigned int cmd, unsigned long arg, int *rcp)
3399 {
3400         enum llioc_iter ret = LLIOC_CONT;
3401         struct llioc_data *data;
3402         int rc = -EINVAL, i;
3403
3404         down_read(&llioc.ioc_sem);
3405         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3406                 for (i = 0; i < data->iocd_count; i++) {
3407                         if (cmd != data->iocd_cmd[i])
3408                                 continue;
3409
3410                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3411                         break;
3412                 }
3413
3414                 if (ret == LLIOC_STOP)
3415                         break;
3416         }
3417         up_read(&llioc.ioc_sem);
3418
3419         if (rcp)
3420                 *rcp = rc;
3421         return ret;
3422 }