Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <lustre_mdc.h>
47 #include <linux/pagemap.h>
48 #include <linux/file.h>
49 #include "llite_internal.h"
50 #include <lustre/ll_fiemap.h>
51
52 /* also used by llite/special.c:ll_special_open() */
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
58         return fd;
59 }
60
61 static void ll_file_data_put(struct ll_file_data *fd)
62 {
63         if (fd != NULL)
64                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
65 }
66
67 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
68                           struct lustre_handle *fh)
69 {
70         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
71         op_data->op_attr.ia_mode = inode->i_mode;
72         op_data->op_attr.ia_atime = inode->i_atime;
73         op_data->op_attr.ia_mtime = inode->i_mtime;
74         op_data->op_attr.ia_ctime = inode->i_ctime;
75         op_data->op_attr.ia_size = i_size_read(inode);
76         op_data->op_attr_blocks = inode->i_blocks;
77         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
78         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
79         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
80         op_data->op_capa1 = ll_mdscapa_get(inode);
81 }
82
83 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
84                              struct obd_client_handle *och)
85 {
86         ENTRY;
87
88         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
89                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
90
91         if (!(och->och_flags & FMODE_WRITE))
92                 goto out;
93
94         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
95             !S_ISREG(inode->i_mode))
96                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
97         else
98                 ll_epoch_close(inode, op_data, &och, 0);
99
100 out:
101         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
102         EXIT;
103 }
104
105 static int ll_close_inode_openhandle(struct obd_export *md_exp,
106                                      struct inode *inode,
107                                      struct obd_client_handle *och)
108 {
109         struct obd_export *exp = ll_i2mdexp(inode);
110         struct md_op_data *op_data;
111         struct ptlrpc_request *req = NULL;
112         struct obd_device *obd = class_exp2obd(exp);
113         int epoch_close = 1;
114         int seq_end = 0, rc;
115         ENTRY;
116
117         if (obd == NULL) {
118                 /*
119                  * XXX: in case of LMV, is this correct to access
120                  * ->exp_handle?
121                  */
122                 CERROR("Invalid MDC connection handle "LPX64"\n",
123                        ll_i2mdexp(inode)->exp_handle.h_cookie);
124                 GOTO(out, rc = 0);
125         }
126
127         /*
128          * here we check if this is forced umount. If so this is called on
129          * canceling "open lock" and we do not call md_close() in this case, as
130          * it will not be successful, as import is already deactivated.
131          */
132         if (obd->obd_force)
133                 GOTO(out, rc = 0);
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc != -EAGAIN)
143                 seq_end = 1;
144
145         if (rc == -EAGAIN) {
146                 /* This close must have the epoch closed. */
147                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
148                 LASSERT(epoch_close);
149                 /* MDS has instructed us to obtain Size-on-MDS attribute from
150                  * OSTs and send setattr to back to MDS. */
151                 rc = ll_sizeonmds_update(inode, och->och_mod,
152                                          &och->och_fh, op_data->op_ioepoch);
153                 if (rc) {
154                         CERROR("inode %lu mdc Size-on-MDS update failed: "
155                                "rc = %d\n", inode->i_ino, rc);
156                         rc = 0;
157                 }
158         } else if (rc) {
159                 CERROR("inode %lu mdc close failed: rc = %d\n",
160                        inode->i_ino, rc);
161         }
162         ll_finish_md_op_data(op_data);
163
164         if (rc == 0) {
165                 rc = ll_objects_destroy(req, inode);
166                 if (rc)
167                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
168                                inode->i_ino, rc);
169         }
170
171         EXIT;
172 out:
173
174         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
175             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
176                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
177         } else {
178                 if (seq_end)
179                         ptlrpc_close_replay_seq(req);
180                 md_clear_open_replay_data(md_exp, och);
181                 /* Free @och if it is not waiting for DONE_WRITING. */
182                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
183                 OBD_FREE_PTR(och);
184         }
185         if (req) /* This is close request */
186                 ptlrpc_req_finished(req);
187         return rc;
188 }
189
190 int ll_md_real_close(struct inode *inode, int flags)
191 {
192         struct ll_inode_info *lli = ll_i2info(inode);
193         struct obd_client_handle **och_p;
194         struct obd_client_handle *och;
195         __u64 *och_usecount;
196         int rc = 0;
197         ENTRY;
198
199         if (flags & FMODE_WRITE) {
200                 och_p = &lli->lli_mds_write_och;
201                 och_usecount = &lli->lli_open_fd_write_count;
202         } else if (flags & FMODE_EXEC) {
203                 och_p = &lli->lli_mds_exec_och;
204                 och_usecount = &lli->lli_open_fd_exec_count;
205         } else {
206                 LASSERT(flags & FMODE_READ);
207                 och_p = &lli->lli_mds_read_och;
208                 och_usecount = &lli->lli_open_fd_read_count;
209         }
210
211         down(&lli->lli_och_sem);
212         if (*och_usecount) { /* There are still users of this handle, so
213                                 skip freeing it. */
214                 up(&lli->lli_och_sem);
215                 RETURN(0);
216         }
217         och=*och_p;
218         *och_p = NULL;
219         up(&lli->lli_och_sem);
220
221         if (och) { /* There might be a race and somebody have freed this och
222                       already */
223                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
224                                                inode, och);
225         }
226
227         RETURN(rc);
228 }
229
230 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
231                 struct file *file)
232 {
233         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
234         struct ll_inode_info *lli = ll_i2info(inode);
235         int rc = 0;
236         ENTRY;
237
238         /* clear group lock, if present */
239         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
240                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
241                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
242                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
243                                       &fd->fd_cwlockh);
244         }
245
246         /* Let's see if we have good enough OPEN lock on the file and if
247            we can skip talking to MDS */
248         if (file->f_dentry->d_inode) { /* Can this ever be false? */
249                 int lockmode;
250                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
251                 struct lustre_handle lockh;
252                 struct inode *inode = file->f_dentry->d_inode;
253                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
254
255                 down(&lli->lli_och_sem);
256                 if (fd->fd_omode & FMODE_WRITE) {
257                         lockmode = LCK_CW;
258                         LASSERT(lli->lli_open_fd_write_count);
259                         lli->lli_open_fd_write_count--;
260                 } else if (fd->fd_omode & FMODE_EXEC) {
261                         lockmode = LCK_PR;
262                         LASSERT(lli->lli_open_fd_exec_count);
263                         lli->lli_open_fd_exec_count--;
264                 } else {
265                         lockmode = LCK_CR;
266                         LASSERT(lli->lli_open_fd_read_count);
267                         lli->lli_open_fd_read_count--;
268                 }
269                 up(&lli->lli_och_sem);
270
271                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
272                                    LDLM_IBITS, &policy, lockmode,
273                                    &lockh)) {
274                         rc = ll_md_real_close(file->f_dentry->d_inode,
275                                               fd->fd_omode);
276                 }
277         } else {
278                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
279                        file, file->f_dentry, file->f_dentry->d_name.name);
280         }
281
282         LUSTRE_FPRIVATE(file) = NULL;
283         ll_file_data_put(fd);
284         ll_capa_close(inode);
285
286         RETURN(rc);
287 }
288
289 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
290
291 /* While this returns an error code, fput() the caller does not, so we need
292  * to make every effort to clean up all of our state here.  Also, applications
293  * rarely check close errors and even if an error is returned they will not
294  * re-try the close call.
295  */
296 int ll_file_release(struct inode *inode, struct file *file)
297 {
298         struct ll_file_data *fd;
299         struct ll_sb_info *sbi = ll_i2sbi(inode);
300         struct ll_inode_info *lli = ll_i2info(inode);
301         struct lov_stripe_md *lsm = lli->lli_smd;
302         int rc;
303
304         ENTRY;
305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
306                inode->i_generation, inode);
307
308 #ifdef CONFIG_FS_POSIX_ACL
309         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
310             inode == inode->i_sb->s_root->d_inode) {
311                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
312
313                 LASSERT(fd != NULL);
314                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
315                         fd->fd_flags &= ~LL_FILE_RMTACL;
316                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
317                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
318                 }
319         }
320 #endif
321
322         if (inode->i_sb->s_root != file->f_dentry)
323                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
324         fd = LUSTRE_FPRIVATE(file);
325         LASSERT(fd != NULL);
326
327         /* The last ref on @file, maybe not the the owner pid of statahead.
328          * Different processes can open the same dir, "ll_opendir_key" means:
329          * it is me that should stop the statahead thread. */
330         if (lli->lli_opendir_key == fd)
331                 ll_stop_statahead(inode, fd);
332
333         if (inode->i_sb->s_root == file->f_dentry) {
334                 LUSTRE_FPRIVATE(file) = NULL;
335                 ll_file_data_put(fd);
336                 RETURN(0);
337         }
338
339         if (lsm)
340                 lov_test_and_clear_async_rc(lsm);
341         lli->lli_async_rc = 0;
342
343         rc = ll_md_close(sbi->ll_md_exp, inode, file);
344         RETURN(rc);
345 }
346
347 static int ll_intent_file_open(struct file *file, void *lmm,
348                                int lmmsize, struct lookup_intent *itp)
349 {
350         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
351         struct dentry *parent = file->f_dentry->d_parent;
352         const char *name = file->f_dentry->d_name.name;
353         const int len = file->f_dentry->d_name.len;
354         struct md_op_data *op_data;
355         struct ptlrpc_request *req;
356         int rc;
357         ENTRY;
358
359         if (!parent)
360                 RETURN(-ENOENT);
361
362         /* Usually we come here only for NFSD, and we want open lock.
363            But we can also get here with pre 2.6.15 patchless kernels, and in
364            that case that lock is also ok */
365         /* We can also get here if there was cached open handle in revalidate_it
366          * but it disappeared while we were getting from there to ll_file_open.
367          * But this means this file was closed and immediatelly opened which
368          * makes a good candidate for using OPEN lock */
369         /* If lmmsize & lmm are not 0, we are just setting stripe info
370          * parameters. No need for the open lock */
371         if (!lmm && !lmmsize)
372                 itp->it_flags |= MDS_OPEN_LOCK;
373
374         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
375                                       file->f_dentry->d_inode, name, len,
376                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
377         if (IS_ERR(op_data))
378                 RETURN(PTR_ERR(op_data));
379
380         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
381                             0 /*unused */, &req, ll_md_blocking_ast, 0);
382         ll_finish_md_op_data(op_data);
383         if (rc == -ESTALE) {
384                 /* reason for keep own exit path - don`t flood log
385                 * with messages with -ESTALE errors.
386                 */
387                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
388                      it_open_error(DISP_OPEN_OPEN, itp))
389                         GOTO(out, rc);
390                 ll_release_openhandle(file->f_dentry, itp);
391                 GOTO(out_stale, rc);
392         }
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         if (itp->d.lustre.it_lock_mode)
401                 md_set_lock_data(sbi->ll_md_exp,
402                                  &itp->d.lustre.it_lock_handle,
403                                  file->f_dentry->d_inode);
404
405         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
406 out:
407         ptlrpc_req_finished(itp->d.lustre.it_data);
408
409 out_stale:
410         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
411         ll_intent_drop_lock(itp);
412
413         RETURN(rc);
414 }
415
416 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
417                        struct lookup_intent *it, struct obd_client_handle *och)
418 {
419         struct ptlrpc_request *req = it->d.lustre.it_data;
420         struct mdt_body *body;
421
422         LASSERT(och);
423
424         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
425         LASSERT(body != NULL);                      /* reply already checked out */
426
427         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
428         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
429         och->och_fid = lli->lli_fid;
430         och->och_flags = it->it_flags;
431         lli->lli_ioepoch = body->ioepoch;
432
433         return md_set_open_replay_data(md_exp, och, req);
434 }
435
436 int ll_local_open(struct file *file, struct lookup_intent *it,
437                   struct ll_file_data *fd, struct obd_client_handle *och)
438 {
439         struct inode *inode = file->f_dentry->d_inode;
440         struct ll_inode_info *lli = ll_i2info(inode);
441         ENTRY;
442
443         LASSERT(!LUSTRE_FPRIVATE(file));
444
445         LASSERT(fd != NULL);
446
447         if (och) {
448                 struct ptlrpc_request *req = it->d.lustre.it_data;
449                 struct mdt_body *body;
450                 int rc;
451
452                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
453                 if (rc)
454                         RETURN(rc);
455
456                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
457                 if ((it->it_flags & FMODE_WRITE) &&
458                     (body->valid & OBD_MD_FLSIZE))
459                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
460                                lli->lli_ioepoch, PFID(&lli->lli_fid));
461         }
462
463         LUSTRE_FPRIVATE(file) = fd;
464         ll_readahead_init(inode, &fd->fd_ras);
465         fd->fd_omode = it->it_flags;
466         RETURN(0);
467 }
468
469 /* Open a file, and (for the very first open) create objects on the OSTs at
470  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
471  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
472  * lli_open_sem to ensure no other process will create objects, send the
473  * stripe MD to the MDS, or try to destroy the objects if that fails.
474  *
475  * If we already have the stripe MD locally then we don't request it in
476  * md_open(), by passing a lmm_size = 0.
477  *
478  * It is up to the application to ensure no other processes open this file
479  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
480  * used.  We might be able to avoid races of that sort by getting lli_open_sem
481  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
482  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
483  */
484 int ll_file_open(struct inode *inode, struct file *file)
485 {
486         struct ll_inode_info *lli = ll_i2info(inode);
487         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
488                                           .it_flags = file->f_flags };
489         struct lov_stripe_md *lsm;
490         struct ptlrpc_request *req = NULL;
491         struct obd_client_handle **och_p;
492         __u64 *och_usecount;
493         struct ll_file_data *fd;
494         int rc = 0, opendir_set = 0;
495         ENTRY;
496
497         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
498                inode->i_generation, inode, file->f_flags);
499
500 #ifdef HAVE_VFS_INTENT_PATCHES
501         it = file->f_it;
502 #else
503         it = file->private_data; /* XXX: compat macro */
504         file->private_data = NULL; /* prevent ll_local_open assertion */
505 #endif
506
507         fd = ll_file_data_get();
508         if (fd == NULL)
509                 RETURN(-ENOMEM);
510
511         if (S_ISDIR(inode->i_mode)) {
512                 spin_lock(&lli->lli_lock);
513                 /* "lli->lli_opendir_pid != 0" means someone has set it.
514                  * "lli->lli_sai != NULL" means the previous statahead has not
515                  *                        been cleanup. */
516                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
517                         opendir_set = 1;
518                         lli->lli_opendir_pid = cfs_curproc_pid();
519                         lli->lli_opendir_key = fd;
520                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
521                         /* Two cases for this:
522                          * (1) The same process open such directory many times.
523                          * (2) The old process opened the directory, and exited
524                          *     before its children processes. Then new process
525                          *     with the same pid opens such directory before the
526                          *     old process's children processes exit.
527                          * Change the owner to the latest one. */
528                         opendir_set = 2;
529                         lli->lli_opendir_key = fd;
530                 }
531                 spin_unlock(&lli->lli_lock);
532         }
533
534         if (inode->i_sb->s_root == file->f_dentry) {
535                 LUSTRE_FPRIVATE(file) = fd;
536                 RETURN(0);
537         }
538
539         if (!it || !it->d.lustre.it_disposition) {
540                 /* Convert f_flags into access mode. We cannot use file->f_mode,
541                  * because everything but O_ACCMODE mask was stripped from
542                  * there */
543                 if ((oit.it_flags + 1) & O_ACCMODE)
544                         oit.it_flags++;
545                 if (file->f_flags & O_TRUNC)
546                         oit.it_flags |= FMODE_WRITE;
547
548                 /* kernel only call f_op->open in dentry_open.  filp_open calls
549                  * dentry_open after call to open_namei that checks permissions.
550                  * Only nfsd_open call dentry_open directly without checking
551                  * permissions and because of that this code below is safe. */
552                 if (oit.it_flags & FMODE_WRITE)
553                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
554
555                 /* We do not want O_EXCL here, presumably we opened the file
556                  * already? XXX - NFS implications? */
557                 oit.it_flags &= ~O_EXCL;
558
559                 it = &oit;
560         }
561
562 restart:
563         /* Let's see if we have file open on MDS already. */
564         if (it->it_flags & FMODE_WRITE) {
565                 och_p = &lli->lli_mds_write_och;
566                 och_usecount = &lli->lli_open_fd_write_count;
567         } else if (it->it_flags & FMODE_EXEC) {
568                 och_p = &lli->lli_mds_exec_och;
569                 och_usecount = &lli->lli_open_fd_exec_count;
570          } else {
571                 och_p = &lli->lli_mds_read_och;
572                 och_usecount = &lli->lli_open_fd_read_count;
573         }
574
575         down(&lli->lli_och_sem);
576         if (*och_p) { /* Open handle is present */
577                 if (it_disposition(it, DISP_OPEN_OPEN)) {
578                         /* Well, there's extra open request that we do not need,
579                            let's close it somehow. This will decref request. */
580                         rc = it_open_error(DISP_OPEN_OPEN, it);
581                         if (rc) {
582                                 up(&lli->lli_och_sem);
583                                 ll_file_data_put(fd);
584                                 GOTO(out_openerr, rc);
585                         }
586                         ll_release_openhandle(file->f_dentry, it);
587                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
588                                              LPROC_LL_OPEN);
589                 }
590                 (*och_usecount)++;
591
592                 rc = ll_local_open(file, it, fd, NULL);
593                 if (rc) {
594                         (*och_usecount)--;
595                         up(&lli->lli_och_sem);
596                         ll_file_data_put(fd);
597                         GOTO(out_openerr, rc);
598                 }
599         } else {
600                 LASSERT(*och_usecount == 0);
601                 if (!it->d.lustre.it_disposition) {
602                         /* We cannot just request lock handle now, new ELC code
603                            means that one of other OPEN locks for this file
604                            could be cancelled, and since blocking ast handler
605                            would attempt to grab och_sem as well, that would
606                            result in a deadlock */
607                         up(&lli->lli_och_sem);
608                         it->it_flags |= O_CHECK_STALE;
609                         rc = ll_intent_file_open(file, NULL, 0, it);
610                         it->it_flags &= ~O_CHECK_STALE;
611                         if (rc) {
612                                 ll_file_data_put(fd);
613                                 GOTO(out_openerr, rc);
614                         }
615
616                         /* Got some error? Release the request */
617                         if (it->d.lustre.it_status < 0) {
618                                 req = it->d.lustre.it_data;
619                                 ptlrpc_req_finished(req);
620                         }
621                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
622                                          &it->d.lustre.it_lock_handle,
623                                          file->f_dentry->d_inode);
624                         goto restart;
625                 }
626                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
627                 if (!*och_p) {
628                         ll_file_data_put(fd);
629                         GOTO(out_och_free, rc = -ENOMEM);
630                 }
631                 (*och_usecount)++;
632                 req = it->d.lustre.it_data;
633
634                 /* md_intent_lock() didn't get a request ref if there was an
635                  * open error, so don't do cleanup on the request here
636                  * (bug 3430) */
637                 /* XXX (green): Should not we bail out on any error here, not
638                  * just open error? */
639                 rc = it_open_error(DISP_OPEN_OPEN, it);
640                 if (rc) {
641                         ll_file_data_put(fd);
642                         GOTO(out_och_free, rc);
643                 }
644
645                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
646                 rc = ll_local_open(file, it, fd, *och_p);
647                 if (rc) {
648                         ll_file_data_put(fd);
649                         GOTO(out_och_free, rc);
650                 }
651         }
652         up(&lli->lli_och_sem);
653
654         /* Must do this outside lli_och_sem lock to prevent deadlock where
655            different kind of OPEN lock for this same inode gets cancelled
656            by ldlm_cancel_lru */
657         if (!S_ISREG(inode->i_mode))
658                 GOTO(out, rc);
659
660         ll_capa_open(inode);
661
662         lsm = lli->lli_smd;
663         if (lsm == NULL) {
664                 if (file->f_flags & O_LOV_DELAY_CREATE ||
665                     !(file->f_mode & FMODE_WRITE)) {
666                         CDEBUG(D_INODE, "object creation was delayed\n");
667                         GOTO(out, rc);
668                 }
669         }
670         file->f_flags &= ~O_LOV_DELAY_CREATE;
671         GOTO(out, rc);
672 out:
673         ptlrpc_req_finished(req);
674         if (req)
675                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
676 out_och_free:
677         if (rc) {
678                 if (*och_p) {
679                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
680                         *och_p = NULL; /* OBD_FREE writes some magic there */
681                         (*och_usecount)--;
682                 }
683                 up(&lli->lli_och_sem);
684 out_openerr:
685                 if (opendir_set == 1) {
686                         lli->lli_opendir_key = NULL;
687                         lli->lli_opendir_pid = 0;
688                 } else if (unlikely(opendir_set == 2)) {
689                         ll_stop_statahead(inode, fd);
690                 }
691         }
692
693         return rc;
694 }
695
696 /* Fills the obdo with the attributes for the inode defined by lsm */
697 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
698 {
699         struct ptlrpc_request_set *set;
700         struct ll_inode_info *lli = ll_i2info(inode);
701         struct lov_stripe_md *lsm = lli->lli_smd;
702
703         struct obd_info oinfo = { { { 0 } } };
704         int rc;
705         ENTRY;
706
707         LASSERT(lsm != NULL);
708
709         oinfo.oi_md = lsm;
710         oinfo.oi_oa = obdo;
711         oinfo.oi_oa->o_id = lsm->lsm_object_id;
712         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
713         oinfo.oi_oa->o_mode = S_IFREG;
714         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
715                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
716                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
717                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
718                                OBD_MD_FLGROUP;
719         oinfo.oi_capa = ll_mdscapa_get(inode);
720
721         set = ptlrpc_prep_set();
722         if (set == NULL) {
723                 CERROR("can't allocate ptlrpc set\n");
724                 rc = -ENOMEM;
725         } else {
726                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
727                 if (rc == 0)
728                         rc = ptlrpc_set_wait(set);
729                 ptlrpc_set_destroy(set);
730         }
731         capa_put(oinfo.oi_capa);
732         if (rc)
733                 RETURN(rc);
734
735         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
736                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
737                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
738
739         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
740         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
741                lli->lli_smd->lsm_object_id, i_size_read(inode),
742                (unsigned long long)inode->i_blocks,
743                (unsigned long)ll_inode_blksize(inode));
744         RETURN(0);
745 }
746
747 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
748 {
749         struct ll_inode_info *lli = ll_i2info(inode);
750         struct lov_stripe_md *lsm = lli->lli_smd;
751         struct obd_export *exp = ll_i2dtexp(inode);
752         struct {
753                 char name[16];
754                 struct ldlm_lock *lock;
755         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
756         __u32 stripe, vallen = sizeof(stripe);
757         struct lov_oinfo *loinfo;
758         int rc;
759         ENTRY;
760
761         if (lsm->lsm_stripe_count == 1)
762                 GOTO(check, stripe = 0);
763
764         /* get our offset in the lov */
765         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
766         if (rc != 0) {
767                 CERROR("obd_get_info: rc = %d\n", rc);
768                 RETURN(rc);
769         }
770         LASSERT(stripe < lsm->lsm_stripe_count);
771
772 check:
773         loinfo = lsm->lsm_oinfo[stripe];
774         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
775                             &lock->l_resource->lr_name)){
776                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
777                            loinfo->loi_id, loinfo->loi_gr);
778                 RETURN(-ELDLM_NO_LOCK_DATA);
779         }
780
781         RETURN(stripe);
782 }
783
784 /* Get extra page reference to ensure it is not going away */
785 void ll_pin_extent_cb(void *data)
786 {
787         struct page *page = data;
788
789         page_cache_get(page);
790
791         return;
792 }
793
794 /* Flush the page from page cache for an extent as its canceled.
795  * Page to remove is delivered as @data.
796  *
797  * No one can dirty the extent until we've finished our work and they cannot
798  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
799  * but other kernel actors could have pages locked.
800  *
801  * If @discard is set, there is no need to write the page if it is dirty.
802  *
803  * Called with the DLM lock held. */
804 int ll_page_removal_cb(void *data, int discard)
805 {
806         int rc;
807         struct page *page = data;
808         struct address_space *mapping;
809
810         ENTRY;
811
812         /* We have page reference already from ll_pin_page */
813         lock_page(page);
814
815         /* Already truncated by somebody */
816         if (!page->mapping)
817                 GOTO(out, rc = 0);
818         mapping = page->mapping;
819
820         ll_teardown_mmaps(mapping,
821                           (__u64)page->index << PAGE_CACHE_SHIFT,
822                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
823                                                               ~PAGE_CACHE_MASK);
824         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
825
826         if (!discard && clear_page_dirty_for_io(page)) {
827                 LASSERT(page->mapping);
828                 rc = ll_call_writepage(page->mapping->host, page);
829                 /* either waiting for io to complete or reacquiring
830                  * the lock that the failed writepage released */
831                 lock_page(page);
832                 wait_on_page_writeback(page);
833                 if (rc != 0) {
834                         CERROR("writepage inode %lu(%p) of page %p "
835                                "failed: %d\n", mapping->host->i_ino,
836                                mapping->host, page, rc);
837                         if (rc == -ENOSPC)
838                                 set_bit(AS_ENOSPC, &mapping->flags);
839                         else
840                                 set_bit(AS_EIO, &mapping->flags);
841                 }
842                 set_bit(AS_EIO, &mapping->flags);
843         }
844         if (page->mapping != NULL) {
845                 struct ll_async_page *llap = llap_cast_private(page);
846                 /* checking again to account for writeback's lock_page() */
847                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
848                 if (llap)
849                         ll_ra_accounting(llap, page->mapping);
850                 ll_truncate_complete_page(page);
851         }
852         EXIT;
853 out:
854         LASSERT(!PageWriteback(page));
855         unlock_page(page);
856         page_cache_release(page);
857
858         return 0;
859 }
860
861 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
862                              void *data, int flag)
863 {
864         struct inode *inode;
865         struct ll_inode_info *lli;
866         struct lov_stripe_md *lsm;
867         int stripe;
868         __u64 kms;
869
870         ENTRY;
871
872         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
873                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
874                 LBUG();
875         }
876
877         inode = ll_inode_from_lock(lock);
878         if (inode == NULL)
879                 RETURN(0);
880         lli = ll_i2info(inode);
881         if (lli == NULL)
882                 GOTO(iput, 0);
883         if (lli->lli_smd == NULL)
884                 GOTO(iput, 0);
885         lsm = lli->lli_smd;
886
887         stripe = ll_lock_to_stripe_offset(inode, lock);
888         if (stripe < 0)
889                 GOTO(iput, 0);
890
891         lov_stripe_lock(lsm);
892         lock_res_and_lock(lock);
893         kms = ldlm_extent_shift_kms(lock,
894                                     lsm->lsm_oinfo[stripe]->loi_kms);
895
896         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
897                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
898                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
899         lsm->lsm_oinfo[stripe]->loi_kms = kms;
900         unlock_res_and_lock(lock);
901         lov_stripe_unlock(lsm);
902         ll_queue_done_writing(inode, 0);
903         EXIT;
904 iput:
905         iput(inode);
906
907         return 0;
908 }
909
910 #if 0
911 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
912 {
913         /* XXX ALLOCATE - 160 bytes */
914         struct inode *inode = ll_inode_from_lock(lock);
915         struct ll_inode_info *lli = ll_i2info(inode);
916         struct lustre_handle lockh = { 0 };
917         struct ost_lvb *lvb;
918         int stripe;
919         ENTRY;
920
921         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
922                      LDLM_FL_BLOCK_CONV)) {
923                 LBUG(); /* not expecting any blocked async locks yet */
924                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
925                            "lock, returning");
926                 ldlm_lock_dump(D_OTHER, lock, 0);
927                 ldlm_reprocess_all(lock->l_resource);
928                 RETURN(0);
929         }
930
931         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
932
933         stripe = ll_lock_to_stripe_offset(inode, lock);
934         if (stripe < 0)
935                 goto iput;
936
937         if (lock->l_lvb_len) {
938                 struct lov_stripe_md *lsm = lli->lli_smd;
939                 __u64 kms;
940                 lvb = lock->l_lvb_data;
941                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
942
943                 lock_res_and_lock(lock);
944                 ll_inode_size_lock(inode, 1);
945                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
946                 kms = ldlm_extent_shift_kms(NULL, kms);
947                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
948                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
949                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
950                 lsm->lsm_oinfo[stripe].loi_kms = kms;
951                 ll_inode_size_unlock(inode, 1);
952                 unlock_res_and_lock(lock);
953         }
954
955 iput:
956         iput(inode);
957         wake_up(&lock->l_waitq);
958
959         ldlm_lock2handle(lock, &lockh);
960         ldlm_lock_decref(&lockh, LCK_PR);
961         RETURN(0);
962 }
963 #endif
964
965 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
966 {
967         struct ptlrpc_request *req = reqp;
968         struct inode *inode = ll_inode_from_lock(lock);
969         struct ll_inode_info *lli;
970         struct lov_stripe_md *lsm;
971         struct ost_lvb *lvb;
972         int rc, stripe;
973         ENTRY;
974
975         if (inode == NULL)
976                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
977         lli = ll_i2info(inode);
978         if (lli == NULL)
979                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
980         lsm = lli->lli_smd;
981         if (lsm == NULL)
982                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
983
984         /* First, find out which stripe index this lock corresponds to. */
985         stripe = ll_lock_to_stripe_offset(inode, lock);
986         if (stripe < 0)
987                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
988
989         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
990         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
991                              sizeof(*lvb));
992         rc = req_capsule_server_pack(&req->rq_pill);
993         if (rc) {
994                 CERROR("lustre_pack_reply: %d\n", rc);
995                 GOTO(iput, rc);
996         }
997
998         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
999         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1000         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1001         lvb->lvb_atime = LTIME_S(inode->i_atime);
1002         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1003
1004         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1005                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1006                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1007                    lvb->lvb_atime, lvb->lvb_ctime);
1008  iput:
1009         iput(inode);
1010
1011  out:
1012         /* These errors are normal races, so we don't want to fill the console
1013          * with messages by calling ptlrpc_error() */
1014         if (rc == -ELDLM_NO_LOCK_DATA)
1015                 lustre_pack_reply(req, 1, NULL, NULL);
1016
1017         req->rq_status = rc;
1018         return rc;
1019 }
1020
1021 static int ll_merge_lvb(struct inode *inode)
1022 {
1023         struct ll_inode_info *lli = ll_i2info(inode);
1024         struct ll_sb_info *sbi = ll_i2sbi(inode);
1025         struct ost_lvb lvb;
1026         int rc;
1027
1028         ENTRY;
1029
1030         ll_inode_size_lock(inode, 1);
1031         inode_init_lvb(inode, &lvb);
1032         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1033         i_size_write(inode, lvb.lvb_size);
1034         inode->i_blocks = lvb.lvb_blocks;
1035
1036         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1037         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1038         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1039         ll_inode_size_unlock(inode, 1);
1040
1041         RETURN(rc);
1042 }
1043
1044 int ll_local_size(struct inode *inode)
1045 {
1046         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1047         struct ll_inode_info *lli = ll_i2info(inode);
1048         struct ll_sb_info *sbi = ll_i2sbi(inode);
1049         struct lustre_handle lockh = { 0 };
1050         int flags = 0;
1051         int rc;
1052         ENTRY;
1053
1054         if (lli->lli_smd->lsm_stripe_count == 0)
1055                 RETURN(0);
1056
1057         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1058                        &policy, LCK_PR, &flags, inode, &lockh);
1059         if (rc < 0)
1060                 RETURN(rc);
1061         else if (rc == 0)
1062                 RETURN(-ENODATA);
1063
1064         rc = ll_merge_lvb(inode);
1065         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1066         RETURN(rc);
1067 }
1068
1069 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1070                      lstat_t *st)
1071 {
1072         struct lustre_handle lockh = { 0 };
1073         struct ldlm_enqueue_info einfo = { 0 };
1074         struct obd_info oinfo = { { { 0 } } };
1075         struct ost_lvb lvb;
1076         int rc;
1077
1078         ENTRY;
1079
1080         einfo.ei_type = LDLM_EXTENT;
1081         einfo.ei_mode = LCK_PR;
1082         einfo.ei_cb_bl = osc_extent_blocking_cb;
1083         einfo.ei_cb_cp = ldlm_completion_ast;
1084         einfo.ei_cb_gl = ll_glimpse_callback;
1085         einfo.ei_cbdata = NULL;
1086
1087         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1088         oinfo.oi_lockh = &lockh;
1089         oinfo.oi_md = lsm;
1090         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1091
1092         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1093         if (rc == -ENOENT)
1094                 RETURN(rc);
1095         if (rc != 0) {
1096                 CERROR("obd_enqueue returned rc %d, "
1097                        "returning -EIO\n", rc);
1098                 RETURN(rc > 0 ? -EIO : rc);
1099         }
1100
1101         lov_stripe_lock(lsm);
1102         memset(&lvb, 0, sizeof(lvb));
1103         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1104         st->st_size = lvb.lvb_size;
1105         st->st_blocks = lvb.lvb_blocks;
1106         st->st_mtime = lvb.lvb_mtime;
1107         st->st_atime = lvb.lvb_atime;
1108         st->st_ctime = lvb.lvb_ctime;
1109         lov_stripe_unlock(lsm);
1110
1111         RETURN(rc);
1112 }
1113
1114 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1115  * file (because it prefers KMS over RSS when larger) */
1116 int ll_glimpse_size(struct inode *inode, int ast_flags)
1117 {
1118         struct ll_inode_info *lli = ll_i2info(inode);
1119         struct ll_sb_info *sbi = ll_i2sbi(inode);
1120         struct lustre_handle lockh = { 0 };
1121         struct ldlm_enqueue_info einfo = { 0 };
1122         struct obd_info oinfo = { { { 0 } } };
1123         int rc;
1124         ENTRY;
1125
1126         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1127                 RETURN(0);
1128
1129         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1130
1131         if (!lli->lli_smd) {
1132                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1133                 RETURN(0);
1134         }
1135
1136         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1137          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1138          *       won't revoke any conflicting DLM locks held. Instead,
1139          *       ll_glimpse_callback() will be called on each client
1140          *       holding a DLM lock against this file, and resulting size
1141          *       will be returned for each stripe. DLM lock on [0, EOF] is
1142          *       acquired only if there were no conflicting locks. */
1143         einfo.ei_type = LDLM_EXTENT;
1144         einfo.ei_mode = LCK_PR;
1145         einfo.ei_cb_bl = osc_extent_blocking_cb;
1146         einfo.ei_cb_cp = ldlm_completion_ast;
1147         einfo.ei_cb_gl = ll_glimpse_callback;
1148         einfo.ei_cbdata = inode;
1149
1150         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1151         oinfo.oi_lockh = &lockh;
1152         oinfo.oi_md = lli->lli_smd;
1153         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1154
1155         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1156         if (rc == -ENOENT)
1157                 RETURN(rc);
1158         if (rc != 0) {
1159                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1160                 RETURN(rc > 0 ? -EIO : rc);
1161         }
1162
1163         rc = ll_merge_lvb(inode);
1164
1165         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1166                i_size_read(inode), (unsigned long long)inode->i_blocks);
1167
1168         RETURN(rc);
1169 }
1170
1171 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1172                    struct lov_stripe_md *lsm, int mode,
1173                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1174                    int ast_flags)
1175 {
1176         struct ll_sb_info *sbi = ll_i2sbi(inode);
1177         struct ost_lvb lvb;
1178         struct ldlm_enqueue_info einfo = { 0 };
1179         struct obd_info oinfo = { { { 0 } } };
1180         int rc;
1181         ENTRY;
1182
1183         LASSERT(!lustre_handle_is_used(lockh));
1184         LASSERT(lsm != NULL);
1185
1186         /* don't drop the mmapped file to LRU */
1187         if (mapping_mapped(inode->i_mapping))
1188                 ast_flags |= LDLM_FL_NO_LRU;
1189
1190         /* XXX phil: can we do this?  won't it screw the file size up? */
1191         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1192             (sbi->ll_flags & LL_SBI_NOLCK))
1193                 RETURN(0);
1194
1195         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1196                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1197
1198         einfo.ei_type = LDLM_EXTENT;
1199         einfo.ei_mode = mode;
1200         einfo.ei_cb_bl = osc_extent_blocking_cb;
1201         einfo.ei_cb_cp = ldlm_completion_ast;
1202         einfo.ei_cb_gl = ll_glimpse_callback;
1203         einfo.ei_cbdata = inode;
1204
1205         oinfo.oi_policy = *policy;
1206         oinfo.oi_lockh = lockh;
1207         oinfo.oi_md = lsm;
1208         oinfo.oi_flags = ast_flags;
1209
1210         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1211         *policy = oinfo.oi_policy;
1212         if (rc > 0)
1213                 rc = -EIO;
1214
1215         ll_inode_size_lock(inode, 1);
1216         inode_init_lvb(inode, &lvb);
1217         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1218
1219         if (policy->l_extent.start == 0 &&
1220             policy->l_extent.end == OBD_OBJECT_EOF) {
1221                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1222                  * the kms under both a DLM lock and the
1223                  * ll_inode_size_lock().  If we don't get the
1224                  * ll_inode_size_lock() here we can match the DLM lock and
1225                  * reset i_size from the kms before the truncating path has
1226                  * updated the kms.  generic_file_write can then trust the
1227                  * stale i_size when doing appending writes and effectively
1228                  * cancel the result of the truncate.  Getting the
1229                  * ll_inode_size_lock() after the enqueue maintains the DLM
1230                  * -> ll_inode_size_lock() acquiring order. */
1231                 i_size_write(inode, lvb.lvb_size);
1232                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1233                        inode->i_ino, i_size_read(inode));
1234         }
1235
1236         if (rc == 0) {
1237                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1238                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1239                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1240         }
1241         ll_inode_size_unlock(inode, 1);
1242
1243         RETURN(rc);
1244 }
1245
1246 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1247                      struct lov_stripe_md *lsm, int mode,
1248                      struct lustre_handle *lockh)
1249 {
1250         struct ll_sb_info *sbi = ll_i2sbi(inode);
1251         int rc;
1252         ENTRY;
1253
1254         /* XXX phil: can we do this?  won't it screw the file size up? */
1255         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1256             (sbi->ll_flags & LL_SBI_NOLCK))
1257                 RETURN(0);
1258
1259         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1260
1261         RETURN(rc);
1262 }
1263
1264 static void ll_set_file_contended(struct inode *inode)
1265 {
1266         struct ll_inode_info *lli = ll_i2info(inode);
1267         cfs_time_t now = cfs_time_current();
1268
1269         spin_lock(&lli->lli_lock);
1270         lli->lli_contention_time = now;
1271         lli->lli_flags |= LLIF_CONTENDED;
1272         spin_unlock(&lli->lli_lock);
1273 }
1274
1275 void ll_clear_file_contended(struct inode *inode)
1276 {
1277         struct ll_inode_info *lli = ll_i2info(inode);
1278
1279         spin_lock(&lli->lli_lock);
1280         lli->lli_flags &= ~LLIF_CONTENDED;
1281         spin_unlock(&lli->lli_lock);
1282 }
1283
1284 static int ll_is_file_contended(struct file *file)
1285 {
1286         struct inode *inode = file->f_dentry->d_inode;
1287         struct ll_inode_info *lli = ll_i2info(inode);
1288         struct ll_sb_info *sbi = ll_i2sbi(inode);
1289         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1290         ENTRY;
1291
1292         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1293                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1294                        " osc connect flags = 0x"LPX64"\n",
1295                        sbi->ll_lco.lco_flags);
1296                 RETURN(0);
1297         }
1298         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1299                 RETURN(1);
1300         if (lli->lli_flags & LLIF_CONTENDED) {
1301                 cfs_time_t cur_time = cfs_time_current();
1302                 cfs_time_t retry_time;
1303
1304                 retry_time = cfs_time_add(
1305                         lli->lli_contention_time,
1306                         cfs_time_seconds(sbi->ll_contention_time));
1307                 if (cfs_time_after(cur_time, retry_time)) {
1308                         ll_clear_file_contended(inode);
1309                         RETURN(0);
1310                 }
1311                 RETURN(1);
1312         }
1313         RETURN(0);
1314 }
1315
1316 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1317                                  const char *buf, size_t count,
1318                                  loff_t start, loff_t end, int rw)
1319 {
1320         int append;
1321         int tree_locked = 0;
1322         int rc;
1323         struct inode * inode = file->f_dentry->d_inode;
1324         ENTRY;
1325
1326         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1327
1328         if (append || !ll_is_file_contended(file)) {
1329                 struct ll_lock_tree_node *node;
1330                 int ast_flags;
1331
1332                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1333                 if (file->f_flags & O_NONBLOCK)
1334                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1335                 node = ll_node_from_inode(inode, start, end,
1336                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1337                 if (IS_ERR(node)) {
1338                         rc = PTR_ERR(node);
1339                         GOTO(out, rc);
1340                 }
1341                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1342                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1343                 if (rc == 0)
1344                         tree_locked = 1;
1345                 else if (rc == -EUSERS)
1346                         ll_set_file_contended(inode);
1347                 else
1348                         GOTO(out, rc);
1349         }
1350         RETURN(tree_locked);
1351 out:
1352         return rc;
1353 }
1354
1355 /**
1356  * Checks if requested extent lock is compatible with a lock under a page.
1357  *
1358  * Checks if the lock under \a page is compatible with a read or write lock
1359  * (specified by \a rw) for an extent [\a start , \a end].
1360  *
1361  * \param page the page under which lock is considered
1362  * \param rw OBD_BRW_READ if requested for reading,
1363  *           OBD_BRW_WRITE if requested for writing
1364  * \param start start of the requested extent
1365  * \param end end of the requested extent
1366  * \param cookie transparent parameter for passing locking context
1367  *
1368  * \post result == 1, *cookie == context, appropriate lock is referenced or
1369  * \post result == 0
1370  *
1371  * \retval 1 owned lock is reused for the request
1372  * \retval 0 no lock reused for the request
1373  *
1374  * \see ll_release_short_lock
1375  */
1376 static int ll_reget_short_lock(struct page *page, int rw,
1377                                obd_off start, obd_off end,
1378                                void **cookie)
1379 {
1380         struct ll_async_page *llap;
1381         struct obd_export *exp;
1382         struct inode *inode = page->mapping->host;
1383
1384         ENTRY;
1385
1386         exp = ll_i2dtexp(inode);
1387         if (exp == NULL)
1388                 RETURN(0);
1389
1390         llap = llap_cast_private(page);
1391         if (llap == NULL)
1392                 RETURN(0);
1393
1394         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1395                                     &llap->llap_cookie, rw, start, end,
1396                                     cookie));
1397 }
1398
1399 /**
1400  * Releases a reference to a lock taken in a "fast" way.
1401  *
1402  * Releases a read or a write (specified by \a rw) lock
1403  * referenced by \a cookie.
1404  *
1405  * \param inode inode to which data belong
1406  * \param end end of the locked extent
1407  * \param rw OBD_BRW_READ if requested for reading,
1408  *           OBD_BRW_WRITE if requested for writing
1409  * \param cookie transparent parameter for passing locking context
1410  *
1411  * \post appropriate lock is dereferenced
1412  *
1413  * \see ll_reget_short_lock
1414  */
1415 static void ll_release_short_lock(struct inode *inode, obd_off end,
1416                                   void *cookie, int rw)
1417 {
1418         struct obd_export *exp;
1419         int rc;
1420
1421         exp = ll_i2dtexp(inode);
1422         if (exp == NULL)
1423                 return;
1424
1425         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1426                                     cookie, rw);
1427         if (rc < 0)
1428                 CERROR("unlock failed (%d)\n", rc);
1429 }
1430
1431 /**
1432  * Checks if requested extent lock is compatible
1433  * with a lock under a page in page cache.
1434  *
1435  * Checks if a lock under some \a page is compatible with a read or write lock
1436  * (specified by \a rw) for an extent [\a start , \a end].
1437  *
1438  * \param file the file under which lock is considered
1439  * \param rw OBD_BRW_READ if requested for reading,
1440  *           OBD_BRW_WRITE if requested for writing
1441  * \param ppos start of the requested extent
1442  * \param end end of the requested extent
1443  * \param cookie transparent parameter for passing locking context
1444  * \param buf userspace buffer for the data
1445  *
1446  * \post result == 1, *cookie == context, appropriate lock is referenced
1447  * \post retuls == 0
1448  *
1449  * \retval 1 owned lock is reused for the request
1450  * \retval 0 no lock reused for the request
1451  *
1452  * \see ll_file_put_fast_lock
1453  */
1454 static inline int ll_file_get_fast_lock(struct file *file,
1455                                         obd_off ppos, obd_off end,
1456                                         char *buf, void **cookie, int rw)
1457 {
1458         int rc = 0;
1459         struct page *page;
1460
1461         ENTRY;
1462
1463         if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
1464                 page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1465                                       ppos >> CFS_PAGE_SHIFT);
1466                 if (page) {
1467                         if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1468                                 rc = 1;
1469
1470                         unlock_page(page);
1471                         page_cache_release(page);
1472                 }
1473         }
1474
1475         RETURN(rc);
1476 }
1477
1478 /**
1479  * Releases a reference to a lock taken in a "fast" way.
1480  *
1481  * Releases a read or a write (specified by \a rw) lock
1482  * referenced by \a cookie.
1483  *
1484  * \param inode inode to which data belong
1485  * \param end end of the locked extent
1486  * \param rw OBD_BRW_READ if requested for reading,
1487  *           OBD_BRW_WRITE if requested for writing
1488  * \param cookie transparent parameter for passing locking context
1489  *
1490  * \post appropriate lock is dereferenced
1491  *
1492  * \see ll_file_get_fast_lock
1493  */
1494 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1495                                          void *cookie, int rw)
1496 {
1497         ll_release_short_lock(inode, end, cookie, rw);
1498 }
1499
1500 enum ll_lock_style {
1501         LL_LOCK_STYLE_NOLOCK   = 0,
1502         LL_LOCK_STYLE_FASTLOCK = 1,
1503         LL_LOCK_STYLE_TREELOCK = 2
1504 };
1505
1506 /**
1507  * Checks if requested extent lock is compatible with a lock
1508  * under a page cache page.
1509  *
1510  * Checks if the lock under \a page is compatible with a read or write lock
1511  * (specified by \a rw) for an extent [\a start , \a end].
1512  *
1513  * \param file file under which I/O is processed
1514  * \param rw OBD_BRW_READ if requested for reading,
1515  *           OBD_BRW_WRITE if requested for writing
1516  * \param ppos start of the requested extent
1517  * \param end end of the requested extent
1518  * \param cookie transparent parameter for passing locking context
1519  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1520  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1521  * \param buf userspace buffer for the data
1522  *
1523  * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
1524  * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
1525  * \retval LL_LOCK_STYLE_NOLOCK got no lock
1526  *
1527  * \see ll_file_put_lock
1528  */
1529 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1530                                    obd_off end, char *buf, void **cookie,
1531                                    struct ll_lock_tree *tree, int rw)
1532 {
1533         int rc;
1534
1535         ENTRY;
1536
1537         if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
1538                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1539
1540         rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
1541         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1542         switch (rc) {
1543         case 1:
1544                 RETURN(LL_LOCK_STYLE_TREELOCK);
1545         case 0:
1546                 RETURN(LL_LOCK_STYLE_NOLOCK);
1547         }
1548
1549         /* an error happened if we reached this point, rc = -errno here */
1550         RETURN(rc);
1551 }
1552
1553 /**
1554  * Drops the lock taken by ll_file_get_lock.
1555  *
1556  * Releases a read or a write (specified by \a rw) lock
1557  * referenced by \a tree or \a cookie.
1558  *
1559  * \param inode inode to which data belong
1560  * \param end end of the locked extent
1561  * \param lockstyle facility through which the lock was taken
1562  * \param rw OBD_BRW_READ if requested for reading,
1563  *           OBD_BRW_WRITE if requested for writing
1564  * \param cookie transparent parameter for passing locking context
1565  *           (only used with LL_LOCK_STYLE_FASTLOCK)
1566  * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
1567  *
1568  * \post appropriate lock is dereferenced
1569  *
1570  * \see ll_file_get_lock
1571  */
1572 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1573                                     enum ll_lock_style lock_style,
1574                                     void *cookie, struct ll_lock_tree *tree,
1575                                     int rw)
1576
1577 {
1578         switch (lock_style) {
1579         case LL_LOCK_STYLE_TREELOCK:
1580                 ll_tree_unlock(tree);
1581                 break;
1582         case LL_LOCK_STYLE_FASTLOCK:
1583                 ll_file_put_fast_lock(inode, end, cookie, rw);
1584                 break;
1585         default:
1586                 CERROR("invalid locking style (%d)\n", lock_style);
1587         }
1588 }
1589
1590 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1591                             loff_t *ppos)
1592 {
1593         struct inode *inode = file->f_dentry->d_inode;
1594         struct ll_inode_info *lli = ll_i2info(inode);
1595         struct lov_stripe_md *lsm = lli->lli_smd;
1596         struct ll_sb_info *sbi = ll_i2sbi(inode);
1597         struct ll_lock_tree tree;
1598         struct ost_lvb lvb;
1599         struct ll_ra_read bead;
1600         int ra = 0;
1601         obd_off end;
1602         ssize_t retval, chunk, sum = 0;
1603         int lock_style;
1604         void *cookie;
1605
1606         __u64 kms;
1607         ENTRY;
1608         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1609                inode->i_ino, inode->i_generation, inode, count, *ppos);
1610         /* "If nbyte is 0, read() will return 0 and have no other results."
1611          *                      -- Single Unix Spec */
1612         if (count == 0)
1613                 RETURN(0);
1614
1615         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1616
1617         if (!lsm) {
1618                 /* Read on file with no objects should return zero-filled
1619                  * buffers up to file size (we can get non-zero sizes with
1620                  * mknod + truncate, then opening file for read. This is a
1621                  * common pattern in NFS case, it seems). Bug 6243 */
1622                 int notzeroed;
1623                 /* Since there are no objects on OSTs, we have nothing to get
1624                  * lock on and so we are forced to access inode->i_size
1625                  * unguarded */
1626
1627                 /* Read beyond end of file */
1628                 if (*ppos >= i_size_read(inode))
1629                         RETURN(0);
1630
1631                 if (count > i_size_read(inode) - *ppos)
1632                         count = i_size_read(inode) - *ppos;
1633                 /* Make sure to correctly adjust the file pos pointer for
1634                  * EFAULT case */
1635                 notzeroed = clear_user(buf, count);
1636                 count -= notzeroed;
1637                 *ppos += count;
1638                 if (!count)
1639                         RETURN(-EFAULT);
1640                 RETURN(count);
1641         }
1642 repeat:
1643         if (sbi->ll_max_rw_chunk != 0) {
1644                 /* first, let's know the end of the current stripe */
1645                 end = *ppos;
1646                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1647
1648                 /* correct, the end is beyond the request */
1649                 if (end > *ppos + count - 1)
1650                         end = *ppos + count - 1;
1651
1652                 /* and chunk shouldn't be too large even if striping is wide */
1653                 if (end - *ppos > sbi->ll_max_rw_chunk)
1654                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1655         } else {
1656                 end = *ppos + count - 1;
1657         }
1658
1659         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1660                                       buf, &cookie, &tree, OBD_BRW_READ);
1661         if (lock_style < 0)
1662                 GOTO(out, retval = lock_style);
1663
1664         ll_inode_size_lock(inode, 1);
1665         /*
1666          * Consistency guarantees: following possibilities exist for the
1667          * relation between region being read and real file size at this
1668          * moment:
1669          *
1670          *  (A): the region is completely inside of the file;
1671          *
1672          *  (B-x): x bytes of region are inside of the file, the rest is
1673          *  outside;
1674          *
1675          *  (C): the region is completely outside of the file.
1676          *
1677          * This classification is stable under DLM lock acquired by
1678          * ll_tree_lock() above, because to change class, other client has to
1679          * take DLM lock conflicting with our lock. Also, any updates to
1680          * ->i_size by other threads on this client are serialized by
1681          * ll_inode_size_lock(). This guarantees that short reads are handled
1682          * correctly in the face of concurrent writes and truncates.
1683          */
1684         inode_init_lvb(inode, &lvb);
1685         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1686         kms = lvb.lvb_size;
1687         if (*ppos + count - 1 > kms) {
1688                 /* A glimpse is necessary to determine whether we return a
1689                  * short read (B) or some zeroes at the end of the buffer (C) */
1690                 ll_inode_size_unlock(inode, 1);
1691                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1692                 if (retval) {
1693                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1694                                 ll_file_put_lock(inode, end, lock_style,
1695                                                  cookie, &tree, OBD_BRW_READ);
1696                         goto out;
1697                 }
1698         } else {
1699                 /* region is within kms and, hence, within real file size (A).
1700                  * We need to increase i_size to cover the read region so that
1701                  * generic_file_read() will do its job, but that doesn't mean
1702                  * the kms size is _correct_, it is only the _minimum_ size.
1703                  * If someone does a stat they will get the correct size which
1704                  * will always be >= the kms value here.  b=11081 */
1705                 if (i_size_read(inode) < kms)
1706                         i_size_write(inode, kms);
1707                 ll_inode_size_unlock(inode, 1);
1708         }
1709
1710         chunk = end - *ppos + 1;
1711         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1712                inode->i_ino, chunk, *ppos, i_size_read(inode));
1713
1714         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1715                 /* turn off the kernel's read-ahead */
1716                 file->f_ra.ra_pages = 0;
1717
1718                 /* initialize read-ahead window once per syscall */
1719                 if (ra == 0) {
1720                         ra = 1;
1721                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1722                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1723                         ll_ra_read_in(file, &bead);
1724                 }
1725
1726                 /* BUG: 5972 */
1727                 file_accessed(file);
1728                 retval = generic_file_read(file, buf, chunk, ppos);
1729                 ll_file_put_lock(inode, end, lock_style, cookie, &tree,
1730                                  OBD_BRW_READ);
1731         } else {
1732                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1733         }
1734
1735         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1736
1737         if (retval > 0) {
1738                 buf += retval;
1739                 count -= retval;
1740                 sum += retval;
1741                 if (retval == chunk && count > 0)
1742                         goto repeat;
1743         }
1744
1745  out:
1746         if (ra != 0)
1747                 ll_ra_read_ex(file, &bead);
1748         retval = (sum > 0) ? sum : retval;
1749         RETURN(retval);
1750 }
1751
1752 /*
1753  * Write to a file (through the page cache).
1754  */
1755 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1756                              loff_t *ppos)
1757 {
1758         struct inode *inode = file->f_dentry->d_inode;
1759         struct ll_sb_info *sbi = ll_i2sbi(inode);
1760         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1761         struct ll_lock_tree tree;
1762         loff_t maxbytes = ll_file_maxbytes(inode);
1763         loff_t lock_start, lock_end, end;
1764         ssize_t retval, chunk, sum = 0;
1765         int tree_locked;
1766         ENTRY;
1767
1768         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1769                inode->i_ino, inode->i_generation, inode, count, *ppos);
1770
1771         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1772
1773         /* POSIX, but surprised the VFS doesn't check this already */
1774         if (count == 0)
1775                 RETURN(0);
1776
1777         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1778          * called on the file, don't fail the below assertion (bug 2388). */
1779         if (file->f_flags & O_LOV_DELAY_CREATE &&
1780             ll_i2info(inode)->lli_smd == NULL)
1781                 RETURN(-EBADF);
1782
1783         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1784
1785         down(&ll_i2info(inode)->lli_write_sem);
1786
1787 repeat:
1788         chunk = 0; /* just to fix gcc's warning */
1789         end = *ppos + count - 1;
1790
1791         if (file->f_flags & O_APPEND) {
1792                 lock_start = 0;
1793                 lock_end = OBD_OBJECT_EOF;
1794         } else if (sbi->ll_max_rw_chunk != 0) {
1795                 /* first, let's know the end of the current stripe */
1796                 end = *ppos;
1797                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1798                                 (obd_off *)&end);
1799
1800                 /* correct, the end is beyond the request */
1801                 if (end > *ppos + count - 1)
1802                         end = *ppos + count - 1;
1803
1804                 /* and chunk shouldn't be too large even if striping is wide */
1805                 if (end - *ppos > sbi->ll_max_rw_chunk)
1806                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1807                 lock_start = *ppos;
1808                 lock_end = end;
1809         } else {
1810                 lock_start = *ppos;
1811                 lock_end = *ppos + count - 1;
1812         }
1813
1814         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1815                                             lock_start, lock_end, OBD_BRW_WRITE);
1816         if (tree_locked < 0)
1817                 GOTO(out, retval = tree_locked);
1818
1819         /* This is ok, g_f_w will overwrite this under i_sem if it races
1820          * with a local truncate, it just makes our maxbyte checking easier.
1821          * The i_size value gets updated in ll_extent_lock() as a consequence
1822          * of the [0,EOF] extent lock we requested above. */
1823         if (file->f_flags & O_APPEND) {
1824                 *ppos = i_size_read(inode);
1825                 end = *ppos + count - 1;
1826         }
1827
1828         if (*ppos >= maxbytes) {
1829                 send_sig(SIGXFSZ, current, 0);
1830                 GOTO(out_unlock, retval = -EFBIG);
1831         }
1832         if (end > maxbytes - 1)
1833                 end = maxbytes - 1;
1834
1835         /* generic_file_write handles O_APPEND after getting i_mutex */
1836         chunk = end - *ppos + 1;
1837         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1838                inode->i_ino, chunk, *ppos);
1839         if (tree_locked)
1840                 retval = generic_file_write(file, buf, chunk, ppos);
1841         else
1842                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1843                                              ppos, WRITE);
1844         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1845
1846 out_unlock:
1847         if (tree_locked)
1848                 ll_tree_unlock(&tree);
1849
1850 out:
1851         if (retval > 0) {
1852                 buf += retval;
1853                 count -= retval;
1854                 sum += retval;
1855                 if (retval == chunk && count > 0)
1856                         goto repeat;
1857         }
1858
1859         up(&ll_i2info(inode)->lli_write_sem);
1860
1861         retval = (sum > 0) ? sum : retval;
1862         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1863                            retval > 0 ? retval : 0);
1864         RETURN(retval);
1865 }
1866
1867 /*
1868  * Send file content (through pagecache) somewhere with helper
1869  */
1870 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1871                                 read_actor_t actor, void *target)
1872 {
1873         struct inode *inode = in_file->f_dentry->d_inode;
1874         struct ll_inode_info *lli = ll_i2info(inode);
1875         struct lov_stripe_md *lsm = lli->lli_smd;
1876         struct ll_lock_tree tree;
1877         struct ll_lock_tree_node *node;
1878         struct ost_lvb lvb;
1879         struct ll_ra_read bead;
1880         int rc;
1881         ssize_t retval;
1882         __u64 kms;
1883         ENTRY;
1884         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1885                inode->i_ino, inode->i_generation, inode, count, *ppos);
1886
1887         /* "If nbyte is 0, read() will return 0 and have no other results."
1888          *                      -- Single Unix Spec */
1889         if (count == 0)
1890                 RETURN(0);
1891
1892         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1893         /* turn off the kernel's read-ahead */
1894         in_file->f_ra.ra_pages = 0;
1895
1896         /* File with no objects, nothing to lock */
1897         if (!lsm)
1898                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1899
1900         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1901         if (IS_ERR(node))
1902                 RETURN(PTR_ERR(node));
1903
1904         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1905         rc = ll_tree_lock(&tree, node, NULL, count,
1906                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1907         if (rc != 0)
1908                 RETURN(rc);
1909
1910         ll_clear_file_contended(inode);
1911         ll_inode_size_lock(inode, 1);
1912         /*
1913          * Consistency guarantees: following possibilities exist for the
1914          * relation between region being read and real file size at this
1915          * moment:
1916          *
1917          *  (A): the region is completely inside of the file;
1918          *
1919          *  (B-x): x bytes of region are inside of the file, the rest is
1920          *  outside;
1921          *
1922          *  (C): the region is completely outside of the file.
1923          *
1924          * This classification is stable under DLM lock acquired by
1925          * ll_tree_lock() above, because to change class, other client has to
1926          * take DLM lock conflicting with our lock. Also, any updates to
1927          * ->i_size by other threads on this client are serialized by
1928          * ll_inode_size_lock(). This guarantees that short reads are handled
1929          * correctly in the face of concurrent writes and truncates.
1930          */
1931         inode_init_lvb(inode, &lvb);
1932         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1933         kms = lvb.lvb_size;
1934         if (*ppos + count - 1 > kms) {
1935                 /* A glimpse is necessary to determine whether we return a
1936                  * short read (B) or some zeroes at the end of the buffer (C) */
1937                 ll_inode_size_unlock(inode, 1);
1938                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1939                 if (retval)
1940                         goto out;
1941         } else {
1942                 /* region is within kms and, hence, within real file size (A) */
1943                 i_size_write(inode, kms);
1944                 ll_inode_size_unlock(inode, 1);
1945         }
1946
1947         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1948                inode->i_ino, count, *ppos, i_size_read(inode));
1949
1950         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1951         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1952         ll_ra_read_in(in_file, &bead);
1953         /* BUG: 5972 */
1954         file_accessed(in_file);
1955         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1956         ll_ra_read_ex(in_file, &bead);
1957
1958  out:
1959         ll_tree_unlock(&tree);
1960         RETURN(retval);
1961 }
1962
1963 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1964                                unsigned long arg)
1965 {
1966         struct ll_inode_info *lli = ll_i2info(inode);
1967         struct obd_export *exp = ll_i2dtexp(inode);
1968         struct ll_recreate_obj ucreatp;
1969         struct obd_trans_info oti = { 0 };
1970         struct obdo *oa = NULL;
1971         int lsm_size;
1972         int rc = 0;
1973         struct lov_stripe_md *lsm, *lsm2;
1974         ENTRY;
1975
1976         if (!capable (CAP_SYS_ADMIN))
1977                 RETURN(-EPERM);
1978
1979         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1980                             sizeof(struct ll_recreate_obj));
1981         if (rc) {
1982                 RETURN(-EFAULT);
1983         }
1984         OBDO_ALLOC(oa);
1985         if (oa == NULL)
1986                 RETURN(-ENOMEM);
1987
1988         down(&lli->lli_size_sem);
1989         lsm = lli->lli_smd;
1990         if (lsm == NULL)
1991                 GOTO(out, rc = -ENOENT);
1992         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1993                    (lsm->lsm_stripe_count));
1994
1995         OBD_ALLOC(lsm2, lsm_size);
1996         if (lsm2 == NULL)
1997                 GOTO(out, rc = -ENOMEM);
1998
1999         oa->o_id = ucreatp.lrc_id;
2000         oa->o_gr = ucreatp.lrc_group;
2001         oa->o_nlink = ucreatp.lrc_ost_idx;
2002         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2003         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
2004         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2005                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2006
2007         memcpy(lsm2, lsm, lsm_size);
2008         rc = obd_create(exp, oa, &lsm2, &oti);
2009
2010         OBD_FREE(lsm2, lsm_size);
2011         GOTO(out, rc);
2012 out:
2013         up(&lli->lli_size_sem);
2014         OBDO_FREE(oa);
2015         return rc;
2016 }
2017
2018 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2019                              int flags, struct lov_user_md *lum, int lum_size)
2020 {
2021         struct ll_inode_info *lli = ll_i2info(inode);
2022         struct lov_stripe_md *lsm;
2023         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2024         int rc = 0;
2025         ENTRY;
2026
2027         down(&lli->lli_size_sem);
2028         lsm = lli->lli_smd;
2029         if (lsm) {
2030                 up(&lli->lli_size_sem);
2031                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2032                        inode->i_ino);
2033                 RETURN(-EEXIST);
2034         }
2035
2036         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2037         if (rc)
2038                 GOTO(out, rc);
2039         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2040                 GOTO(out_req_free, rc = -ENOENT);
2041         rc = oit.d.lustre.it_status;
2042         if (rc < 0)
2043                 GOTO(out_req_free, rc);
2044
2045         ll_release_openhandle(file->f_dentry, &oit);
2046
2047  out:
2048         up(&lli->lli_size_sem);
2049         ll_intent_release(&oit);
2050         RETURN(rc);
2051 out_req_free:
2052         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2053         goto out;
2054 }
2055
2056 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
2057                              struct lov_mds_md **lmmp, int *lmm_size,
2058                              struct ptlrpc_request **request)
2059 {
2060         struct ll_sb_info *sbi = ll_i2sbi(inode);
2061         struct mdt_body  *body;
2062         struct lov_mds_md *lmm = NULL;
2063         struct ptlrpc_request *req = NULL;
2064         struct obd_capa *oc;
2065         int rc, lmmsize;
2066
2067         rc = ll_get_max_mdsize(sbi, &lmmsize);
2068         if (rc)
2069                 RETURN(rc);
2070
2071         oc = ll_mdscapa_get(inode);
2072         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
2073                              oc, filename, strlen(filename) + 1,
2074                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
2075                              ll_i2suppgid(inode), &req);
2076         capa_put(oc);
2077         if (rc < 0) {
2078                 CDEBUG(D_INFO, "md_getattr_name failed "
2079                        "on %s: rc %d\n", filename, rc);
2080                 GOTO(out, rc);
2081         }
2082
2083         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2084         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2085
2086         lmmsize = body->eadatasize;
2087
2088         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2089                         lmmsize == 0) {
2090                 GOTO(out, rc = -ENODATA);
2091         }
2092
2093         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
2094         LASSERT(lmm != NULL);
2095
2096         /*
2097          * This is coming from the MDS, so is probably in
2098          * little endian.  We convert it to host endian before
2099          * passing it to userspace.
2100          */
2101         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
2102                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
2103                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
2104         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
2105                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2106         }
2107
2108         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2109                 struct lov_stripe_md *lsm;
2110                 struct lov_user_md_join *lmj;
2111                 int lmj_size, i, aindex = 0;
2112
2113                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
2114                 if (rc < 0)
2115                         GOTO(out, rc = -ENOMEM);
2116                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
2117                 if (rc)
2118                         GOTO(out_free_memmd, rc);
2119
2120                 lmj_size = sizeof(struct lov_user_md_join) +
2121                            lsm->lsm_stripe_count *
2122                            sizeof(struct lov_user_ost_data_join);
2123                 OBD_ALLOC(lmj, lmj_size);
2124                 if (!lmj)
2125                         GOTO(out_free_memmd, rc = -ENOMEM);
2126
2127                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2128                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2129                         struct lov_extent *lex =
2130                                 &lsm->lsm_array->lai_ext_array[aindex];
2131
2132                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2133                                 aindex ++;
2134                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2135                                         LPU64" len %d\n", aindex, i,
2136                                         lex->le_start, (int)lex->le_len);
2137                         lmj->lmm_objects[i].l_extent_start =
2138                                 lex->le_start;
2139
2140                         if ((int)lex->le_len == -1)
2141                                 lmj->lmm_objects[i].l_extent_end = -1;
2142                         else
2143                                 lmj->lmm_objects[i].l_extent_end =
2144                                         lex->le_start + lex->le_len;
2145                         lmj->lmm_objects[i].l_object_id =
2146                                 lsm->lsm_oinfo[i]->loi_id;
2147                         lmj->lmm_objects[i].l_object_gr =
2148                                 lsm->lsm_oinfo[i]->loi_gr;
2149                         lmj->lmm_objects[i].l_ost_gen =
2150                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2151                         lmj->lmm_objects[i].l_ost_idx =
2152                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2153                 }
2154                 lmm = (struct lov_mds_md *)lmj;
2155                 lmmsize = lmj_size;
2156 out_free_memmd:
2157                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
2158         }
2159 out:
2160         *lmmp = lmm;
2161         *lmm_size = lmmsize;
2162         *request = req;
2163         return rc;
2164 }
2165
2166 static int ll_lov_setea(struct inode *inode, struct file *file,
2167                             unsigned long arg)
2168 {
2169         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2170         struct lov_user_md  *lump;
2171         int lum_size = sizeof(struct lov_user_md) +
2172                        sizeof(struct lov_user_ost_data);
2173         int rc;
2174         ENTRY;
2175
2176         if (!capable (CAP_SYS_ADMIN))
2177                 RETURN(-EPERM);
2178
2179         OBD_ALLOC(lump, lum_size);
2180         if (lump == NULL) {
2181                 RETURN(-ENOMEM);
2182         }
2183         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2184         if (rc) {
2185                 OBD_FREE(lump, lum_size);
2186                 RETURN(-EFAULT);
2187         }
2188
2189         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2190
2191         OBD_FREE(lump, lum_size);
2192         RETURN(rc);
2193 }
2194
2195 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2196                             unsigned long arg)
2197 {
2198         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2199         int rc;
2200         int flags = FMODE_WRITE;
2201         ENTRY;
2202
2203         /* Bug 1152: copy properly when this is no longer true */
2204         LASSERT(sizeof(lum) == sizeof(*lump));
2205         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2206         rc = copy_from_user(&lum, lump, sizeof(lum));
2207         if (rc)
2208                 RETURN(-EFAULT);
2209
2210         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2211         if (rc == 0) {
2212                  put_user(0, &lump->lmm_stripe_count);
2213                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2214                                     0, ll_i2info(inode)->lli_smd, lump);
2215         }
2216         RETURN(rc);
2217 }
2218
2219 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2220 {
2221         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2222
2223         if (!lsm)
2224                 RETURN(-ENODATA);
2225
2226         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2227                             (void *)arg);
2228 }
2229
2230 static int ll_get_grouplock(struct inode *inode, struct file *file,
2231                             unsigned long arg)
2232 {
2233         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2234         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2235                                                     .end = OBD_OBJECT_EOF}};
2236         struct lustre_handle lockh = { 0 };
2237         struct ll_inode_info *lli = ll_i2info(inode);
2238         struct lov_stripe_md *lsm = lli->lli_smd;
2239         int flags = 0, rc;
2240         ENTRY;
2241
2242         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2243                 RETURN(-EINVAL);
2244         }
2245
2246         policy.l_extent.gid = arg;
2247         if (file->f_flags & O_NONBLOCK)
2248                 flags = LDLM_FL_BLOCK_NOWAIT;
2249
2250         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2251         if (rc)
2252                 RETURN(rc);
2253
2254         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2255         fd->fd_gid = arg;
2256         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2257
2258         RETURN(0);
2259 }
2260
2261 static int ll_put_grouplock(struct inode *inode, struct file *file,
2262                             unsigned long arg)
2263 {
2264         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2265         struct ll_inode_info *lli = ll_i2info(inode);
2266         struct lov_stripe_md *lsm = lli->lli_smd;
2267         int rc;
2268         ENTRY;
2269
2270         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2271                 /* Ugh, it's already unlocked. */
2272                 RETURN(-EINVAL);
2273         }
2274
2275         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2276                 RETURN(-EINVAL);
2277
2278         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2279
2280         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2281         if (rc)
2282                 RETURN(rc);
2283
2284         fd->fd_gid = 0;
2285         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2286
2287         RETURN(0);
2288 }
2289
2290 static int join_sanity_check(struct inode *head, struct inode *tail)
2291 {
2292         ENTRY;
2293         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2294                 CERROR("server do not support join \n");
2295                 RETURN(-EINVAL);
2296         }
2297         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2298                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2299                        head->i_ino, tail->i_ino);
2300                 RETURN(-EINVAL);
2301         }
2302         if (head->i_ino == tail->i_ino) {
2303                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2304                 RETURN(-EINVAL);
2305         }
2306         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2307                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2308                 RETURN(-EINVAL);
2309         }
2310         RETURN(0);
2311 }
2312
2313 static int join_file(struct inode *head_inode, struct file *head_filp,
2314                      struct file *tail_filp)
2315 {
2316         struct dentry *tail_dentry = tail_filp->f_dentry;
2317         struct lookup_intent oit = {.it_op = IT_OPEN,
2318                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2319         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2320                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2321
2322         struct lustre_handle lockh;
2323         struct md_op_data *op_data;
2324         int    rc;
2325         loff_t data;
2326         ENTRY;
2327
2328         tail_dentry = tail_filp->f_dentry;
2329
2330         data = i_size_read(head_inode);
2331         op_data = ll_prep_md_op_data(NULL, head_inode,
2332                                      tail_dentry->d_parent->d_inode,
2333                                      tail_dentry->d_name.name,
2334                                      tail_dentry->d_name.len, 0,
2335                                      LUSTRE_OPC_ANY, &data);
2336         if (IS_ERR(op_data))
2337                 RETURN(PTR_ERR(op_data));
2338
2339         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit,
2340                          op_data, &lockh, NULL, 0, NULL, 0);
2341
2342         ll_finish_md_op_data(op_data);
2343         if (rc < 0)
2344                 GOTO(out, rc);
2345
2346         rc = oit.d.lustre.it_status;
2347
2348         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2349                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2350                 ptlrpc_req_finished((struct ptlrpc_request *)
2351                                     oit.d.lustre.it_data);
2352                 GOTO(out, rc);
2353         }
2354
2355         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2356                                            * away */
2357                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2358                 oit.d.lustre.it_lock_mode = 0;
2359         }
2360         ll_release_openhandle(head_filp->f_dentry, &oit);
2361 out:
2362         ll_intent_release(&oit);
2363         RETURN(rc);
2364 }
2365
2366 static int ll_file_join(struct inode *head, struct file *filp,
2367                         char *filename_tail)
2368 {
2369         struct inode *tail = NULL, *first = NULL, *second = NULL;
2370         struct dentry *tail_dentry;
2371         struct file *tail_filp, *first_filp, *second_filp;
2372         struct ll_lock_tree first_tree, second_tree;
2373         struct ll_lock_tree_node *first_node, *second_node;
2374         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2375         int rc = 0, cleanup_phase = 0;
2376         ENTRY;
2377
2378         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2379                head->i_ino, head->i_generation, head, filename_tail);
2380
2381         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2382         if (IS_ERR(tail_filp)) {
2383                 CERROR("Can not open tail file %s", filename_tail);
2384                 rc = PTR_ERR(tail_filp);
2385                 GOTO(cleanup, rc);
2386         }
2387         tail = igrab(tail_filp->f_dentry->d_inode);
2388
2389         tlli = ll_i2info(tail);
2390         tail_dentry = tail_filp->f_dentry;
2391         LASSERT(tail_dentry);
2392         cleanup_phase = 1;
2393
2394         /*reorder the inode for lock sequence*/
2395         first = head->i_ino > tail->i_ino ? head : tail;
2396         second = head->i_ino > tail->i_ino ? tail : head;
2397         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2398         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2399
2400         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2401                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2402         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2403         if (IS_ERR(first_node)){
2404                 rc = PTR_ERR(first_node);
2405                 GOTO(cleanup, rc);
2406         }
2407         first_tree.lt_fd = first_filp->private_data;
2408         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2409         if (rc != 0)
2410                 GOTO(cleanup, rc);
2411         cleanup_phase = 2;
2412
2413         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2414         if (IS_ERR(second_node)){
2415                 rc = PTR_ERR(second_node);
2416                 GOTO(cleanup, rc);
2417         }
2418         second_tree.lt_fd = second_filp->private_data;
2419         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2420         if (rc != 0)
2421                 GOTO(cleanup, rc);
2422         cleanup_phase = 3;
2423
2424         rc = join_sanity_check(head, tail);
2425         if (rc)
2426                 GOTO(cleanup, rc);
2427
2428         rc = join_file(head, filp, tail_filp);
2429         if (rc)
2430                 GOTO(cleanup, rc);
2431 cleanup:
2432         switch (cleanup_phase) {
2433         case 3:
2434                 ll_tree_unlock(&second_tree);
2435                 obd_cancel_unused(ll_i2dtexp(second),
2436                                   ll_i2info(second)->lli_smd, 0, NULL);
2437         case 2:
2438                 ll_tree_unlock(&first_tree);
2439                 obd_cancel_unused(ll_i2dtexp(first),
2440                                   ll_i2info(first)->lli_smd, 0, NULL);
2441         case 1:
2442                 filp_close(tail_filp, 0);
2443                 if (tail)
2444                         iput(tail);
2445                 if (head && rc == 0) {
2446                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2447                                        &hlli->lli_smd);
2448                         hlli->lli_smd = NULL;
2449                 }
2450         case 0:
2451                 break;
2452         default:
2453                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2454                 LBUG();
2455         }
2456         RETURN(rc);
2457 }
2458
2459 /**
2460  * Close inode open handle
2461  *
2462  * \param dentry [in]     dentry which contains the inode
2463  * \param it     [in,out] intent which contains open info and result
2464  *
2465  * \retval 0     success
2466  * \retval <0    failure
2467  */
2468 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2469 {
2470         struct inode *inode = dentry->d_inode;
2471         struct obd_client_handle *och;
2472         int rc;
2473         ENTRY;
2474
2475         LASSERT(inode);
2476
2477         /* Root ? Do nothing. */
2478         if (dentry->d_inode->i_sb->s_root == dentry)
2479                 RETURN(0);
2480
2481         /* No open handle to close? Move away */
2482         if (!it_disposition(it, DISP_OPEN_OPEN))
2483                 RETURN(0);
2484
2485         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2486
2487         OBD_ALLOC(och, sizeof(*och));
2488         if (!och)
2489                 GOTO(out, rc = -ENOMEM);
2490
2491         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2492                     ll_i2info(inode), it, och);
2493
2494         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2495                                        inode, och);
2496  out:
2497         /* this one is in place of ll_file_open */
2498         ptlrpc_req_finished(it->d.lustre.it_data);
2499         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2500         RETURN(rc);
2501 }
2502
2503 /**
2504  * Get size for inode for which FIEMAP mapping is requested.
2505  * Make the FIEMAP get_info call and returns the result.
2506  */
2507 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2508               int num_bytes)
2509 {
2510         struct obd_export *exp = ll_i2dtexp(inode);
2511         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2512         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2513         int vallen = num_bytes;
2514         int rc;
2515         ENTRY;
2516
2517         /* If the stripe_count > 1 and the application does not understand
2518          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2519          */
2520         if (lsm->lsm_stripe_count > 1 &&
2521             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2522                 return -EOPNOTSUPP;
2523
2524         fm_key.oa.o_id = lsm->lsm_object_id;
2525         fm_key.oa.o_gr = lsm->lsm_object_gr;
2526         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2527
2528         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
2529                         OBD_MD_FLSIZE);
2530
2531         /* If filesize is 0, then there would be no objects for mapping */
2532         if (fm_key.oa.o_size == 0) {
2533                 fiemap->fm_mapped_extents = 0;
2534                 RETURN(0);
2535         }
2536
2537         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2538
2539         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2540         if (rc)
2541                 CERROR("obd_get_info failed: rc = %d\n", rc);
2542
2543         RETURN(rc);
2544 }
2545
2546 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2547                   unsigned long arg)
2548 {
2549         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2550         int flags;
2551         ENTRY;
2552
2553         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2554                inode->i_generation, inode, cmd);
2555         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2556
2557         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2558         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2559                 RETURN(-ENOTTY);
2560
2561         switch(cmd) {
2562         case LL_IOC_GETFLAGS:
2563                 /* Get the current value of the file flags */
2564                 return put_user(fd->fd_flags, (int *)arg);
2565         case LL_IOC_SETFLAGS:
2566         case LL_IOC_CLRFLAGS:
2567                 /* Set or clear specific file flags */
2568                 /* XXX This probably needs checks to ensure the flags are
2569                  *     not abused, and to handle any flag side effects.
2570                  */
2571                 if (get_user(flags, (int *) arg))
2572                         RETURN(-EFAULT);
2573
2574                 if (cmd == LL_IOC_SETFLAGS) {
2575                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2576                             !(file->f_flags & O_DIRECT)) {
2577                                 CERROR("%s: unable to disable locking on "
2578                                        "non-O_DIRECT file\n", current->comm);
2579                                 RETURN(-EINVAL);
2580                         }
2581
2582                         fd->fd_flags |= flags;
2583                 } else {
2584                         fd->fd_flags &= ~flags;
2585                 }
2586                 RETURN(0);
2587         case LL_IOC_LOV_SETSTRIPE:
2588                 RETURN(ll_lov_setstripe(inode, file, arg));
2589         case LL_IOC_LOV_SETEA:
2590                 RETURN(ll_lov_setea(inode, file, arg));
2591         case LL_IOC_LOV_GETSTRIPE:
2592                 RETURN(ll_lov_getstripe(inode, arg));
2593         case LL_IOC_RECREATE_OBJ:
2594                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2595         case EXT3_IOC_FIEMAP: {
2596                 struct ll_user_fiemap *fiemap_s;
2597                 size_t num_bytes, ret_bytes;
2598                 unsigned int extent_count;
2599                 int rc = 0;
2600
2601                 /* Get the extent count so we can calculate the size of
2602                  * required fiemap buffer */
2603                 if (get_user(extent_count,
2604                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2605                         RETURN(-EFAULT);
2606                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2607                                                  sizeof(struct ll_fiemap_extent));
2608                 OBD_VMALLOC(fiemap_s, num_bytes);
2609                 if (fiemap_s == NULL)
2610                         RETURN(-ENOMEM);
2611
2612                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2613                                    sizeof(*fiemap_s)))
2614                         GOTO(error, rc = -EFAULT);
2615
2616                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2617                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2618                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2619                         if (copy_to_user((char *)arg, fiemap_s,
2620                                          sizeof(*fiemap_s)))
2621                                 GOTO(error, rc = -EFAULT);
2622
2623                         GOTO(error, rc = -EBADR);
2624                 }
2625
2626                 /* If fm_extent_count is non-zero, read the first extent since
2627                  * it is used to calculate end_offset and device from previous
2628                  * fiemap call. */
2629                 if (extent_count) {
2630                         if (copy_from_user(&fiemap_s->fm_extents[0],
2631                             (char __user *)arg + sizeof(*fiemap_s),
2632                             sizeof(struct ll_fiemap_extent)))
2633                                 GOTO(error, rc = -EFAULT);
2634                 }
2635
2636                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2637                         int rc;
2638
2639                         rc = filemap_fdatawrite(inode->i_mapping);
2640                         if (rc)
2641                                 GOTO(error, rc);
2642                 }
2643
2644                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2645                 if (rc)
2646                         GOTO(error, rc);
2647
2648                 ret_bytes = sizeof(struct ll_user_fiemap);
2649
2650                 if (extent_count != 0)
2651                         ret_bytes += (fiemap_s->fm_mapped_extents *
2652                                          sizeof(struct ll_fiemap_extent));
2653
2654                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2655                         rc = -EFAULT;
2656
2657 error:
2658                 OBD_VFREE(fiemap_s, num_bytes);
2659                 RETURN(rc);
2660         }
2661         case EXT3_IOC_GETFLAGS:
2662         case EXT3_IOC_SETFLAGS:
2663                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2664         case EXT3_IOC_GETVERSION_OLD:
2665         case EXT3_IOC_GETVERSION:
2666                 RETURN(put_user(inode->i_generation, (int *)arg));
2667         case LL_IOC_JOIN: {
2668                 char *ftail;
2669                 int rc;
2670
2671                 ftail = getname((const char *)arg);
2672                 if (IS_ERR(ftail))
2673                         RETURN(PTR_ERR(ftail));
2674                 rc = ll_file_join(inode, file, ftail);
2675                 putname(ftail);
2676                 RETURN(rc);
2677         }
2678         case LL_IOC_GROUP_LOCK:
2679                 RETURN(ll_get_grouplock(inode, file, arg));
2680         case LL_IOC_GROUP_UNLOCK:
2681                 RETURN(ll_put_grouplock(inode, file, arg));
2682         case IOC_OBD_STATFS:
2683                 RETURN(ll_obd_statfs(inode, (void *)arg));
2684
2685         /* We need to special case any other ioctls we want to handle,
2686          * to send them to the MDS/OST as appropriate and to properly
2687          * network encode the arg field.
2688         case EXT3_IOC_SETVERSION_OLD:
2689         case EXT3_IOC_SETVERSION:
2690         */
2691         case LL_IOC_FLUSHCTX:
2692                 RETURN(ll_flush_ctx(inode));
2693         default: {
2694                 int err;
2695
2696                 if (LLIOC_STOP ==
2697                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2698                         RETURN(err);
2699
2700                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2701                                      (void *)arg));
2702         }
2703         }
2704 }
2705
2706 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2707 {
2708         struct inode *inode = file->f_dentry->d_inode;
2709         struct ll_inode_info *lli = ll_i2info(inode);
2710         struct lov_stripe_md *lsm = lli->lli_smd;
2711         loff_t retval;
2712         ENTRY;
2713         retval = offset + ((origin == 2) ? i_size_read(inode) :
2714                            (origin == 1) ? file->f_pos : 0);
2715         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2716                inode->i_ino, inode->i_generation, inode, retval, retval,
2717                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2718         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2719
2720         if (origin == 2) { /* SEEK_END */
2721                 int nonblock = 0, rc;
2722
2723                 if (file->f_flags & O_NONBLOCK)
2724                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2725
2726                 if (lsm != NULL) {
2727                         rc = ll_glimpse_size(inode, nonblock);
2728                         if (rc != 0)
2729                                 RETURN(rc);
2730                 }
2731
2732                 ll_inode_size_lock(inode, 0);
2733                 offset += i_size_read(inode);
2734                 ll_inode_size_unlock(inode, 0);
2735         } else if (origin == 1) { /* SEEK_CUR */
2736                 offset += file->f_pos;
2737         }
2738
2739         retval = -EINVAL;
2740         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2741                 if (offset != file->f_pos) {
2742                         file->f_pos = offset;
2743 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2744                         file->f_reada = 0;
2745                         file->f_version = ++event;
2746 #endif
2747                 }
2748                 retval = offset;
2749         }
2750
2751         RETURN(retval);
2752 }
2753
2754 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2755 {
2756         struct inode *inode = dentry->d_inode;
2757         struct ll_inode_info *lli = ll_i2info(inode);
2758         struct lov_stripe_md *lsm = lli->lli_smd;
2759         struct ptlrpc_request *req;
2760         struct obd_capa *oc;
2761         int rc, err;
2762         ENTRY;
2763         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2764                inode->i_generation, inode);
2765         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2766
2767         /* fsync's caller has already called _fdata{sync,write}, we want
2768          * that IO to finish before calling the osc and mdc sync methods */
2769         rc = filemap_fdatawait(inode->i_mapping);
2770
2771         /* catch async errors that were recorded back when async writeback
2772          * failed for pages in this mapping. */
2773         err = lli->lli_async_rc;
2774         lli->lli_async_rc = 0;
2775         if (rc == 0)
2776                 rc = err;
2777         if (lsm) {
2778                 err = lov_test_and_clear_async_rc(lsm);
2779                 if (rc == 0)
2780                         rc = err;
2781         }
2782
2783         oc = ll_mdscapa_get(inode);
2784         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2785                       &req);
2786         capa_put(oc);
2787         if (!rc)
2788                 rc = err;
2789         if (!err)
2790                 ptlrpc_req_finished(req);
2791
2792         if (data && lsm) {
2793                 struct obdo *oa;
2794
2795                 OBDO_ALLOC(oa);
2796                 if (!oa)
2797                         RETURN(rc ? rc : -ENOMEM);
2798
2799                 oa->o_id = lsm->lsm_object_id;
2800                 oa->o_gr = lsm->lsm_object_gr;
2801                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2802                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2803                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2804                                            OBD_MD_FLGROUP);
2805
2806                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2807                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2808                                0, OBD_OBJECT_EOF, oc);
2809                 capa_put(oc);
2810                 if (!rc)
2811                         rc = err;
2812                 OBDO_FREE(oa);
2813         }
2814
2815         RETURN(rc);
2816 }
2817
2818 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2819 {
2820         struct inode *inode = file->f_dentry->d_inode;
2821         struct ll_sb_info *sbi = ll_i2sbi(inode);
2822         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2823                                            .ei_cb_cp =ldlm_flock_completion_ast,
2824                                            .ei_cbdata = file_lock };
2825         struct md_op_data *op_data;
2826         struct lustre_handle lockh = {0};
2827         ldlm_policy_data_t flock;
2828         int flags = 0;
2829         int rc;
2830         ENTRY;
2831
2832         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2833                inode->i_ino, file_lock);
2834
2835         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2836
2837         if (file_lock->fl_flags & FL_FLOCK) {
2838                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2839                 /* set missing params for flock() calls */
2840                 file_lock->fl_end = OFFSET_MAX;
2841                 file_lock->fl_pid = current->tgid;
2842         }
2843         flock.l_flock.pid = file_lock->fl_pid;
2844         flock.l_flock.start = file_lock->fl_start;
2845         flock.l_flock.end = file_lock->fl_end;
2846
2847         switch (file_lock->fl_type) {
2848         case F_RDLCK:
2849                 einfo.ei_mode = LCK_PR;
2850                 break;
2851         case F_UNLCK:
2852                 /* An unlock request may or may not have any relation to
2853                  * existing locks so we may not be able to pass a lock handle
2854                  * via a normal ldlm_lock_cancel() request. The request may even
2855                  * unlock a byte range in the middle of an existing lock. In
2856                  * order to process an unlock request we need all of the same
2857                  * information that is given with a normal read or write record
2858                  * lock request. To avoid creating another ldlm unlock (cancel)
2859                  * message we'll treat a LCK_NL flock request as an unlock. */
2860                 einfo.ei_mode = LCK_NL;
2861                 break;
2862         case F_WRLCK:
2863                 einfo.ei_mode = LCK_PW;
2864                 break;
2865         default:
2866                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2867                 LBUG();
2868         }
2869
2870         switch (cmd) {
2871         case F_SETLKW:
2872 #ifdef F_SETLKW64
2873         case F_SETLKW64:
2874 #endif
2875                 flags = 0;
2876                 break;
2877         case F_SETLK:
2878 #ifdef F_SETLK64
2879         case F_SETLK64:
2880 #endif
2881                 flags = LDLM_FL_BLOCK_NOWAIT;
2882                 break;
2883         case F_GETLK:
2884 #ifdef F_GETLK64
2885         case F_GETLK64:
2886 #endif
2887                 flags = LDLM_FL_TEST_LOCK;
2888                 /* Save the old mode so that if the mode in the lock changes we
2889                  * can decrement the appropriate reader or writer refcount. */
2890                 file_lock->fl_type = einfo.ei_mode;
2891                 break;
2892         default:
2893                 CERROR("unknown fcntl lock command: %d\n", cmd);
2894                 LBUG();
2895         }
2896
2897         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2898                                      LUSTRE_OPC_ANY, NULL);
2899         if (IS_ERR(op_data))
2900                 RETURN(PTR_ERR(op_data));
2901
2902         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2903                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2904                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2905
2906         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2907                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2908
2909         ll_finish_md_op_data(op_data);
2910
2911         if ((file_lock->fl_flags & FL_FLOCK) &&
2912             (rc == 0 || file_lock->fl_type == F_UNLCK))
2913                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2914 #ifdef HAVE_F_OP_FLOCK
2915         if ((file_lock->fl_flags & FL_POSIX) &&
2916             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2917             !(flags & LDLM_FL_TEST_LOCK))
2918                 posix_lock_file_wait(file, file_lock);
2919 #endif
2920
2921         RETURN(rc);
2922 }
2923
2924 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2925 {
2926         ENTRY;
2927
2928         RETURN(-ENOSYS);
2929 }
2930
2931 int ll_have_md_lock(struct inode *inode, __u64 bits)
2932 {
2933         struct lustre_handle lockh;
2934         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2935         struct lu_fid *fid;
2936         int flags;
2937         ENTRY;
2938
2939         if (!inode)
2940                RETURN(0);
2941
2942         fid = &ll_i2info(inode)->lli_fid;
2943         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2944
2945         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2946         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2947                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2948                 RETURN(1);
2949         }
2950         RETURN(0);
2951 }
2952
2953 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2954                             struct lustre_handle *lockh)
2955 {
2956         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2957         struct lu_fid *fid;
2958         ldlm_mode_t rc;
2959         int flags;
2960         ENTRY;
2961
2962         fid = &ll_i2info(inode)->lli_fid;
2963         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2964
2965         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2966         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2967                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2968         RETURN(rc);
2969 }
2970
2971 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2972         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2973                               * and return success */
2974                 inode->i_nlink = 0;
2975                 /* This path cannot be hit for regular files unless in
2976                  * case of obscure races, so no need to to validate
2977                  * size. */
2978                 if (!S_ISREG(inode->i_mode) &&
2979                     !S_ISDIR(inode->i_mode))
2980                         return 0;
2981         }
2982
2983         if (rc) {
2984                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2985                 return -abs(rc);
2986
2987         }
2988
2989         return 0;
2990 }
2991
2992 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2993 {
2994         struct inode *inode = dentry->d_inode;
2995         struct ptlrpc_request *req = NULL;
2996         struct ll_sb_info *sbi;
2997         struct obd_export *exp;
2998         int rc;
2999         ENTRY;
3000
3001         if (!inode) {
3002                 CERROR("REPORT THIS LINE TO PETER\n");
3003                 RETURN(0);
3004         }
3005         sbi = ll_i2sbi(inode);
3006
3007         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3008                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3009
3010         exp = ll_i2mdexp(inode);
3011
3012         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
3013                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3014                 struct md_op_data *op_data;
3015
3016                 /* Call getattr by fid, so do not provide name at all. */
3017                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
3018                                              dentry->d_inode, NULL, 0, 0,
3019                                              LUSTRE_OPC_ANY, NULL);
3020                 if (IS_ERR(op_data))
3021                         RETURN(PTR_ERR(op_data));
3022
3023                 oit.it_flags |= O_CHECK_STALE;
3024                 rc = md_intent_lock(exp, op_data, NULL, 0,
3025                                     /* we are not interested in name
3026                                        based lookup */
3027                                     &oit, 0, &req,
3028                                     ll_md_blocking_ast, 0);
3029                 ll_finish_md_op_data(op_data);
3030                 oit.it_flags &= ~O_CHECK_STALE;
3031                 if (rc < 0) {
3032                         rc = ll_inode_revalidate_fini(inode, rc);
3033                         GOTO (out, rc);
3034                 }
3035
3036                 rc = ll_revalidate_it_finish(req, &oit, dentry);
3037                 if (rc != 0) {
3038                         ll_intent_release(&oit);
3039                         GOTO(out, rc);
3040                 }
3041
3042                 /* Unlinked? Unhash dentry, so it is not picked up later by
3043                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3044                    here to preserve get_cwd functionality on 2.6.
3045                    Bug 10503 */
3046                 if (!dentry->d_inode->i_nlink) {
3047                         spin_lock(&dcache_lock);
3048                         ll_drop_dentry(dentry);
3049                         spin_unlock(&dcache_lock);
3050                 }
3051
3052                 ll_lookup_finish_locks(&oit, dentry);
3053         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
3054                                                      MDS_INODELOCK_LOOKUP)) {
3055                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3056                 obd_valid valid = OBD_MD_FLGETATTR;
3057                 struct obd_capa *oc;
3058                 int ealen = 0;
3059
3060                 if (S_ISREG(inode->i_mode)) {
3061                         rc = ll_get_max_mdsize(sbi, &ealen);
3062                         if (rc)
3063                                 RETURN(rc);
3064                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3065                 }
3066                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
3067                  * capa for this inode. Because we only keep capas of dirs
3068                  * fresh. */
3069                 oc = ll_mdscapa_get(inode);
3070                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
3071                                 ealen, &req);
3072                 capa_put(oc);
3073                 if (rc) {
3074                         rc = ll_inode_revalidate_fini(inode, rc);
3075                         RETURN(rc);
3076                 }
3077
3078                 rc = ll_prep_inode(&inode, req, NULL);
3079                 if (rc)
3080                         GOTO(out, rc);
3081         }
3082
3083         /* if object not yet allocated, don't validate size */
3084         if (ll_i2info(inode)->lli_smd == NULL)
3085                 GOTO(out, rc = 0);
3086
3087         /* ll_glimpse_size will prefer locally cached writes if they extend
3088          * the file */
3089         rc = ll_glimpse_size(inode, 0);
3090         EXIT;
3091 out:
3092         ptlrpc_req_finished(req);
3093         return rc;
3094 }
3095
3096 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3097                   struct lookup_intent *it, struct kstat *stat)
3098 {
3099         struct inode *inode = de->d_inode;
3100         int res = 0;
3101
3102         res = ll_inode_revalidate_it(de, it);
3103         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3104
3105         if (res)
3106                 return res;
3107
3108         stat->dev = inode->i_sb->s_dev;
3109         stat->ino = inode->i_ino;
3110         stat->mode = inode->i_mode;
3111         stat->nlink = inode->i_nlink;
3112         stat->uid = inode->i_uid;
3113         stat->gid = inode->i_gid;
3114         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3115         stat->atime = inode->i_atime;
3116         stat->mtime = inode->i_mtime;
3117         stat->ctime = inode->i_ctime;
3118 #ifdef HAVE_INODE_BLKSIZE
3119         stat->blksize = inode->i_blksize;
3120 #else
3121         stat->blksize = 1 << inode->i_blkbits;
3122 #endif
3123
3124         ll_inode_size_lock(inode, 0);
3125         stat->size = i_size_read(inode);
3126         stat->blocks = inode->i_blocks;
3127         ll_inode_size_unlock(inode, 0);
3128
3129         return 0;
3130 }
3131 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3132 {
3133         struct lookup_intent it = { .it_op = IT_GETATTR };
3134
3135         return ll_getattr_it(mnt, de, &it, stat);
3136 }
3137
3138 static
3139 int lustre_check_acl(struct inode *inode, int mask)
3140 {
3141 #ifdef CONFIG_FS_POSIX_ACL
3142         struct ll_inode_info *lli = ll_i2info(inode);
3143         struct posix_acl *acl;
3144         int rc;
3145         ENTRY;
3146
3147         spin_lock(&lli->lli_lock);
3148         acl = posix_acl_dup(lli->lli_posix_acl);
3149         spin_unlock(&lli->lli_lock);
3150
3151         if (!acl)
3152                 RETURN(-EAGAIN);
3153
3154         rc = posix_acl_permission(inode, acl, mask);
3155         posix_acl_release(acl);
3156
3157         RETURN(rc);
3158 #else
3159         return -EAGAIN;
3160 #endif
3161 }
3162
3163 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3164 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3165 {
3166         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3167                inode->i_ino, inode->i_generation, inode, mask);
3168         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3169                 return lustre_check_remote_perm(inode, mask);
3170
3171         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3172         return generic_permission(inode, mask, lustre_check_acl);
3173 }
3174 #else
3175 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3176 {
3177         int mode = inode->i_mode;
3178         int rc;
3179
3180         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3181                inode->i_ino, inode->i_generation, inode, mask);
3182
3183         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3184                 return lustre_check_remote_perm(inode, mask);
3185
3186         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3187
3188         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3189             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3190                 return -EROFS;
3191         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3192                 return -EACCES;
3193         if (current->fsuid == inode->i_uid) {
3194                 mode >>= 6;
3195         } else if (1) {
3196                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3197                         goto check_groups;
3198                 rc = lustre_check_acl(inode, mask);
3199                 if (rc == -EAGAIN)
3200                         goto check_groups;
3201                 if (rc == -EACCES)
3202                         goto check_capabilities;
3203                 return rc;
3204         } else {
3205 check_groups:
3206                 if (in_group_p(inode->i_gid))
3207                         mode >>= 3;
3208         }
3209         if ((mode & mask & S_IRWXO) == mask)
3210                 return 0;
3211
3212 check_capabilities:
3213         if (!(mask & MAY_EXEC) ||
3214             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3215                 if (capable(CAP_DAC_OVERRIDE))
3216                         return 0;
3217
3218         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3219             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3220                 return 0;
3221
3222         return -EACCES;
3223 }
3224 #endif
3225
3226 /* -o localflock - only provides locally consistent flock locks */
3227 struct file_operations ll_file_operations = {
3228         .read           = ll_file_read,
3229         .write          = ll_file_write,
3230         .ioctl          = ll_file_ioctl,
3231         .open           = ll_file_open,
3232         .release        = ll_file_release,
3233         .mmap           = ll_file_mmap,
3234         .llseek         = ll_file_seek,
3235         .sendfile       = ll_file_sendfile,
3236         .fsync          = ll_fsync,
3237 };
3238
3239 struct file_operations ll_file_operations_flock = {
3240         .read           = ll_file_read,
3241         .write          = ll_file_write,
3242         .ioctl          = ll_file_ioctl,
3243         .open           = ll_file_open,
3244         .release        = ll_file_release,
3245         .mmap           = ll_file_mmap,
3246         .llseek         = ll_file_seek,
3247         .sendfile       = ll_file_sendfile,
3248         .fsync          = ll_fsync,
3249 #ifdef HAVE_F_OP_FLOCK
3250         .flock          = ll_file_flock,
3251 #endif
3252         .lock           = ll_file_flock
3253 };
3254
3255 /* These are for -o noflock - to return ENOSYS on flock calls */
3256 struct file_operations ll_file_operations_noflock = {
3257         .read           = ll_file_read,
3258         .write          = ll_file_write,
3259         .ioctl          = ll_file_ioctl,
3260         .open           = ll_file_open,
3261         .release        = ll_file_release,
3262         .mmap           = ll_file_mmap,
3263         .llseek         = ll_file_seek,
3264         .sendfile       = ll_file_sendfile,
3265         .fsync          = ll_fsync,
3266 #ifdef HAVE_F_OP_FLOCK
3267         .flock          = ll_file_noflock,
3268 #endif
3269         .lock           = ll_file_noflock
3270 };
3271
3272 struct inode_operations ll_file_inode_operations = {
3273 #ifdef HAVE_VFS_INTENT_PATCHES
3274         .setattr_raw    = ll_setattr_raw,
3275 #endif
3276         .setattr        = ll_setattr,
3277         .truncate       = ll_truncate,
3278         .getattr        = ll_getattr,
3279         .permission     = ll_inode_permission,
3280         .setxattr       = ll_setxattr,
3281         .getxattr       = ll_getxattr,
3282         .listxattr      = ll_listxattr,
3283         .removexattr    = ll_removexattr,
3284 };
3285
3286 /* dynamic ioctl number support routins */
3287 static struct llioc_ctl_data {
3288         struct rw_semaphore ioc_sem;
3289         struct list_head    ioc_head;
3290 } llioc = {
3291         __RWSEM_INITIALIZER(llioc.ioc_sem),
3292         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3293 };
3294
3295
3296 struct llioc_data {
3297         struct list_head        iocd_list;
3298         unsigned int            iocd_size;
3299         llioc_callback_t        iocd_cb;
3300         unsigned int            iocd_count;
3301         unsigned int            iocd_cmd[0];
3302 };
3303
3304 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3305 {
3306         unsigned int size;
3307         struct llioc_data *in_data = NULL;
3308         ENTRY;
3309
3310         if (cb == NULL || cmd == NULL ||
3311             count > LLIOC_MAX_CMD || count < 0)
3312                 RETURN(NULL);
3313
3314         size = sizeof(*in_data) + count * sizeof(unsigned int);
3315         OBD_ALLOC(in_data, size);
3316         if (in_data == NULL)
3317                 RETURN(NULL);
3318
3319         memset(in_data, 0, sizeof(*in_data));
3320         in_data->iocd_size = size;
3321         in_data->iocd_cb = cb;
3322         in_data->iocd_count = count;
3323         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3324
3325         down_write(&llioc.ioc_sem);
3326         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3327         up_write(&llioc.ioc_sem);
3328
3329         RETURN(in_data);
3330 }
3331
3332 void ll_iocontrol_unregister(void *magic)
3333 {
3334         struct llioc_data *tmp;
3335
3336         if (magic == NULL)
3337                 return;
3338
3339         down_write(&llioc.ioc_sem);
3340         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3341                 if (tmp == magic) {
3342                         unsigned int size = tmp->iocd_size;
3343
3344                         list_del(&tmp->iocd_list);
3345                         up_write(&llioc.ioc_sem);
3346
3347                         OBD_FREE(tmp, size);
3348                         return;
3349                 }
3350         }
3351         up_write(&llioc.ioc_sem);
3352
3353         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3354 }
3355
3356 EXPORT_SYMBOL(ll_iocontrol_register);
3357 EXPORT_SYMBOL(ll_iocontrol_unregister);
3358
3359 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3360                         unsigned int cmd, unsigned long arg, int *rcp)
3361 {
3362         enum llioc_iter ret = LLIOC_CONT;
3363         struct llioc_data *data;
3364         int rc = -EINVAL, i;
3365
3366         down_read(&llioc.ioc_sem);
3367         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3368                 for (i = 0; i < data->iocd_count; i++) {
3369                         if (cmd != data->iocd_cmd[i])
3370                                 continue;
3371
3372                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3373                         break;
3374                 }
3375
3376                 if (ret == LLIOC_STOP)
3377                         break;
3378         }
3379         up_read(&llioc.ioc_sem);
3380
3381         if (rcp)
3382                 *rcp = rc;
3383         return ret;
3384 }