add workaround for space leak in ESTALE case.
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 /* also used by llite/special.c:ll_special_open() */
52 struct ll_file_data *ll_file_data_get(void)
53 {
54         struct ll_file_data *fd;
55
56         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
57         return fd;
58 }
59
60 static void ll_file_data_put(struct ll_file_data *fd)
61 {
62         if (fd != NULL)
63                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
64 }
65
66 static int ll_close_inode_openhandle(struct inode *inode,
67                                      struct obd_client_handle *och)
68 {
69         struct ptlrpc_request *req = NULL;
70         struct obd_device *obd;
71         struct obdo *oa;
72         struct mdc_op_data data = { { 0 } };
73         int rc;
74         ENTRY;
75
76         obd = class_exp2obd(ll_i2mdcexp(inode));
77         if (obd == NULL) {
78                 CERROR("Invalid MDC connection handle "LPX64"\n",
79                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
80                 GOTO(out, rc = 0);
81         }
82
83         /*
84          * Check whether this is a forced umount. If so, we are called on
85          * cancellation of the "open lock" and we do not call mdc_close(),
86          * since it cannot succeed: the import is already deactivated.
87          */
88         if (obd->obd_force)
89                 GOTO(out, rc = 0);
90
91         OBDO_ALLOC(oa);
92         if (!oa)
93                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
94
95         oa->o_id = inode->i_ino;
96         oa->o_valid = OBD_MD_FLID;
97         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
98                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
99                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
100                                    OBD_MD_FLCTIME);
101         if (ll_is_inode_dirty(inode)) {
102                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
103                 oa->o_valid |= OBD_MD_FLFLAGS;
104         }
105         ll_inode2fid(&data.fid1, inode);
106         rc = mdc_close(ll_i2mdcexp(inode), &data, oa, och, &req);
107         if (rc == EAGAIN) {
108                 /* We are the last writer, so the MDS has instructed us to get
109                  * the file size and any write cookies, then close again. */
110                 ll_queue_done_writing(inode);
111                 rc = 0;
112         } else if (rc) {
113                 CERROR("inode %lu mdc close failed: rc = %d\n",
114                        inode->i_ino, rc);
115         }
116
117         OBDO_FREE(oa);
118
119         if (rc == 0) {
120                 rc = ll_objects_destroy(req, inode);
121                 if (rc)
122                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
123                                inode->i_ino, rc);
124         }
125
126         ptlrpc_req_finished(req); /* This is close request */
127         EXIT;
128 out:
129         mdc_clear_open_replay_data(och);
130
131         return rc;
132 }
133
134 int ll_mdc_real_close(struct inode *inode, int flags)
135 {
136         struct ll_inode_info *lli = ll_i2info(inode);
137         int rc = 0;
138         struct obd_client_handle **och_p;
139         struct obd_client_handle *och;
140         __u64 *och_usecount;
141
142         ENTRY;
143
144         if (flags & FMODE_WRITE) {
145                 och_p = &lli->lli_mds_write_och;
146                 och_usecount = &lli->lli_open_fd_write_count;
147         } else if (flags & FMODE_EXEC) {
148                 och_p = &lli->lli_mds_exec_och;
149                 och_usecount = &lli->lli_open_fd_exec_count;
150         } else {
151                 LASSERT(flags & FMODE_READ);
152                 och_p = &lli->lli_mds_read_och;
153                 och_usecount = &lli->lli_open_fd_read_count;
154         }
155
156         down(&lli->lli_och_sem);
157         if (*och_usecount) { /* There are still users of this handle, so
158                                 skip freeing it. */
159                 up(&lli->lli_och_sem);
160                 RETURN(0);
161         }
162         och = *och_p;
163         *och_p = NULL;
164         up(&lli->lli_och_sem);
165
166         if (och) { /* There might be a race and somebody may have freed
167                       this och already */
168                 rc = ll_close_inode_openhandle(inode, och);
169                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
170                 OBD_FREE(och, sizeof *och);
171         }
172
173         RETURN(rc);
174 }
175
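/* Close one open instance of the file: drop any group lock, decrement the
 * per-mode open count under lli_och_sem, and close the MDS open handle via
 * ll_mdc_real_close() unless a cached OPEN DLM lock still covers this open
 * mode (checked with an LDLM_FL_TEST_LOCK match). Finally detach and free
 * the ll_file_data attached to @file. */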
176 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
177                         struct file *file)
178 {
179         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
180         struct ll_inode_info *lli = ll_i2info(inode);
181         int rc = 0;
182         ENTRY;
183
184         /* clear group lock, if present */
185         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
186                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
187                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
188                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
189                                       &fd->fd_cwlockh);
190         }
191
192         /* Let's see if we have a good enough OPEN lock on the file and
193            can skip talking to the MDS */
194         if (file->f_dentry->d_inode) { /* Can this ever be false? */
195                 int lockmode;
196                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
197                 struct lustre_handle lockh;
198                 struct inode *inode = file->f_dentry->d_inode;
199                 struct ldlm_res_id file_res_id;
200
201                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
202                 fid_build_reg_res_name(ll_inode_lu_fid(inode), &file_res_id);
203
204                 down(&lli->lli_och_sem);
205                 if (fd->fd_omode & FMODE_WRITE) {
206                         lockmode = LCK_CW;
207                         LASSERT(lli->lli_open_fd_write_count);
208                         lli->lli_open_fd_write_count--;
209                 } else if (fd->fd_omode & FMODE_EXEC) {
210                         lockmode = LCK_PR;
211                         LASSERT(lli->lli_open_fd_exec_count);
212                         lli->lli_open_fd_exec_count--;
213                 } else {
214                         lockmode = LCK_CR;
215                         LASSERT(lli->lli_open_fd_read_count);
216                         lli->lli_open_fd_read_count--;
217                 }
218                 up(&lli->lli_och_sem);
219
220                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
221                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
222                                      &lockh)) {
223                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
224                                                 fd->fd_omode);
225                 }
226         } else {
227                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
228                        file, file->f_dentry, file->f_dentry->d_name.name);
229         }
230
231         LUSTRE_FPRIVATE(file) = NULL;
232         ll_file_data_put(fd);
233
234         RETURN(rc);
235 }
236
237 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
238
239 /* While this returns an error code, the fput() caller does not check it, so
240  * we need to make every effort to clean up all of our state here.  Also,
241  * applications rarely check close errors, and even if an error is returned
242  * they will not retry the close call.
243  */
244 int ll_file_release(struct inode *inode, struct file *file)
245 {
246         struct ll_file_data *fd;
247         struct ll_sb_info *sbi = ll_i2sbi(inode);
248         struct ll_inode_info *lli = ll_i2info(inode);
249         struct lov_stripe_md *lsm = lli->lli_smd;
250         int rc;
251         ENTRY;
252
253         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
254                inode->i_generation, inode);
255
256
257         if (inode->i_sb->s_root != file->f_dentry)
258                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
259         fd = LUSTRE_FPRIVATE(file);
260         LASSERT(fd != NULL);
261
262         /* The last ref on @file may not be from the owner pid of the
263          * statahead. Different processes can open the same dir;
264          * "ll_opendir_key" marks the fd that should stop the statahead. */
265         if (lli->lli_opendir_key == fd && lli->lli_opendir_pid != 0)
266                 ll_stop_statahead(inode, lli->lli_opendir_key);
267
268         if (inode->i_sb->s_root == file->f_dentry) {
269                 LUSTRE_FPRIVATE(file) = NULL;
270                 ll_file_data_put(fd);
271                 RETURN(0);
272         }
273
274         if (lsm)
275                 lov_test_and_clear_async_rc(lsm);
276         lli->lli_async_rc = 0;
277
278         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
279         RETURN(rc);
280 }
281
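/* Send an IT_OPEN intent to the MDS for @file. Unless lmm/lmmsize are given
 * (the setstripe case), MDS_OPEN_LOCK is requested so the open handle can be
 * cached under an OPEN DLM lock. On success the reply is used to update the
 * inode via ll_prep_inode(); the intent's request and lock are released
 * before returning. */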
282 static int ll_intent_file_open(struct file *file, void *lmm,
283                                int lmmsize, struct lookup_intent *itp)
284 {
285         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
286         struct mdc_op_data data = { { 0 } };
287         struct dentry *parent = file->f_dentry->d_parent;
288         const char *name = file->f_dentry->d_name.name;
289         const int len = file->f_dentry->d_name.len;
290         struct inode *inode = file->f_dentry->d_inode;
291         struct ptlrpc_request *req;
292         int rc;
293         ENTRY;
294
295         if (!parent)
296                 RETURN(-ENOENT);
297
298         ll_prepare_mdc_op_data(&data, parent->d_inode, inode,
299                                name, len, O_RDWR, NULL);
300
301         /* Usually we come here only for NFSD, and we want the open lock.
302            But we can also get here with pre-2.6.15 patchless kernels, and in
303            that case the lock is also OK. */
304         /* We can also get here if there was a cached open handle in
305          * revalidate_it but it disappeared while we were getting from there
306          * to ll_file_open. That means this file was closed and immediately
307          * reopened, which makes it a good candidate for the OPEN lock. */
308         /* If lmmsize and lmm are not 0, we are just setting stripe info
309          * parameters; no need for the open lock. */
310         if (!lmm && !lmmsize)
311                 itp->it_flags |= MDS_OPEN_LOCK;
312
313         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
314                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
315         if (rc == -ESTALE) {
316                 /* Reason for keeping our own exit path: don't flood the
317                  * log with -ESTALE error messages.
318                  */
319                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
320                      it_open_error(DISP_OPEN_OPEN, itp))
321                         GOTO(out, rc);
322                 ll_release_openhandle(file->f_dentry, itp);
323                 GOTO(out, rc);
324         }
325
326         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
327                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
328                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
329                 GOTO(out, rc);
330         }
331
332         if (itp->d.lustre.it_lock_mode)
333                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
334                                   inode);
335
336         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
337                            req, DLM_REPLY_REC_OFF, NULL);
338 out:
339         ptlrpc_req_finished(itp->d.lustre.it_data);
340         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
341         ll_intent_drop_lock(itp);
342
343         RETURN(rc);
344 }
345
346
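/* Fill @och from the intent reply: copy the MDS file handle out of the
 * mds_body, record the I/O epoch in the inode, and register the open
 * request for replay via mdc_set_open_replay_data(). */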
347 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
348                         struct obd_client_handle *och)
349 {
350         struct ptlrpc_request *req = it->d.lustre.it_data;
351         struct mds_body *body;
352
353         LASSERT(och);
354
355         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
356         LASSERT(body != NULL);                  /* reply already checked out */
357         /* and swabbed in mdc_enqueue */
358         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
359
360         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
361         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
362         lli->lli_io_epoch = body->io_epoch;
363
364         mdc_set_open_replay_data(och, it->d.lustre.it_data);
365 }
366
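/* Complete the client-side part of an open: optionally fill the MDS open
 * handle from the intent, attach @fd as the file's private data, initialize
 * readahead state, and remember the open mode. */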
367 int ll_local_open(struct file *file, struct lookup_intent *it,
368                   struct ll_file_data *fd, struct obd_client_handle *och)
369 {
370         ENTRY;
371
372         LASSERT(!LUSTRE_FPRIVATE(file));
373
374         LASSERT(fd != NULL);
375
376         if (och)
377                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
378         LUSTRE_FPRIVATE(file) = fd;
379         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
380         fd->fd_omode = it->it_flags;
381
382         RETURN(0);
383 }
384
385 /* Open a file, and (for the very first open) create objects on the OSTs at
386  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
387  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
388  * lli_open_sem to ensure no other process will create objects, send the
389  * stripe MD to the MDS, or try to destroy the objects if that fails.
390  *
391  * If we already have the stripe MD locally then we don't request it in
392  * mdc_open(), by passing a lmm_size = 0.
393  *
394  * It is up to the application to ensure no other processes open this file
395  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
396  * used.  We might be able to avoid races of that sort by getting lli_open_sem
397  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
398  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
399  */
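/* For illustration, the delayed-creation protocol described above is
 * typically driven from userspace roughly as follows (assuming the
 * LL_IOC_LOV_SETSTRIPE ioctl that reaches ll_lov_setstripe(), and a
 * lov_user_md "lum" describing the desired striping):
 *
 *   fd = open(path, O_CREAT | O_WRONLY | O_LOV_DELAY_CREATE, 0644);
 *   ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum);  // supply striping, create objects
 *   write(fd, buf, len);                    // objects now exist on the OSTs
 */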
400 int ll_file_open(struct inode *inode, struct file *file)
401 {
402         struct ll_inode_info *lli = ll_i2info(inode);
403         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
404                                           .it_flags = file->f_flags };
405         struct lov_stripe_md *lsm;
406         struct ptlrpc_request *req = NULL;
407         struct obd_client_handle **och_p;
408         __u64 *och_usecount;
409         struct ll_file_data *fd;
410         int rc = 0, opendir_set = 0;
411         ENTRY;
412
413         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
414                inode->i_generation, inode, file->f_flags);
415
416 #ifdef HAVE_VFS_INTENT_PATCHES
417         it = file->f_it;
418 #else
419         it = file->private_data; /* XXX: compat macro */
420         file->private_data = NULL; /* prevent ll_local_open assertion */
421 #endif
422
423         fd = ll_file_data_get();
424         if (fd == NULL)
425                 RETURN(-ENOMEM);
426
427         if (S_ISDIR(inode->i_mode)) {
428 again:
429                 spin_lock(&lli->lli_lock);
430                 if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) {
431                         LASSERT(lli->lli_sai == NULL);
432                         lli->lli_opendir_key = fd;
433                         lli->lli_opendir_pid = cfs_curproc_pid();
434                         opendir_set = 1;
435                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid() &&
436                                     lli->lli_opendir_key != NULL)) {
437                         /* Two cases for this:
438                          * (1) The same process opens such a directory many times.
439                          * (2) The old process opened the directory and exited
440                          *     before its child processes. Then a new process
441                          *     with the same pid opens such a directory before
442                          *     the old process's children exit.
443                          * Reset statahead for such cases. */
444                         spin_unlock(&lli->lli_lock);
445                         CDEBUG(D_INFO, "Conflict statahead for %.*s %lu/%u"
446                                " reset it.\n", file->f_dentry->d_name.len,
447                                file->f_dentry->d_name.name,
448                                inode->i_ino, inode->i_generation);
449                         ll_stop_statahead(inode, lli->lli_opendir_key);
450                         goto again;
451                 }
452                 spin_unlock(&lli->lli_lock);
453         }
454
455         if (inode->i_sb->s_root == file->f_dentry) {
456                 LUSTRE_FPRIVATE(file) = fd;
457                 RETURN(0);
458         }
459
460         if (!it || !it->d.lustre.it_disposition) {
461                 /* Convert f_flags into access mode. We cannot use file->f_mode,
462                  * because everything but O_ACCMODE mask was stripped from it */
463                 if ((oit.it_flags + 1) & O_ACCMODE)
464                         oit.it_flags++;
465                 if (file->f_flags & O_TRUNC)
466                         oit.it_flags |= FMODE_WRITE;
467
468                 /* The kernel only calls f_op->open in dentry_open.  filp_open
469                  * calls dentry_open after open_namei has checked permissions.
470                  * Only nfsd_open calls dentry_open directly without checking
471                  * permissions, and because of that the code below is safe. */
472                 if (oit.it_flags & FMODE_WRITE)
473                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
474
475                 /* We do not want O_EXCL here, presumably we opened the file
476                  * already? XXX - NFS implications? */
477                 oit.it_flags &= ~O_EXCL;
478
479                 it = &oit;
480         }
481
482 restart:
483         /* Let's see if we have file open on MDS already. */
484         if (it->it_flags & FMODE_WRITE) {
485                 och_p = &lli->lli_mds_write_och;
486                 och_usecount = &lli->lli_open_fd_write_count;
487         } else if (it->it_flags & FMODE_EXEC) {
488                 och_p = &lli->lli_mds_exec_och;
489                 och_usecount = &lli->lli_open_fd_exec_count;
490         } else {
491                 och_p = &lli->lli_mds_read_och;
492                 och_usecount = &lli->lli_open_fd_read_count;
493         }
494
495         LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
496                  it->d.lustre.it_disposition);
497
498         down(&lli->lli_och_sem);
499         if (*och_p) { /* Open handle is present */
500                 if (it_disposition(it, DISP_OPEN_OPEN)) {
501                         /* Well, there's an extra open request that we do not
502                            need; close it somehow. This will decref the request. */
503                         rc = it_open_error(DISP_OPEN_OPEN, it);
504                         if (rc) {
505                                 up(&lli->lli_och_sem);
506                                 ll_file_data_put(fd);
507                                 GOTO(out_openerr, rc);
508                         }
509                         ll_release_openhandle(file->f_dentry, it);
510                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
511                                              LPROC_LL_OPEN);
512                 }
513                 (*och_usecount)++;
514
515                 rc = ll_local_open(file, it, fd, NULL);
516
517                 LASSERTF(rc == 0, "rc = %d\n", rc);
518         } else {
519                 LASSERT(*och_usecount == 0);
520                 if (!it->d.lustre.it_disposition) {
521                         /* We cannot just request a lock handle now; the new
522                            ELC code means that one of the other OPEN locks for
523                            this file could be cancelled, and since the blocking
524                            AST handler would attempt to grab och_sem as well,
525                            that would result in a deadlock */
526                         up(&lli->lli_och_sem);
527                         it->it_flags |= O_CHECK_STALE;
528                         rc = ll_intent_file_open(file, NULL, 0, it);
529                         it->it_flags &= ~O_CHECK_STALE;
530                         if (rc) {
531                                 ll_file_data_put(fd);
532                                 GOTO(out_openerr, rc);
533                         }
534
535                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
536                                           file->f_dentry->d_inode);
537                         goto restart;
538                 }
539
540                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
541                 if (!*och_p) {
542                         ll_file_data_put(fd);
543                         GOTO(out_och_free, rc = -ENOMEM);
544                 }
545                 (*och_usecount)++;
546                req = it->d.lustre.it_data;
547
548                 /* mdc_intent_lock() didn't get a request ref if there was an
549                  * open error, so don't do cleanup on the request here
550                  * (bug 3430) */
551                 /* XXX (green): Shouldn't we bail out on any error here, not
552                  * just an open error? */
553                 rc = it_open_error(DISP_OPEN_OPEN, it);
554                 if (rc) {
555                         ll_file_data_put(fd);
556                         GOTO(out_och_free, rc);
557                 }
558
559                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
560                 rc = ll_local_open(file, it, fd, *och_p);
561                 LASSERTF(rc == 0, "rc = %d\n", rc);
562         }
563         up(&lli->lli_och_sem);
564
565         /* Must do this outside the lli_och_sem lock to prevent a deadlock
566            where a different kind of OPEN lock for this same inode gets
567            cancelled by ldlm_cancel_lru */
568         if (!S_ISREG(inode->i_mode))
569                 GOTO(out, rc);
570
571         lsm = lli->lli_smd;
572         if (lsm == NULL) {
573                 if (file->f_flags & O_LOV_DELAY_CREATE ||
574                     !(file->f_mode & FMODE_WRITE)) {
575                         CDEBUG(D_INODE, "object creation was delayed\n");
576                         GOTO(out, rc);
577                 }
578         }
579         file->f_flags &= ~O_LOV_DELAY_CREATE;
580         GOTO(out, rc);
581  out:
582         ptlrpc_req_finished(req);
583         if (req)
584                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
585         if (rc == 0) {
586                 ll_open_complete(inode);
587         } else {
588 out_och_free:
589                 if (*och_p) {
590                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
591                         *och_p = NULL; /* OBD_FREE writes some magic there */
592                         (*och_usecount)--;
593                 }
594                 up(&lli->lli_och_sem);
595 out_openerr:
596                 if (opendir_set != 0)
597                         ll_stop_statahead(inode, lli->lli_opendir_key);
598         }
599
600         return rc;
601 }
602
603 /* Fills the obdo with the attributes for the inode defined by lsm */
604 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
605                    struct obdo *oa)
606 {
607         struct ptlrpc_request_set *set;
608         struct obd_info oinfo = { { { 0 } } };
609         int rc;
610         ENTRY;
611
612         LASSERT(lsm != NULL);
613
614         memset(oa, 0, sizeof *oa);
615         oinfo.oi_md = lsm;
616         oinfo.oi_oa = oa;
617         oa->o_id = lsm->lsm_object_id;
618         oa->o_gr = lsm->lsm_object_gr;
619         oa->o_mode = S_IFREG;
620         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
621                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
622                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
623
624         set = ptlrpc_prep_set();
625         if (set == NULL) {
626                 rc = -ENOMEM;
627         } else {
628                 rc = obd_getattr_async(exp, &oinfo, set);
629                 if (rc == 0)
630                         rc = ptlrpc_set_wait(set);
631                 ptlrpc_set_destroy(set);
632         }
633         if (rc)
634                 RETURN(rc);
635
636         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
637                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
638         RETURN(0);
639 }
640
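/* Map an OST extent lock back to the stripe index it covers within this
 * file's LOV layout, using obd_get_info(KEY_LOCK_TO_STRIPE), and verify that
 * the lock's resource really names that stripe's object. Returns the stripe
 * index or a negative error. */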
641 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
642 {
643         struct ll_inode_info *lli = ll_i2info(inode);
644         struct lov_stripe_md *lsm = lli->lli_smd;
645         struct obd_export *exp = ll_i2obdexp(inode);
646         struct {
647                 char name[16];
648                 struct ldlm_lock *lock;
649         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
650         __u32 stripe, vallen = sizeof(stripe);
651         struct lov_oinfo *loinfo;
652         int rc;
653         ENTRY;
654
655         if (lsm->lsm_stripe_count == 1)
656                 GOTO(check, stripe = 0);
657
658         /* get our offset in the lov */
659         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
660         if (rc != 0) {
661                 CERROR("obd_get_info: rc = %d\n", rc);
662                 RETURN(rc);
663         }
664         LASSERT(stripe < lsm->lsm_stripe_count);
665
666 check:
667         loinfo = lsm->lsm_oinfo[stripe];
668         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
669                             &lock->l_resource->lr_name)) {
670                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
671                            loinfo->loi_id, loinfo->loi_gr);
672                 RETURN(-ELDLM_NO_LOCK_DATA);
673         }
674
675         RETURN(stripe);
676 }
677
678 /* Get extra page reference to ensure it is not going away */
679 void ll_pin_extent_cb(void *data)
680 {
681         struct page *page = data;
682
683         page_cache_get(page);
684
685         return;
686 }
687 /* Flush the page from the page cache for an extent as it is cancelled.
688  * Page to remove is delivered as @data.
689  *
690  * No one can dirty the extent until we've finished our work and they cannot
691  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
692  * but other kernel actors could have pages locked.
693  *
694  * If @discard is set, there is no need to write the page if it is dirty.
695  *
696  * Called with the DLM lock held. */
697 int ll_page_removal_cb(void *data, int discard)
698 {
699         int rc;
700         struct page *page = data;
701         struct address_space *mapping;
702
703         ENTRY;
704
705         /* We have page reference already from ll_pin_page */
706         lock_page(page);
707
708         /* Already truncated by somebody */
709         if (!page->mapping)
710                 GOTO(out, rc = 0);
711
712         mapping = page->mapping;
713
714         ll_teardown_mmaps(mapping,
715                           (__u64)page->index << PAGE_CACHE_SHIFT,
716                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
717                                                               ~PAGE_CACHE_MASK);
718         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
719         if (!discard && PageWriteback(page))
720                 wait_on_page_writeback(page);
721
722         if (!discard && clear_page_dirty_for_io(page)) {
723                 rc = ll_call_writepage(page->mapping->host, page);
724                 /* either waiting for io to complete or reacquiring
725                  * the lock that the failed writepage released */
726                 lock_page(page);
727                 wait_on_page_writeback(page);
728                 if (rc < 0) {
729                         CERROR("writepage inode %lu(%p) of page %p "
730                                "failed: %d\n", mapping->host->i_ino,
731                                mapping->host, page, rc);
732                         if (rc == -ENOSPC)
733                                 set_bit(AS_ENOSPC, &mapping->flags);
734                         else
735                                 set_bit(AS_EIO, &mapping->flags);
736                 }
737         }
738         if (page->mapping != NULL) {
739                 struct ll_async_page *llap = llap_cast_private(page);
740                 // checking again to account for writeback's lock_page()
741                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
742                 if (llap)
743                         ll_ra_accounting(llap, page->mapping);
744                 ll_truncate_complete_page(page);
745         }
746         EXIT;
747 out:
748         LASSERT(!PageWriteback(page));
749         unlock_page(page);
750         page_cache_release(page);
751
752         return 0;
753 }
754
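/* Cancel callback for OST extent locks: when a lock goes away, shrink the
 * known minimum size (KMS) of the affected stripe with
 * ldlm_extent_shift_kms() under the stripe and resource locks, and then
 * call ll_try_done_writing(). */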
755 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
756                              void *data, int flag)
757 {
758         struct inode *inode;
759         struct ll_inode_info *lli;
760         struct lov_stripe_md *lsm;
761         int stripe;
762         __u64 kms;
763
764         ENTRY;
765
766         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
767                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
768                 LBUG();
769         }
770
771         inode = ll_inode_from_lock(lock);
772         if (inode == NULL)
773                 RETURN(0);
774         lli = ll_i2info(inode);
775         if (lli == NULL)
776                 GOTO(iput, 0);
777         if (lli->lli_smd == NULL)
778                 GOTO(iput, 0);
779         lsm = lli->lli_smd;
780
781         stripe = ll_lock_to_stripe_offset(inode, lock);
782         if (stripe < 0)
783                 GOTO(iput, 0);
784
785         lov_stripe_lock(lsm);
786         lock_res_and_lock(lock);
787         kms = ldlm_extent_shift_kms(lock,
788                                     lsm->lsm_oinfo[stripe]->loi_kms);
789
790         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
791                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
792                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
793         lsm->lsm_oinfo[stripe]->loi_kms = kms;
794         unlock_res_and_lock(lock);
795         lov_stripe_unlock(lsm);
796         ll_try_done_writing(inode);
797         EXIT;
798 iput:
799         iput(inode);
800
801         return 0;
802 }
803
804 #if 0
805 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
806 {
807         /* XXX ALLOCATE - 160 bytes */
808         struct inode *inode = ll_inode_from_lock(lock);
809         struct ll_inode_info *lli = ll_i2info(inode);
810         struct lustre_handle lockh = { 0 };
811         struct ost_lvb *lvb;
812         int stripe;
813         ENTRY;
814
815         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
816                      LDLM_FL_BLOCK_CONV)) {
817                 LBUG(); /* not expecting any blocked async locks yet */
818                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
819                            "lock, returning");
820                 ldlm_lock_dump(D_OTHER, lock, 0);
821                 ldlm_reprocess_all(lock->l_resource);
822                 RETURN(0);
823         }
824
825         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
826
827         stripe = ll_lock_to_stripe_offset(inode, lock);
828         if (stripe < 0)
829                 goto iput;
830
831         if (lock->l_lvb_len) {
832                 struct lov_stripe_md *lsm = lli->lli_smd;
833                 __u64 kms;
834                 lvb = lock->l_lvb_data;
835                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
836
837                 lock_res_and_lock(lock);
838                 ll_inode_size_lock(inode, 1);
839                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
840                 kms = ldlm_extent_shift_kms(NULL, kms);
841                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
842                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
843                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
844                 lsm->lsm_oinfo[stripe].loi_kms = kms;
845                 ll_inode_size_unlock(inode, 1);
846                 unlock_res_and_lock(lock);
847         }
848
849 iput:
850         iput(inode);
851         wake_up(&lock->l_waitq);
852
853         ldlm_lock2handle(lock, &lockh);
854         ldlm_lock_decref(&lockh, LCK_PR);
855         RETURN(0);
856 }
857 #endif
858
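/* Glimpse AST, run on a client holding a conflicting extent lock when
 * another client issues a glimpse.  Packs this client's view of the stripe
 * size (KMS) and the inode timestamps into an ost_lvb reply so the glimpsing
 * client can compute the file size without any locks being revoked. */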
859 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
860 {
861         struct ptlrpc_request *req = reqp;
862         struct inode *inode = ll_inode_from_lock(lock);
863         struct ll_inode_info *lli;
864         struct lov_stripe_md *lsm;
865         struct ost_lvb *lvb;
866         int rc, stripe;
867         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
868         ENTRY;
869
870         if (inode == NULL)
871                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
872         lli = ll_i2info(inode);
873         if (lli == NULL)
874                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
875         lsm = lli->lli_smd;
876         if (lsm == NULL)
877                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
878
879         /* First, find out which stripe index this lock corresponds to. */
880         stripe = ll_lock_to_stripe_offset(inode, lock);
881         if (stripe < 0)
882                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
883
884         rc = lustre_pack_reply(req, 2, size, NULL);
885         if (rc)
886                 GOTO(iput, rc);
887
888         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
889         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
890         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
891         lvb->lvb_atime = LTIME_S(inode->i_atime);
892         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
893
894         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
895                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
896                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_atime,
897                    lvb->lvb_mtime, lvb->lvb_ctime);
898  iput:
899         iput(inode);
900
901  out:
902         /* These errors are normal races, so we don't want to fill the console
903          * with messages by calling ptlrpc_error() */
904         if (rc == -ELDLM_NO_LOCK_DATA)
905                 lustre_pack_reply(req, 1, NULL, NULL);
906
907         req->rq_status = rc;
908         return rc;
909 }
910
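/* Glimpse helper for the ioctl path: enqueue an intent-only (glimpse) PR
 * extent lock over the whole object set described by @lsm, then merge the
 * per-stripe LVBs into size/blocks/timestamps for the caller's lstat_t. */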
911 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
912                      lstat_t *st)
913 {
914         struct lustre_handle lockh = { 0 };
915         struct ldlm_enqueue_info einfo = { 0 };
916         struct obd_info oinfo = { { { 0 } } };
917         struct ost_lvb lvb;
918         int rc;
919
920         ENTRY;
921
922         einfo.ei_type = LDLM_EXTENT;
923         einfo.ei_mode = LCK_PR;
924         einfo.ei_cb_bl = osc_extent_blocking_cb;
925         einfo.ei_cb_cp = ldlm_completion_ast;
926         einfo.ei_cb_gl = ll_glimpse_callback;
927         einfo.ei_cbdata = NULL;
928
929         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
930         oinfo.oi_lockh = &lockh;
931         oinfo.oi_md = lsm;
932         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
933
934         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
935         if (rc == -ENOENT)
936                 RETURN(rc);
937         if (rc != 0) {
938                 CERROR("obd_enqueue returned rc %d, "
939                        "returning -EIO\n", rc);
940                 RETURN(rc > 0 ? -EIO : rc);
941         }
942
943         lov_stripe_lock(lsm);
944         memset(&lvb, 0, sizeof(lvb));
945         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
946         st->st_size = lvb.lvb_size;
947         st->st_blocks = lvb.lvb_blocks;
948         st->st_mtime = lvb.lvb_mtime;
949         st->st_atime = lvb.lvb_atime;
950         st->st_ctime = lvb.lvb_ctime;
951         lov_stripe_unlock(lsm);
952
953         RETURN(rc);
954 }
955
956 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
957  * file (because it prefers KMS over RSS when larger) */
958 int ll_glimpse_size(struct inode *inode, int ast_flags)
959 {
960         struct ll_inode_info *lli = ll_i2info(inode);
961         struct ll_sb_info *sbi = ll_i2sbi(inode);
962         struct lustre_handle lockh = { 0 };
963         struct ldlm_enqueue_info einfo = { 0 };
964         struct obd_info oinfo = { { { 0 } } };
965         struct ost_lvb lvb;
966         int rc;
967         ENTRY;
968
969         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
970
971         if (!lli->lli_smd) {
972                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
973                 RETURN(0);
974         }
975
976         /* NOTE: this looks like a DLM lock request, but it may not be one.
977          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse
978          *       request that won't revoke any conflicting DLM locks held.
979          *       Instead, ll_glimpse_callback() will be called on each client
980          *       holding a DLM lock against this file, and the resulting size
981          *       will be returned for each stripe. A DLM lock on [0, EOF] is
982          *       acquired only if there were no conflicting locks. */
983         einfo.ei_type = LDLM_EXTENT;
984         einfo.ei_mode = LCK_PR;
985         einfo.ei_cb_bl = osc_extent_blocking_cb;
986         einfo.ei_cb_cp = ldlm_completion_ast;
987         einfo.ei_cb_gl = ll_glimpse_callback;
988         einfo.ei_cbdata = inode;
989
990         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
991         oinfo.oi_lockh = &lockh;
992         oinfo.oi_md = lli->lli_smd;
993         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
994
995         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
996         if (rc == -ENOENT)
997                 RETURN(rc);
998         if (rc != 0) {
999                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1000                 RETURN(rc > 0 ? -EIO : rc);
1001         }
1002
1003         ll_inode_size_lock(inode, 1);
1004         inode_init_lvb(inode, &lvb);
1005         /* merge the timestamps most recently obtained from the MDS with
1006            the timestamps obtained from the OSTs */
1007         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
1008         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
1009         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
1010         rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1011         i_size_write(inode, lvb.lvb_size);
1012         inode->i_blocks = lvb.lvb_blocks;
1013         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1014         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1015         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1016         ll_inode_size_unlock(inode, 1);
1017
1018         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1019                i_size_read(inode), (long long)inode->i_blocks);
1020
1021         RETURN(rc);
1022 }
1023
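/* Take an OST extent lock covering @policy in mode @mode (unless the file or
 * mount ignores locking), then refresh i_size and the inode timestamps from
 * the merged LVB under ll_inode_size_lock(); see the comment below about why
 * i_size is only updated for [0, EOF] locks. */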
1024 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1025                    struct lov_stripe_md *lsm, int mode,
1026                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1027                    int ast_flags)
1028 {
1029         struct ll_sb_info *sbi = ll_i2sbi(inode);
1030         struct ost_lvb lvb;
1031         struct ldlm_enqueue_info einfo = { 0 };
1032         struct obd_info oinfo = { { { 0 } } };
1033         int rc;
1034         ENTRY;
1035
1036         LASSERT(!lustre_handle_is_used(lockh));
1037         LASSERT(lsm != NULL);
1038
1039         /* don't drop the mmapped file to LRU */
1040         if (mapping_mapped(inode->i_mapping))
1041                 ast_flags |= LDLM_FL_NO_LRU;
1042
1043         /* XXX phil: can we do this?  won't it screw the file size up? */
1044         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1045             (sbi->ll_flags & LL_SBI_NOLCK))
1046                 RETURN(0);
1047
1048         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1049                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1050
1051         einfo.ei_type = LDLM_EXTENT;
1052         einfo.ei_mode = mode;
1053         einfo.ei_cb_bl = osc_extent_blocking_cb;
1054         einfo.ei_cb_cp = ldlm_completion_ast;
1055         einfo.ei_cb_gl = ll_glimpse_callback;
1056         einfo.ei_cbdata = inode;
1057
1058         oinfo.oi_policy = *policy;
1059         oinfo.oi_lockh = lockh;
1060         oinfo.oi_md = lsm;
1061         oinfo.oi_flags = ast_flags;
1062
1063         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL);
1064         *policy = oinfo.oi_policy;
1065         if (rc > 0)
1066                 rc = -EIO;
1067
1068         ll_inode_size_lock(inode, 1);
1069         inode_init_lvb(inode, &lvb);
1070         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1071
1072         if (policy->l_extent.start == 0 &&
1073             policy->l_extent.end == OBD_OBJECT_EOF) {
1074                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1075                  * the kms under both a DLM lock and the
1076                  * ll_inode_size_lock().  If we don't get the
1077                  * ll_inode_size_lock() here we can match the DLM lock and
1078                  * reset i_size from the kms before the truncating path has
1079                  * updated the kms.  generic_file_write can then trust the
1080                  * stale i_size when doing appending writes and effectively
1081                  * cancel the result of the truncate.  Getting the
1082                  * ll_inode_size_lock() after the enqueue maintains the DLM
1083                  * -> ll_inode_size_lock() acquiring order. */
1084                 i_size_write(inode, lvb.lvb_size);
1085                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1086                        inode->i_ino, i_size_read(inode));
1087         }
1088
1089         if (rc == 0) {
1090                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1091                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1092                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1093         }
1094         ll_inode_size_unlock(inode, 1);
1095
1096         RETURN(rc);
1097 }
1098
1099 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1100                      struct lov_stripe_md *lsm, int mode,
1101                      struct lustre_handle *lockh)
1102 {
1103         struct ll_sb_info *sbi = ll_i2sbi(inode);
1104         int rc;
1105         ENTRY;
1106
1107         /* XXX phil: can we do this?  won't it screw the file size up? */
1108         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1109             (sbi->ll_flags & LL_SBI_NOLCK))
1110                 RETURN(0);
1111
1112         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1113
1114         RETURN(rc);
1115 }
1116
1117 static void ll_set_file_contended(struct inode *inode)
1118 {
1119         struct ll_inode_info *lli = ll_i2info(inode);
1120
1121         lli->lli_contention_time = cfs_time_current();
1122         set_bit(LLI_F_CONTENDED, &lli->lli_flags);
1123 }
1124
1125 void ll_clear_file_contended(struct inode *inode)
1126 {
1127         struct ll_inode_info *lli = ll_i2info(inode);
1128
1129         clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
1130 }
1131
1132 static int ll_is_file_contended(struct file *file)
1133 {
1134         struct inode *inode = file->f_dentry->d_inode;
1135         struct ll_inode_info *lli = ll_i2info(inode);
1136         struct ll_sb_info *sbi = ll_i2sbi(inode);
1137         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1138         ENTRY;
1139
1140         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1141                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1142                        " osc connect flags = 0x"LPX64"\n",
1143                        sbi->ll_lco.lco_flags);
1144                 RETURN(0);
1145         }
1146         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1147                 RETURN(1);
1148         if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
1149                 cfs_time_t cur_time = cfs_time_current();
1150                 cfs_time_t retry_time;
1151
1152                 retry_time = cfs_time_add(
1153                         lli->lli_contention_time,
1154                         cfs_time_seconds(sbi->ll_contention_time));
1155                 if (cfs_time_after(cur_time, retry_time)) {
1156                         ll_clear_file_contended(inode);
1157                         RETURN(0);
1158                 }
1159                 RETURN(1);
1160         }
1161         RETURN(0);
1162 }
1163
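/* Take a DLM tree lock for the I/O range unless the file is currently marked
 * contended (in which case lockless server-side locking is preferred).
 * Returns 1 with the tree locked, 0 if no lock was taken, or a negative
 * errno; -EUSERS from ll_tree_lock_iov() marks the file contended. */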
1164 static int ll_file_get_tree_lock_iov(struct ll_lock_tree *tree,
1165                                      struct file *file, const struct iovec *iov,
1166                                      unsigned long nr_segs,
1167                                      obd_off start, obd_off end, int rw)
1168 {
1169         int append;
1170         int tree_locked = 0;
1171         int rc;
1172         struct inode * inode = file->f_dentry->d_inode;
1173         ENTRY;
1174
1175         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1176
1177         if (append || !ll_is_file_contended(file)) {
1178                 struct ll_lock_tree_node *node;
1179                 int ast_flags;
1180
1181                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1182                 if (file->f_flags & O_NONBLOCK)
1183                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1184                 node = ll_node_from_inode(inode, start, end,
1185                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1186                 if (IS_ERR(node)) {
1187                         rc = PTR_ERR(node);
1188                         GOTO(out, rc);
1189                 }
1190                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1191                 rc = ll_tree_lock_iov(tree, node, iov, nr_segs, ast_flags);
1192                 if (rc == 0)
1193                         tree_locked = 1;
1194                 else if (rc == -EUSERS)
1195                         ll_set_file_contended(inode);
1196                 else
1197                         GOTO(out, rc);
1198         }
1199         RETURN(tree_locked);
1200 out:
1201         return rc;
1202 }
1203
1204 /* XXX: exact copy from kernel code (__generic_file_aio_write_nolock from rhel4)
1205  */
1206 static size_t ll_file_get_iov_count(const struct iovec *iov,
1207                                      unsigned long *nr_segs)
1208 {
1209         size_t count = 0;
1210         unsigned long seg;
1211
1212         for (seg = 0; seg < *nr_segs; seg++) {
1213                 const struct iovec *iv = &iov[seg];
1214
1215                 /*
1216                  * If any segment has a negative length, or the cumulative
1217                  * length ever wraps negative then return -EINVAL.
1218                  */
1219                 count += iv->iov_len;
1220                 if (unlikely((ssize_t)(count|iv->iov_len) < 0))
1221                         return -EINVAL;
1222                 if (access_ok(VERIFY_WRITE, iv->iov_base, iv->iov_len))
1223                         continue;
1224                 if (seg == 0)
1225                         return -EFAULT;
1226                 *nr_segs = seg;
1227                 count -= iv->iov_len;   /* This segment is no good */
1228                 break;
1229         }
1230         return count;
1231 }
1232
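/* Build a trimmed copy of the caller's iovec covering at most @size bytes,
 * starting @*offset bytes into the first segment.  Advances *iov_out and
 * *nr_segs past fully consumed segments and updates *offset for the next
 * chunk; *nrsegs_copy is the number of entries filled in @iov_copy. */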
1233 static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
1234                            unsigned long *nrsegs_copy,
1235                            struct iovec *iov_copy, size_t *offset,
1236                            size_t size)
1237 {
1238         int i;
1239         const struct iovec *iov = *iov_out;
1240         for (i = 0; i < *nr_segs;
1241              i++) {
1242                 const struct iovec *iv = &iov[i];
1243                 struct iovec *ivc = &iov_copy[i];
1244                 *ivc = *iv;
1245                 if (i == 0) {
1246                         ivc->iov_len -= *offset;
1247                         ivc->iov_base += *offset;
1248                 }
1249                 if (ivc->iov_len >= size) {
1250                         ivc->iov_len = size;
1251                         if (i == 0)
1252                                 *offset += size;
1253                         else
1254                                 *offset = size;
1255                         break;
1256                 }
1257                 size -= ivc->iov_len;
1258         }
1259         *iov_out += i;
1260         *nr_segs -= i;
1261         *nrsegs_copy = i + 1;
1262
1263         return 0;
1264 }
1265
1266 static int ll_reget_short_lock(struct page *page, int rw,
1267                                obd_off start, obd_off end,
1268                                void **cookie)
1269 {
1270         struct ll_async_page *llap;
1271         struct obd_export *exp;
1272         struct inode *inode = page->mapping->host;
1273
1274         ENTRY;
1275
1276         exp = ll_i2obdexp(inode);
1277         if (exp == NULL)
1278                 RETURN(0);
1279
1280         llap = llap_cast_private(page);
1281         if (llap == NULL)
1282                 RETURN(0);
1283
1284         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1285                                     &llap->llap_cookie, rw, start, end,
1286                                     cookie));
1287 }
1288
1289 static void ll_release_short_lock(struct inode *inode, obd_off end,
1290                                   void *cookie, int rw)
1291 {
1292         struct obd_export *exp;
1293         int rc;
1294
1295         exp = ll_i2obdexp(inode);
1296         if (exp == NULL)
1297                 return;
1298
1299         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1300                                     cookie, rw);
1301         if (rc < 0)
1302                 CERROR("unlock failed (%d)\n", rc);
1303 }
1304
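/* Try to reuse a cached lockless ("short") lock for this I/O: if no segment
 * of the user buffer lies in a memory-mapped region (ll_region_mapped()),
 * look up the page cache page at @ppos and ask the OSC to re-grab the short
 * lock attached to it.  Returns 1 (with *cookie set) on success, 0 if the
 * caller must take a lock some other way. */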
1305 static inline int ll_file_get_fast_lock(struct file *file,
1306                                         obd_off ppos, obd_off end,
1307                                         const struct iovec *iov,
1308                                         unsigned long nr_segs,
1309                                         void **cookie, int rw)
1310 {
1311         int rc = 0, seg;
1312         struct page *page;
1313
1314         ENTRY;
1315
1316         /* we would like this read request to be lockfree */
1317         for (seg = 0; seg < nr_segs; seg++) {
1318                 const struct iovec *iv = &iov[seg];
1319                 if (ll_region_mapped((unsigned long)iv->iov_base, iv->iov_len))
1320                         GOTO(out, rc);
1321         }
1322
1323         page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1324                               ppos >> CFS_PAGE_SHIFT);
1325         if (page) {
1326                 if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1327                         rc = 1;
1328
1329                 unlock_page(page);
1330                 page_cache_release(page);
1331         }
1332
1333 out:
1334         RETURN(rc);
1335 }
1336
1337 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1338                                          void *cookie, int rw)
1339 {
1340         ll_release_short_lock(inode, end, cookie, rw);
1341 }
1342
1343 enum ll_lock_style {
1344         LL_LOCK_STYLE_NOLOCK   = 0,
1345         LL_LOCK_STYLE_FASTLOCK = 1,
1346         LL_LOCK_STYLE_TREELOCK = 2
1347 };
1348
1349 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1350                                    obd_off end, const struct iovec *iov,
1351                                    unsigned long nr_segs, void **cookie,
1352                                    struct ll_lock_tree *tree, int rw)
1353 {
1354         int rc;
1355
1356         ENTRY;
1357
1358         if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, cookie, rw))
1359                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1360
1361         rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
1362                                        ppos, end, rw);
1363         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1364         switch (rc) {
1365         case 1:
1366                 RETURN(LL_LOCK_STYLE_TREELOCK);
1367         case 0:
1368                 RETURN(LL_LOCK_STYLE_NOLOCK);
1369         }
1370
1371         /* an error happened if we reached this point, rc = -errno here */
1372         RETURN(rc);
1373 }
1374
1375 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1376                                     enum ll_lock_style lock_style,
1377                                     void *cookie, struct ll_lock_tree *tree,
1378                                     int rw)
1379
1380 {
1381         switch (lock_style) {
1382         case LL_LOCK_STYLE_TREELOCK:
1383                 ll_tree_unlock(tree);
1384                 break;
1385         case LL_LOCK_STYLE_FASTLOCK:
1386                 ll_file_put_fast_lock(inode, end, cookie, rw);
1387                 break;
1388         default:
1389                 CERROR("invalid locking style (%d)\n", lock_style);
1390         }
1391 }
1392
1393 #ifdef HAVE_FILE_READV
1394 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
1395                               unsigned long nr_segs, loff_t *ppos)
1396 {
1397 #else
1398 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1399                                 unsigned long nr_segs, loff_t pos)
1400 {
1401         struct file *file = iocb->ki_filp;
1402         loff_t *ppos = &iocb->ki_pos;
1403 #endif
1404         struct inode *inode = file->f_dentry->d_inode;
1405         struct ll_inode_info *lli = ll_i2info(inode);
1406         struct lov_stripe_md *lsm = lli->lli_smd;
1407         struct ll_sb_info *sbi = ll_i2sbi(inode);
1408         struct ll_lock_tree tree;
1409         struct ost_lvb lvb;
1410         struct ll_ra_read bead;
1411         int ra = 0;
1412         obd_off end;
1413         ssize_t retval, chunk, sum = 0;
1414         int lock_style;
1415         struct iovec *iov_copy = NULL;
1416         unsigned long nrsegs_copy, nrsegs_orig = 0;
1417         size_t count, iov_offset = 0;
1418         __u64 kms;
1419         void *cookie;
1420         ENTRY;
1421
1422         count = ll_file_get_iov_count(iov, &nr_segs);
1423         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1424                inode->i_ino, inode->i_generation, inode, count, *ppos);
1425         /* "If nbyte is 0, read() will return 0 and have no other results."
1426          *                      -- Single Unix Spec */
1427         if (count == 0)
1428                 RETURN(0);
1429
1430         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1431
1432         if (!lsm) {
1433                 /* Reads on a file with no objects should return zero-filled
1434                  * buffers up to the file size (we can get non-zero sizes with
1435                  * mknod + truncate, then opening the file for read; this seems
1436                  * to be a common pattern in the NFS case). Bug 6243 */
1437                 int notzeroed;
1438                 /* Since there are no objects on the OSTs, we have nothing to
1439                  * take a lock on and so are forced to access inode->i_size
1440                  * unguarded */
1441
1442                 /* Read beyond end of file */
1443                 if (*ppos >= i_size_read(inode))
1444                         RETURN(0);
1445
1446                 if (count > i_size_read(inode) - *ppos)
1447                         count = i_size_read(inode) - *ppos;
1448                 /* Make sure to correctly adjust the file position pointer for
1449                  * the EFAULT case */
1450                 for (nrsegs_copy = 0; nrsegs_copy < nr_segs; nrsegs_copy++) {
1451                         const struct iovec *iv = &iov[nrsegs_copy];
1452
1453                         if (count < iv->iov_len)
1454                                 chunk = count;
1455                         else
1456                                 chunk = iv->iov_len;
1457                         notzeroed = clear_user(iv->iov_base, chunk);
1458                         sum += (chunk - notzeroed);
1459                         count -= (chunk - notzeroed);
1460                         if (notzeroed || !count)
1461                                 break;
1462                 }
1463                 *ppos += sum;
1464                 if (!sum)
1465                         RETURN(-EFAULT);
1466                 RETURN(sum);
1467         }
1468
1469 repeat:
1470         if (sbi->ll_max_rw_chunk != 0) {
1471                 /* first, find the end of the current stripe */
1472                 end = *ppos;
1473                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,&end);
1474
1475                 /* clamp the end if it extends beyond the request */
1476                 if (end > *ppos + count - 1)
1477                         end = *ppos + count - 1;
1478
1479                 /* and chunk shouldn't be too large even if striping is wide */
1480                 if (end - *ppos > sbi->ll_max_rw_chunk)
1481                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1482
1483                 chunk = end - *ppos + 1;
1484                 if ((count == chunk) && (iov_offset == 0)) {
1485                         if (iov_copy)
1486                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1487
1488                         iov_copy = (struct iovec *)iov;
1489                         nrsegs_copy = nr_segs;
1490                 } else {
1491                         if (!iov_copy) {
1492                                 nrsegs_orig = nr_segs;
1493                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1494                                 if (!iov_copy)
1495                                         GOTO(out, retval = -ENOMEM);
1496                         }
1497
1498                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1499                                         &iov_offset, chunk);
1500                 }
1501         } else {
1502                 end = *ppos + count - 1;
1503                 iov_copy = (struct iovec *)iov;
1504                 nrsegs_copy = nr_segs;
1505         }
1506
1507         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1508                                       iov_copy, nrsegs_copy, &cookie, &tree,
1509                                       OBD_BRW_READ);
1510         if (lock_style < 0)
1511                 GOTO(out, retval = lock_style);
1512
1513         ll_inode_size_lock(inode, 1);
1514         /*
1515          * Consistency guarantees: following possibilities exist for the
1516          * relation between region being read and real file size at this
1517          * moment:
1518          *
1519          *  (A): the region is completely inside of the file;
1520          *
1521          *  (B-x): x bytes of region are inside of the file, the rest is
1522          *  outside;
1523          *
1524          *  (C): the region is completely outside of the file.
1525          *
1526          * This classification is stable under the DLM lock acquired by
1527          * ll_tree_lock() above, because to change class another client has
1528          * to take a DLM lock conflicting with ours. Also, any updates to
1529          * ->i_size by other threads on this client are serialized by
1530          * ll_inode_size_lock(). This guarantees that short reads are handled
1531          * correctly in the face of concurrent writes and truncates.
1532          */
1533         inode_init_lvb(inode, &lvb);
1534         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1535         kms = lvb.lvb_size;
1536         if (*ppos + count - 1 > kms) {
1537                 /* A glimpse is necessary to determine whether we return a
1538                  * short read (B) or some zeroes at the end of the buffer (C) */
1539                 ll_inode_size_unlock(inode, 1);
1540                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1541                 if (retval) {
1542                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1543                                 ll_file_put_lock(inode, end, lock_style,
1544                                                  cookie, &tree, OBD_BRW_READ);
1545                         goto out;
1546                 }
1547         } else {
1548                 /* region is within kms and, hence, within real file size (A).
1549                  * We need to increase i_size to cover the read region so that
1550                  * generic_file_read() will do its job, but that doesn't mean
1551                  * the kms size is _correct_, it is only the _minimum_ size.
1552                  * If someone does a stat they will get the correct size which
1553                  * will always be >= the kms value here.  b=11081 */
1554                 if (i_size_read(inode) < kms)
1555                         i_size_write(inode, kms);
1556                 ll_inode_size_unlock(inode, 1);
1557         }
1558
1559         chunk = end - *ppos + 1;
1560         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1561                inode->i_ino, chunk, *ppos, i_size_read(inode));
1562
1563         /* turn off the kernel's read-ahead */
1564         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1565                 /* read under locks
1566                  *
1567                  * 1. update the inode's atime, since a concurrent stat
1568                  * (via ll_glimpse_size) might otherwise return an out-of-date one
1569                  *
1570                  * 2. update the lsm so that the next stat (via
1571                  * ll_glimpse_size) can get correct values from it */
1572                 struct ost_lvb xtimes;
1573
1574                 lov_stripe_lock(lsm);
1575                 LTIME_S(inode->i_atime) = LTIME_S(CURRENT_TIME);
1576                 xtimes.lvb_atime = LTIME_S(inode->i_atime);
1577                 obd_update_lvb(sbi->ll_osc_exp, lsm, &xtimes,
1578                                OBD_MD_FLATIME);
1579                 lov_stripe_unlock(lsm);
1580
1581                 file->f_ra.ra_pages = 0;
1582                 /* initialize read-ahead window once per syscall */
1583                 if (ra == 0) {
1584                         ra = 1;
1585                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1586                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1587                         ll_ra_read_in(file, &bead);
1588                 }
1589
1590                 /* BUG: 5972 */
1591                 file_accessed(file);
1592 #ifdef HAVE_FILE_READV
1593                 retval = generic_file_readv(file, iov_copy, nrsegs_copy, ppos);
1594 #else
1595                 retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
1596                                                *ppos);
1597 #endif
1598                 ll_file_put_lock(inode, end, lock_style, cookie,
1599                                  &tree, OBD_BRW_READ);
1600         } else {
1601                 /* lockless read
1602                  *
1603                  * current time will get into request as atime
1604                  * (lustre/osc/osc_request.c:osc_build_request())
1605                  */
1606                 retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
1607                                              READ, chunk);
1608         }
1609         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1610         if (retval > 0) {
1611                 count -= retval;
1612                 sum += retval;
1613                 if (retval == chunk && count > 0)
1614                         goto repeat;
1615         }
1616
1617  out:
1618         if (ra != 0)
1619                 ll_ra_read_ex(file, &bead);
1620         retval = (sum > 0) ? sum : retval;
1621
1622         if (iov_copy && iov_copy != iov)
1623                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1624
1625         RETURN(retval);
1626 }
1627
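/* read(2) entry point: wrap the user buffer in a single iovec and reuse the
 * vectored read path above. */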
1628 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1629                             loff_t *ppos)
1630 {
1631         struct iovec local_iov = { .iov_base = (void __user *)buf,
1632                                    .iov_len = count };
1633 #ifdef HAVE_FILE_READV
1634         return ll_file_readv(file, &local_iov, 1, ppos);
1635 #else
1636         struct kiocb kiocb;
1637         ssize_t ret;
1638
1639         init_sync_kiocb(&kiocb, file);
1640         kiocb.ki_pos = *ppos;
1641         kiocb.ki_left = count;
1642
1643         ret = ll_file_aio_read(&kiocb, &local_iov, 1, kiocb.ki_pos);
1644         *ppos = kiocb.ki_pos;
1645         return ret;
1646 #endif
1647 }
1648
1649 /*
1650  * Write to a file (through the page cache).
1651  */
1652 #ifdef HAVE_FILE_WRITEV
1653 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1654                               unsigned long nr_segs, loff_t *ppos)
1655 {
1656 #else /* AIO stuff */
1657 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1658                                  unsigned long nr_segs, loff_t pos)
1659 {
1660         struct file *file = iocb->ki_filp;
1661         loff_t *ppos = &iocb->ki_pos;
1662 #endif
1663         struct inode *inode = file->f_dentry->d_inode;
1664         struct ll_sb_info *sbi = ll_i2sbi(inode);
1665         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1666         struct ll_lock_tree tree;
1667         loff_t maxbytes = ll_file_maxbytes(inode);
1668         loff_t lock_start, lock_end, end;
1669         ssize_t retval, chunk, sum = 0;
1670         int tree_locked;
1671         struct iovec *iov_copy = NULL;
1672         unsigned long nrsegs_copy, nrsegs_orig = 0;
1673         size_t count, iov_offset = 0;
1674         ENTRY;
1675
1676         count = ll_file_get_iov_count(iov, &nr_segs);
1677
1678         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1679                inode->i_ino, inode->i_generation, inode, count, *ppos);
1680
1681         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1682
1683         /* POSIX, but it is surprising the VFS doesn't check this already */
1684         if (count == 0)
1685                 RETURN(0);
1686
1687         /* If the file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl was
1688          * never called on it, don't fail the assertion below (bug 2388). */
1689         if (file->f_flags & O_LOV_DELAY_CREATE &&
1690             ll_i2info(inode)->lli_smd == NULL)
1691                 RETURN(-EBADF);
1692
1693         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1694
1695         down(&ll_i2info(inode)->lli_write_sem);
1696
1697 repeat:
1698         chunk = 0; /* just to fix gcc's warning */
1699         end = *ppos + count - 1;
1700
1701         if (file->f_flags & O_APPEND) {
1702                 lock_start = 0;
1703                 lock_end = OBD_OBJECT_EOF;
1704                 iov_copy = (struct iovec *)iov;
1705                 nrsegs_copy = nr_segs;
1706         } else if (sbi->ll_max_rw_chunk != 0) {
1707                 /* first, find the end of the current stripe */
1708                 end = *ppos;
1709                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1710                                 (obd_off *)&end);
1711
1712                 /* clamp the end if it extends beyond the request */
1713                 if (end > *ppos + count - 1)
1714                         end = *ppos + count - 1;
1715
1716                 /* and chunk shouldn't be too large even if striping is wide */
1717                 if (end - *ppos > sbi->ll_max_rw_chunk)
1718                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1719                 lock_start = *ppos;
1720                 lock_end = end;
1721                 chunk = end - *ppos + 1;
1722                 if ((count == chunk) && (iov_offset == 0)) {
1723                         if (iov_copy)
1724                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1725
1726                         iov_copy = (struct iovec *)iov;
1727                         nrsegs_copy = nr_segs;
1728                 } else {
1729                         if (!iov_copy) {
1730                                 nrsegs_orig = nr_segs;
1731                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1732                                 if (!iov_copy)
1733                                         GOTO(out, retval = -ENOMEM);
1734                         }
1735                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1736                                         &iov_offset, chunk);
1737                 }
1738         } else {
1739                 lock_start = *ppos;
1740                 lock_end = end;
1741                 iov_copy = (struct iovec *)iov;
1742                 nrsegs_copy = nr_segs;
1743         }
1744
1745         tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
1746                                                 nrsegs_copy,
1747                                                 (obd_off)lock_start,
1748                                                 (obd_off)lock_end,
1749                                                 OBD_BRW_WRITE);
1750         if (tree_locked < 0)
1751                 GOTO(out, retval = tree_locked);
1752
1753         /* This is OK; the generic write path will overwrite this under i_sem
1754          * if it races with a local truncate; it just makes our maxbytes
1755          * checking easier.  The i_size value gets updated in ll_extent_lock()
1756          * as a consequence of the [0,EOF] extent lock we requested above. */
1757         if (file->f_flags & O_APPEND) {
1758                 *ppos = i_size_read(inode);
1759                 end = *ppos + count - 1;
1760         }
1761
1762         if (*ppos >= maxbytes) {
1763                 send_sig(SIGXFSZ, current, 0);
1764                 GOTO(out_unlock, retval = -EFBIG);
1765         }
1766         if (end > maxbytes - 1)
1767                 end = maxbytes - 1;
1768
1769         /* generic_file_write handles O_APPEND after getting i_mutex */
1770         chunk = end - *ppos + 1;
1771         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1772                inode->i_ino, chunk, *ppos);
1773         if (tree_locked) {
1774                 /* write under locks
1775                  *
1776                  * 1. update the inode's mtime and ctime, since a
1777                  * concurrent stat (via ll_glimpse_size) might otherwise
1778                  * return out-of-date values
1779                  *
1780                  * 2. update the lsm so that the next stat (via
1781                  * ll_glimpse_size) can get correct values from it */
1782                 struct ost_lvb xtimes;
1783
1784                 lov_stripe_lock(lsm);
1785                 LTIME_S(inode->i_mtime) = LTIME_S(CURRENT_TIME);
1786                 LTIME_S(inode->i_ctime) = LTIME_S(CURRENT_TIME);
1787                 xtimes.lvb_mtime = LTIME_S(inode->i_mtime);
1788                 xtimes.lvb_ctime = LTIME_S(inode->i_ctime);
1789                 obd_update_lvb(sbi->ll_osc_exp, lsm, &xtimes,
1790                                OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1791                 lov_stripe_unlock(lsm);
1792
1793 #ifdef HAVE_FILE_WRITEV
1794                 retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
1795 #else
1796                 retval = generic_file_aio_write(iocb, iov_copy, nrsegs_copy,
1797                                                 *ppos);
1798 #endif
1799         } else {
1800                 /* lockless write
1801                  *
1802                  * current time will get into request as mtime and
1803                  * ctime (lustre/osc/osc_request.c:osc_build_request())
1804                  */
1805                 retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy,
1806                                              ppos, WRITE, chunk);
1807         }
1808         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1809
1810 out_unlock:
1811         if (tree_locked)
1812                 ll_tree_unlock(&tree);
1813
1814 out:
1815         if (retval > 0) {
1816                 count -= retval;
1817                 sum += retval;
1818                 if (retval == chunk && count > 0)
1819                         goto repeat;
1820         }
1821
1822         up(&ll_i2info(inode)->lli_write_sem);
1823
1824         if (iov_copy && iov_copy != iov)
1825                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1826
1827         retval = (sum > 0) ? sum : retval;
1828         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1829                            retval > 0 ? retval : 0);
1830         RETURN(retval);
1831 }
1832
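/* write(2) entry point: wrap the user buffer in a single iovec and reuse the
 * vectored write path above. */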
1833 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1834                              loff_t *ppos)
1835 {
1836         struct iovec local_iov = { .iov_base = (void __user *)buf,
1837                                    .iov_len = count };
1838
1839 #ifdef HAVE_FILE_WRITEV
1840         return ll_file_writev(file, &local_iov, 1, ppos);
1841 #else
1842         struct kiocb kiocb;
1843         ssize_t ret;
1844
1845         init_sync_kiocb(&kiocb, file);
1846         kiocb.ki_pos = *ppos;
1847         kiocb.ki_left = count;
1848
1849         ret = ll_file_aio_write(&kiocb, &local_iov, 1, kiocb.ki_pos);
1850         *ppos = kiocb.ki_pos;
1851
1852         return ret;
1853 #endif
1854 }
1855
1856 /*
1857  * Send file content (through the pagecache) somewhere with a helper
1858  */
1859 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1860                                 read_actor_t actor, void *target)
1861 {
1862         struct inode *inode = in_file->f_dentry->d_inode;
1863         struct ll_inode_info *lli = ll_i2info(inode);
1864         struct lov_stripe_md *lsm = lli->lli_smd;
1865         struct ll_lock_tree tree;
1866         struct ll_lock_tree_node *node;
1867         struct ost_lvb lvb;
1868         struct ll_ra_read bead;
1869         int rc;
1870         ssize_t retval;
1871         __u64 kms;
1872         ENTRY;
1873         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1874                inode->i_ino, inode->i_generation, inode, count, *ppos);
1875
1876         /* "If nbyte is 0, read() will return 0 and have no other results."
1877          *                      -- Single Unix Spec */
1878         if (count == 0)
1879                 RETURN(0);
1880
1881         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1882         /* turn off the kernel's read-ahead */
1883         in_file->f_ra.ra_pages = 0;
1884
1885         /* File with no objects, nothing to lock */
1886         if (!lsm)
1887                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1888
1889         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1890         if (IS_ERR(node))
1891                 RETURN(PTR_ERR(node));
1892
1893         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1894         rc = ll_tree_lock(&tree, node, NULL, count,
1895                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1896         if (rc != 0)
1897                 RETURN(rc);
1898
1899         ll_clear_file_contended(inode);
1900         ll_inode_size_lock(inode, 1);
1901         /*
1902          * Consistency guarantees: following possibilities exist for the
1903          * relation between region being read and real file size at this
1904          * moment:
1905          *
1906          *  (A): the region is completely inside of the file;
1907          *
1908          *  (B-x): x bytes of region are inside of the file, the rest is
1909          *  outside;
1910          *
1911          *  (C): the region is completely outside of the file.
1912          *
1913          * This classification is stable under the DLM lock acquired by
1914          * ll_tree_lock() above, because to change class another client has
1915          * to take a DLM lock conflicting with ours. Also, any updates to
1916          * ->i_size by other threads on this client are serialized by
1917          * ll_inode_size_lock(). This guarantees that short reads are handled
1918          * correctly in the face of concurrent writes and truncates.
1919          */
1920         inode_init_lvb(inode, &lvb);
1921         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1922         kms = lvb.lvb_size;
1923         if (*ppos + count - 1 > kms) {
1924                 /* A glimpse is necessary to determine whether we return a
1925                  * short read (B) or some zeroes at the end of the buffer (C) */
1926                 ll_inode_size_unlock(inode, 1);
1927                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1928                 if (retval)
1929                         goto out;
1930         } else {
1931                 /* region is within kms and, hence, within real file size (A) */
1932                 i_size_write(inode, kms);
1933                 ll_inode_size_unlock(inode, 1);
1934         }
1935
1936         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1937                inode->i_ino, count, *ppos, i_size_read(inode));
1938
1939         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1940         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1941         ll_ra_read_in(in_file, &bead);
1942         /* BUG: 5972 */
1943         file_accessed(in_file);
1944         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1945         ll_ra_read_ex(in_file, &bead);
1946
1947  out:
1948         ll_tree_unlock(&tree);
1949         RETURN(retval);
1950 }
1951
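/* LL_IOC_RECREATE_OBJ: re-create a missing OST object for this file with the
 * object id and OST index supplied from userspace (admin only). */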
1952 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1953                                unsigned long arg)
1954 {
1955         struct ll_inode_info *lli = ll_i2info(inode);
1956         struct obd_export *exp = ll_i2obdexp(inode);
1957         struct ll_recreate_obj ucreatp;
1958         struct obd_trans_info oti = { 0 };
1959         struct obdo *oa = NULL;
1960         int lsm_size;
1961         int rc = 0;
1962         struct lov_stripe_md *lsm, *lsm2;
1963         ENTRY;
1964
1965         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1966                 RETURN(-EPERM);
1967
1968         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1969                             sizeof(struct ll_recreate_obj));
1970         if (rc) {
1971                 RETURN(-EFAULT);
1972         }
1973         OBDO_ALLOC(oa);
1974         if (oa == NULL)
1975                 RETURN(-ENOMEM);
1976
1977         down(&lli->lli_size_sem);
1978         lsm = lli->lli_smd;
1979         if (lsm == NULL)
1980                 GOTO(out, rc = -ENOENT);
1981         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1982                    (lsm->lsm_stripe_count));
1983
1984         OBD_ALLOC(lsm2, lsm_size);
1985         if (lsm2 == NULL)
1986                 GOTO(out, rc = -ENOMEM);
1987
1988         oa->o_id = ucreatp.lrc_id;
1989         oa->o_nlink = ucreatp.lrc_ost_idx;
1990         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1991         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1992         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1993                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1994
1995         memcpy(lsm2, lsm, lsm_size);
1996         rc = obd_create(exp, oa, &lsm2, &oti);
1997
1998         OBD_FREE(lsm2, lsm_size);
1999         GOTO(out, rc);
2000 out:
2001         up(&lli->lli_size_sem);
2002         OBDO_FREE(oa);
2003         return rc;
2004 }
2005
2006 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2007                                     int flags, struct lov_user_md *lum,
2008                                     int lum_size)
2009 {
2010         struct ll_inode_info *lli = ll_i2info(inode);
2011         struct lov_stripe_md *lsm;
2012         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2013         int rc = 0;
2014         ENTRY;
2015
2016         down(&lli->lli_size_sem);
2017         lsm = lli->lli_smd;
2018         if (lsm) {
2019                 up(&lli->lli_size_sem);
2020                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2021                        inode->i_ino);
2022                 RETURN(-EEXIST);
2023         }
2024
2025         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2026         if (rc)
2027                 GOTO(out, rc);
2028         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2029                 GOTO(out_req_free, rc = -ENOENT);
2030         rc = oit.d.lustre.it_status;
2031         if (rc < 0)
2032                 GOTO(out_req_free, rc);
2033
2034         ll_release_openhandle(file->f_dentry, &oit);
2035
2036  out:
2037         up(&lli->lli_size_sem);
2038         ll_intent_release(&oit);
2039         RETURN(rc);
2040 out_req_free:
2041         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2042         goto out;
2043 }
2044
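/* Fetch the striping EA (LOV MDS MD) of @filename from the MDS and convert
 * it to host endianness.  On success *lmmp/*lmm_size describe the layout and
 * the caller must release *request. */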
2045 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
2046                              struct lov_mds_md **lmmp, int *lmm_size,
2047                              struct ptlrpc_request **request)
2048 {
2049         struct ll_sb_info *sbi = ll_i2sbi(inode);
2050         struct ll_fid  fid;
2051         struct mds_body  *body;
2052         struct lov_mds_md *lmm = NULL;
2053         struct ptlrpc_request *req = NULL;
2054         int rc, lmmsize;
2055
2056         ll_inode2fid(&fid, inode);
2057
2058         rc = ll_get_max_mdsize(sbi, &lmmsize);
2059         if (rc)
2060                 RETURN(rc);
2061
2062         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
2063                         filename, strlen(filename) + 1,
2064                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
2065                         lmmsize, &req);
2066         if (rc < 0) {
2067                 CDEBUG(D_INFO, "mdc_getattr_name failed "
2068                                 "on %s: rc %d\n", filename, rc);
2069                 GOTO(out, rc);
2070         }
2071
2072         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
2073                         sizeof(*body));
2074         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2075         /* swabbed by mdc_getattr_name */
2076         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
2077
2078         lmmsize = body->eadatasize;
2079
2080         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2081                         lmmsize == 0) {
2082                 GOTO(out, rc = -ENODATA);
2083         }
2084
2085         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
2086                         lmmsize);
2087         LASSERT(lmm != NULL);
2088         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
2089
2090         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
2091             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) &&
2092             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
2093                 GOTO(out, rc = -EPROTO);
2094         }
2095         /*
2096          * This is coming from the MDS, so is probably in
2097          * little endian.  We convert it to host endian before
2098          * passing it to userspace.
2099          */
2100         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
2101                 /* if the function is called for a directory, we should
2102                  * avoid swabbing non-existent lsm objects */
2103                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
2104                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
2105                         if (S_ISREG(body->mode))
2106                                 lustre_swab_lov_user_md_objects(
2107                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
2108                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
2109                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
2110                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
2111                         if (S_ISREG(body->mode))
2112                                 lustre_swab_lov_user_md_objects(
2113                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
2114                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
2115                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
2116                         lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2117                 }
2118         }
2119
2120         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2121                 struct lov_stripe_md *lsm;
2122                 struct lov_user_md_join *lmj;
2123                 int lmj_size, i, aindex = 0;
2124
2125                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
2126                 if (rc < 0)
2127                         GOTO(out, rc = -ENOMEM);
2128                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
2129                 if (rc)
2130                         GOTO(out_free_memmd, rc);
2131
2132                 lmj_size = sizeof(struct lov_user_md_join) +
2133                         lsm->lsm_stripe_count *
2134                         sizeof(struct lov_user_ost_data_join);
2135                 OBD_ALLOC(lmj, lmj_size);
2136                 if (!lmj)
2137                         GOTO(out_free_memmd, rc = -ENOMEM);
2138
2139                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2140                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2141                         struct lov_extent *lex =
2142                                 &lsm->lsm_array->lai_ext_array[aindex];
2143
2144                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2145                                 aindex ++;
2146                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2147                                         LPU64" len %d\n", aindex, i,
2148                                         lex->le_start, (int)lex->le_len);
2149                         lmj->lmm_objects[i].l_extent_start =
2150                                 lex->le_start;
2151
2152                         if ((int)lex->le_len == -1)
2153                                 lmj->lmm_objects[i].l_extent_end = -1;
2154                         else
2155                                 lmj->lmm_objects[i].l_extent_end =
2156                                         lex->le_start + lex->le_len;
2157                         lmj->lmm_objects[i].l_object_id =
2158                                 lsm->lsm_oinfo[i]->loi_id;
2159                         lmj->lmm_objects[i].l_object_gr =
2160                                 lsm->lsm_oinfo[i]->loi_gr;
2161                         lmj->lmm_objects[i].l_ost_gen =
2162                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2163                         lmj->lmm_objects[i].l_ost_idx =
2164                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2165                 }
2166                 lmm = (struct lov_mds_md *)lmj;
2167                 lmmsize = lmj_size;
2168 out_free_memmd:
2169                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
2170         }
2171 out:
2172         *lmmp = lmm;
2173         *lmm_size = lmmsize;
2174         *request = req;
2175         return rc;
2176 }
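
/* LL_IOC_LOV_SETEA: set a striping EA that already names specific objects
 * (admin only); the lov_user_md is copied in from userspace and handed to
 * ll_lov_setstripe_ea_info(). */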
2177 static int ll_lov_setea(struct inode *inode, struct file *file,
2178                             unsigned long arg)
2179 {
2180         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2181         struct lov_user_md  *lump;
2182         int lum_size = sizeof(struct lov_user_md) +
2183                        sizeof(struct lov_user_ost_data);
2184         int rc;
2185         ENTRY;
2186
2187         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2188                 RETURN(-EPERM);
2189
2190         OBD_ALLOC(lump, lum_size);
2191         if (lump == NULL) {
2192                 RETURN(-ENOMEM);
2193         }
2194         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2195         if (rc) {
2196                 OBD_FREE(lump, lum_size);
2197                 RETURN(-EFAULT);
2198         }
2199
2200         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2201
2202         OBD_FREE(lump, lum_size);
2203         RETURN(rc);
2204 }
2205
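/* LL_IOC_LOV_SETSTRIPE: set the striping of a file that does not yet have
 * objects.  A rough userspace sketch (an assumption about typical usage, not
 * code from this file); the values below are only illustrative:
 *
 *     struct lov_user_md lum = {
 *             .lmm_magic         = LOV_USER_MAGIC_V1,
 *             .lmm_pattern       = 0,              (default pattern)
 *             .lmm_stripe_size   = 1 << 20,        (1 MiB stripes)
 *             .lmm_stripe_count  = 4,
 *             .lmm_stripe_offset = (__u16)-1,      (let the MDS pick the first OST)
 *     };
 *     ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum);
 */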
2206 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2207                             unsigned long arg)
2208 {
2209         struct lov_user_md_v3 lumv3;
2210         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
2211         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
2212         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
2213         int lum_size;
2214         int rc;
2215         int flags = FMODE_WRITE;
2216         ENTRY;
2217
2218         /* first try with v1 which is smaller than v3 */
2219         lum_size = sizeof(struct lov_user_md_v1);
2220         rc = copy_from_user(lumv1, lumv1p, lum_size);
2221         if (rc)
2222                 RETURN(-EFAULT);
2223
2224         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
2225                 lum_size = sizeof(struct lov_user_md_v3);
2226                 rc = copy_from_user(&lumv3, lumv3p, lum_size);
2227                 if (rc)
2228                         RETURN(-EFAULT);
2229         }
2230
2231         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
2232         if (rc == 0) {
2233                  put_user(0, &lumv1p->lmm_stripe_count);
2234                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
2235                                     0, ll_i2info(inode)->lli_smd,
2236                                     (void *)arg);
2237         }
2238         RETURN(rc);
2239 }
2240
2241 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2242 {
2243         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2244
2245         if (!lsm)
2246                 RETURN(-ENODATA);
2247
2248         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
2249                             (void *)arg);
2250 }
2251
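/* LL_IOC_GROUP_LOCK / LL_IOC_GROUP_UNLOCK: take and drop a group (gid-based)
 * [0, EOF] extent lock so that cooperating processes sharing the same group
 * id can do I/O without contending with one another.  A rough userspace
 * sketch (an assumption, not code from this file):
 *
 *     ioctl(fd, LL_IOC_GROUP_LOCK, gid);
 *     ... cooperative I/O ...
 *     ioctl(fd, LL_IOC_GROUP_UNLOCK, gid);
 */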
2252 static int ll_get_grouplock(struct inode *inode, struct file *file,
2253                             unsigned long arg)
2254 {
2255         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2256         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2257                                                     .end = OBD_OBJECT_EOF}};
2258         struct lustre_handle lockh = { 0 };
2259         struct ll_inode_info *lli = ll_i2info(inode);
2260         struct lov_stripe_md *lsm = lli->lli_smd;
2261         int flags = 0, rc;
2262         ENTRY;
2263
2264         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2265                 RETURN(-EINVAL);
2266         }
2267
2268         policy.l_extent.gid = arg;
2269         if (file->f_flags & O_NONBLOCK)
2270                 flags = LDLM_FL_BLOCK_NOWAIT;
2271
2272         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2273         if (rc)
2274                 RETURN(rc);
2275
2276         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2277         fd->fd_gid = arg;
2278         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2279
2280         RETURN(0);
2281 }
2282
2283 static int ll_put_grouplock(struct inode *inode, struct file *file,
2284                             unsigned long arg)
2285 {
2286         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2287         struct ll_inode_info *lli = ll_i2info(inode);
2288         struct lov_stripe_md *lsm = lli->lli_smd;
2289         int rc;
2290         ENTRY;
2291
2292         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2293                 /* Ugh, it's already unlocked. */
2294                 RETURN(-EINVAL);
2295         }
2296
2297         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2298                 RETURN(-EINVAL);
2299
2300         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2301
2302         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2303         if (rc)
2304                 RETURN(rc);
2305
2306         fd->fd_gid = 0;
2307         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2308
2309         RETURN(0);
2310 }
2311
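/* File join: append a "tail" file onto a "head" file.  Only compiled in
 * builds with LUSTRE_FIX >= 50. */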
2312 #if LUSTRE_FIX >= 50
2313 static int join_sanity_check(struct inode *head, struct inode *tail)
2314 {
2315         ENTRY;
2316         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2317                 CERROR("server does not support join\n");
2318                 RETURN(-EINVAL);
2319         }
2320         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2321                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2322                        head->i_ino, tail->i_ino);
2323                 RETURN(-EINVAL);
2324         }
2325         if (head->i_ino == tail->i_ino) {
2326                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2327                 RETURN(-EINVAL);
2328         }
2329         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2330                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
2331                 RETURN(-EINVAL);
2332         }
2333         RETURN(0);
2334 }
2335
2336 static int join_file(struct inode *head_inode, struct file *head_filp,
2337                      struct file *tail_filp)
2338 {
2339         struct dentry *tail_dentry = tail_filp->f_dentry;
2340         struct lookup_intent oit = {.it_op = IT_OPEN,
2341                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2342         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_PW,
2343                 ll_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL };
2344
2345         struct lustre_handle lockh;
2346         struct mdc_op_data *op_data;
2347         int    rc;
2348         loff_t data;
2349         ENTRY;
2350
2351         tail_dentry = tail_filp->f_dentry;
2352
2353         OBD_ALLOC_PTR(op_data);
2354         if (op_data == NULL) {
2355                 RETURN(-ENOMEM);
2356         }
2357
2358         data = i_size_read(head_inode);
2359         ll_prepare_mdc_op_data(op_data, head_inode,
2360                                tail_dentry->d_parent->d_inode,
2361                                tail_dentry->d_name.name,
2362                                tail_dentry->d_name.len, 0, &data);
2363         rc = mdc_enqueue(ll_i2mdcexp(head_inode), &einfo, &oit,
2364                          op_data, &lockh, NULL, 0, 0);
2365
2366         if (rc < 0)
2367                 GOTO(out, rc);
2368
2369         rc = oit.d.lustre.it_status;
2370
2371         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2372                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2373                 ptlrpc_req_finished((struct ptlrpc_request *)
2374                                     oit.d.lustre.it_data);
2375                 GOTO(out, rc);
2376         }
2377
2378         if (oit.d.lustre.it_lock_mode) { /* If we got a lock, release it
2379                                            * right away */
2380                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2381                 oit.d.lustre.it_lock_mode = 0;
2382         }
2383         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2384         it_clear_disposition(&oit, DISP_ENQ_COMPLETE);
2385         ll_release_openhandle(head_filp->f_dentry, &oit);
2386 out:
2387         if (op_data)
2388                 OBD_FREE_PTR(op_data);
2389         ll_intent_release(&oit);
2390         RETURN(rc);
2391 }
2392
2393 static int ll_file_join(struct inode *head, struct file *filp,
2394                         char *filename_tail)
2395 {
2396         struct inode *tail = NULL, *first = NULL, *second = NULL;
2397         struct dentry *tail_dentry;
2398         struct file *tail_filp, *first_filp, *second_filp;
2399         struct ll_lock_tree first_tree, second_tree;
2400         struct ll_lock_tree_node *first_node, *second_node;
2401         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2402         int rc = 0, cleanup_phase = 0;
2403         ENTRY;
2404
2405         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2406                head->i_ino, head->i_generation, head, filename_tail);
2407
2408         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2409         if (IS_ERR(tail_filp)) {
2410                 CERROR("Cannot open tail file %s\n", filename_tail);
2411                 rc = PTR_ERR(tail_filp);
2412                 GOTO(cleanup, rc);
2413         }
2414         tail = igrab(tail_filp->f_dentry->d_inode);
2415
2416         tlli = ll_i2info(tail);
2417         tail_dentry = tail_filp->f_dentry;
2418         LASSERT(tail_dentry);
2419         cleanup_phase = 1;
2420
2421         /* reorder the inodes to establish a consistent locking order */
2422         first = head->i_ino > tail->i_ino ? head : tail;
2423         second = head->i_ino > tail->i_ino ? tail : head;
2424         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2425         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2426
2427         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2428                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2429         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2430         if (IS_ERR(first_node)){
2431                 rc = PTR_ERR(first_node);
2432                 GOTO(cleanup, rc);
2433         }
2434         first_tree.lt_fd = first_filp->private_data;
2435         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2436         if (rc != 0)
2437                 GOTO(cleanup, rc);
2438         cleanup_phase = 2;
2439
2440         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2441         if (IS_ERR(second_node)){
2442                 rc = PTR_ERR(second_node);
2443                 GOTO(cleanup, rc);
2444         }
2445         second_tree.lt_fd = second_filp->private_data;
2446         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2447         if (rc != 0)
2448                 GOTO(cleanup, rc);
2449         cleanup_phase = 3;
2450
2451         rc = join_sanity_check(head, tail);
2452         if (rc)
2453                 GOTO(cleanup, rc);
2454
2455         rc = join_file(head, filp, tail_filp);
2456         if (rc)
2457                 GOTO(cleanup, rc);
2458 cleanup:
2459         switch (cleanup_phase) {
2460         case 3:
2461                 ll_tree_unlock(&second_tree);
2462                 obd_cancel_unused(ll_i2obdexp(second),
2463                                   ll_i2info(second)->lli_smd, 0, NULL);
2464         case 2:
2465                 ll_tree_unlock(&first_tree);
2466                 obd_cancel_unused(ll_i2obdexp(first),
2467                                   ll_i2info(first)->lli_smd, 0, NULL);
2468         case 1:
2469                 filp_close(tail_filp, 0);
2470                 if (tail)
2471                         iput(tail);
2472                 if (head && rc == 0) {
2473                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2474                                        &hlli->lli_smd);
2475                         hlli->lli_smd = NULL;
2476                 }
2477         case 0:
2478                 break;
2479         default:
2480                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2481                 LBUG();
2482         }
2483         RETURN(rc);
2484 }
2485 #endif  /* LUSTRE_FIX >= 50 */
2486
2487 /**
2488  * Close inode open handle
2489  *
2490  * \param dentry [in]     dentry which contains the inode
2491  * \param it     [in,out] intent which contains open info and result
2492  *
2493  * \retval 0     success
2494  * \retval <0    failure
2495  */
2496 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2497 {
2498         struct inode *inode = dentry->d_inode;
2499         struct obd_client_handle *och;
2500         int rc;
2501         ENTRY;
2502
2503         LASSERT(inode);
2504
2505         /* Root ? Do nothing. */
2506         if (dentry->d_inode->i_sb->s_root == dentry)
2507                 RETURN(0);
2508
2509         /* No open handle to close? Move away */
2510         if (!it_disposition(it, DISP_OPEN_OPEN))
2511                 RETURN(0);
2512
2513         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2514
2515         OBD_ALLOC(och, sizeof(*och));
2516         if (!och)
2517                 GOTO(out, rc = -ENOMEM);
2518
2519         ll_och_fill(ll_i2info(inode), it, och);
2520
2521         rc = ll_close_inode_openhandle(inode, och);
2522
2523         OBD_FREE(och, sizeof(*och));
2524  out:
2525         /* this one is in place of ll_file_open */
2526         if (it_disposition(it, DISP_ENQ_OPEN_REF))
2527                 ptlrpc_req_finished(it->d.lustre.it_data);
2528         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2529         RETURN(rc);
2530 }
2531
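/* Build a fiemap request key for this inode and ask the OSC/LOV layer, via
 * obd_get_info(KEY_FIEMAP), to fill in the extent mapping. */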
2532 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2533               int num_bytes)
2534 {
2535         struct obd_export *exp = ll_i2obdexp(inode);
2536         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2537         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2538         int vallen = num_bytes;
2539         int rc;
2540         ENTRY;
2541
2542         /* If the stripe_count > 1 and the application does not understand
2543          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2544          */
2545         if (lsm->lsm_stripe_count > 1 &&
2546             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2547                 return -EOPNOTSUPP;
2548
2549         fm_key.oa.o_id = lsm->lsm_object_id;
2550         fm_key.oa.o_valid = OBD_MD_FLID;
2551
2552         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLSIZE);
2553
2554         /* If filesize is 0, then there would be no objects for mapping */
2555         if (fm_key.oa.o_size == 0) {
2556                 fiemap->fm_mapped_extents = 0;
2557                 RETURN(0);
2558         }
2559
2560         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2561
2562         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2563         if (rc)
2564                 CERROR("obd_get_info failed: rc = %d\n", rc);
2565
2566         RETURN(rc);
2567 }
2568
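/* ioctl dispatcher for regular files. */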
2569 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2570                   unsigned long arg)
2571 {
2572         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2573         int flags;
2574         ENTRY;
2575
2576         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2577                inode->i_generation, inode, cmd);
2578         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2579
2580         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2581         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2582                 RETURN(-ENOTTY);
2583
2584         switch(cmd) {
2585         case LL_IOC_GETFLAGS:
2586                 /* Get the current value of the file flags */
2587                 return put_user(fd->fd_flags, (int *)arg);
2588         case LL_IOC_SETFLAGS:
2589         case LL_IOC_CLRFLAGS:
2590                 /* Set or clear specific file flags */
2591                 /* XXX This probably needs checks to ensure the flags are
2592                  *     not abused, and to handle any flag side effects.
2593                  */
2594                 if (get_user(flags, (int *) arg))
2595                         RETURN(-EFAULT);
2596
2597                 if (cmd == LL_IOC_SETFLAGS) {
2598                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2599                             !(file->f_flags & O_DIRECT)) {
2600                                 CERROR("%s: unable to disable locking on "
2601                                        "non-O_DIRECT file\n", current->comm);
2602                                 RETURN(-EINVAL);
2603                         }
2604
2605                         fd->fd_flags |= flags;
2606                 } else {
2607                         fd->fd_flags &= ~flags;
2608                 }
2609                 RETURN(0);
2610         case LL_IOC_LOV_SETSTRIPE:
2611                 RETURN(ll_lov_setstripe(inode, file, arg));
2612         case LL_IOC_LOV_SETEA:
2613                 RETURN(ll_lov_setea(inode, file, arg));
2614         case LL_IOC_LOV_GETSTRIPE:
2615                 RETURN(ll_lov_getstripe(inode, arg));
2616         case LL_IOC_RECREATE_OBJ:
2617                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2618         case EXT3_IOC_FIEMAP: {
2619                 struct ll_user_fiemap *fiemap_s;
2620                 size_t num_bytes, ret_bytes;
2621                 unsigned int extent_count;
2622                 int rc = 0;
2623
2624                 /* Get the extent count so we can calculate the size of
2625                  * required fiemap buffer */
2626                 if (get_user(extent_count,
2627                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2628                         RETURN(-EFAULT);
2629                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2630                                                  sizeof(struct ll_fiemap_extent));
2631                 OBD_VMALLOC(fiemap_s, num_bytes);
2632                 if (fiemap_s == NULL)
2633                         RETURN(-ENOMEM);
2634
2635                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2636                                    sizeof(*fiemap_s)))
2637                         GOTO(error, rc = -EFAULT);
2638
2639                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2640                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2641                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2642                         if (copy_to_user((char *)arg, fiemap_s,
2643                                          sizeof(*fiemap_s)))
2644                                 GOTO(error, rc = -EFAULT);
2645
2646                         GOTO(error, rc = -EBADR);
2647                 }
2648
2649                 /* If fm_extent_count is non-zero, read the first extent since
2650                  * it is used to calculate end_offset and device from previous
2651                  * fiemap call. */
2652                 if (extent_count) {
2653                         if (copy_from_user(&fiemap_s->fm_extents[0],
2654                             (char __user *)arg + sizeof(*fiemap_s),
2655                             sizeof(struct ll_fiemap_extent)))
2656                                 GOTO(error, rc = -EFAULT);
2657                 }
2658
2659                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2660                         int rc;
2661
2662                         rc = filemap_fdatawrite(inode->i_mapping);
2663                         if (rc)
2664                                 GOTO(error, rc);
2665                 }
2666
2667                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2668                 if (rc)
2669                         GOTO(error, rc);
2670
2671                 ret_bytes = sizeof(struct ll_user_fiemap);
2672
2673                 if (extent_count != 0)
2674                         ret_bytes += (fiemap_s->fm_mapped_extents *
2675                                          sizeof(struct ll_fiemap_extent));
2676
2677                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2678                         rc = -EFAULT;
2679
2680 error:
2681                 OBD_VFREE(fiemap_s, num_bytes);
2682                 RETURN(rc);
2683         }
2684         case EXT3_IOC_GETFLAGS:
2685         case EXT3_IOC_SETFLAGS:
2686                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2687         case EXT3_IOC_GETVERSION_OLD:
2688         case EXT3_IOC_GETVERSION:
2689                 RETURN(put_user(inode->i_generation, (int *)arg));
2690         case LL_IOC_JOIN: {
2691 #if LUSTRE_FIX >= 50
2692                 /* Allow file join in beta builds to aid debugging */
2693                 char *ftail;
2694                 int rc;
2695
2696                 ftail = getname((const char *)arg);
2697                 if (IS_ERR(ftail))
2698                         RETURN(PTR_ERR(ftail));
2699                 rc = ll_file_join(inode, file, ftail);
2700                 putname(ftail);
2701                 RETURN(rc);
2702 #else
2703                 CWARN("file join is not supported in this version of Lustre\n");
2704                 RETURN(-ENOTTY);
2705 #endif
2706         }
2707         case LL_IOC_GROUP_LOCK:
2708                 RETURN(ll_get_grouplock(inode, file, arg));
2709         case LL_IOC_GROUP_UNLOCK:
2710                 RETURN(ll_put_grouplock(inode, file, arg));
2711         case IOC_OBD_STATFS:
2712                 RETURN(ll_obd_statfs(inode, (void *)arg));
2713         case OBD_IOC_GETNAME_OLD:
2714         case OBD_IOC_GETNAME: {
2715                 struct obd_device *obd =
2716                         class_exp2obd(ll_i2sbi(inode)->ll_osc_exp);
2717                 if (!obd)
2718                         RETURN(-EFAULT);
2719                 if (copy_to_user((void *)arg, obd->obd_name,
2720                                 strlen(obd->obd_name) + 1))
2721                         RETURN (-EFAULT);
2722                 RETURN(0);
2723         }
2724
2725         /* We need to special case any other ioctls we want to handle,
2726          * to send them to the MDS/OST as appropriate and to properly
2727          * network encode the arg field.
2728         case EXT3_IOC_SETVERSION_OLD:
2729         case EXT3_IOC_SETVERSION:
2730         */
2731         default: {
2732                 int err;
2733
2734                 if (LLIOC_STOP ==
2735                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2736                         RETURN(err);
2737
2738                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2739                                      (void *)arg));
2740         }
2741         }
2742 }
2743
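/* llseek: for SEEK_END a glimpse is taken first so that i_size reflects the
 * current size known to the OSTs. */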
2744 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2745 {
2746         struct inode *inode = file->f_dentry->d_inode;
2747         struct ll_inode_info *lli = ll_i2info(inode);
2748         struct lov_stripe_md *lsm = lli->lli_smd;
2749         loff_t retval;
2750         ENTRY;
2751         retval = offset + ((origin == 2) ? i_size_read(inode) :
2752                            (origin == 1) ? file->f_pos : 0);
2753         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2754                inode->i_ino, inode->i_generation, inode, retval, retval,
2755                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2756         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2757
2758         if (origin == 2) { /* SEEK_END */
2759                 int nonblock = 0, rc;
2760
2761                 if (file->f_flags & O_NONBLOCK)
2762                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2763
2764                 if (lsm != NULL) {
2765                         rc = ll_glimpse_size(inode, nonblock);
2766                         if (rc != 0)
2767                                 RETURN(rc);
2768                 }
2769
2770                 ll_inode_size_lock(inode, 0);
2771                 offset += i_size_read(inode);
2772                 ll_inode_size_unlock(inode, 0);
2773         } else if (origin == 1) { /* SEEK_CUR */
2774                 offset += file->f_pos;
2775         }
2776
2777         retval = -EINVAL;
2778         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2779                 if (offset != file->f_pos) {
2780                         file->f_pos = offset;
2781                         file->f_version = 0;
2782                 }
2783                 retval = offset;
2784         }
2785
2786         RETURN(retval);
2787 }
2788
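/*
 * fsync(2)/fdatasync(2) handler: wait for the dirty pages the VFS has
 * already submitted, pick up any asynchronous write errors recorded on
 * the inode and its stripes, sync metadata through the MDC, and, when
 * @data is set and the file has stripes, sync the object data out to the
 * OSTs as well.
 */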
2789 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2790 {
2791         struct inode *inode = dentry->d_inode;
2792         struct ll_inode_info *lli = ll_i2info(inode);
2793         struct lov_stripe_md *lsm = lli->lli_smd;
2794         struct ll_fid fid;
2795         struct ptlrpc_request *req;
2796         int rc, err;
2797         ENTRY;
2798         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2799                inode->i_generation, inode);
2800         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2801
2802         /* fsync's caller has already called _fdata{sync,write}, we want
2803          * that IO to finish before calling the osc and mdc sync methods */
2804         rc = filemap_fdatawait(inode->i_mapping);
2805
2806         /* catch async errors that were recorded back when async writeback
2807          * failed for pages in this mapping. */
2808         err = lli->lli_async_rc;
2809         lli->lli_async_rc = 0;
2810         if (rc == 0)
2811                 rc = err;
2812         if (lsm) {
2813                 err = lov_test_and_clear_async_rc(lsm);
2814                 if (rc == 0)
2815                         rc = err;
2816         }
2817
2818         ll_inode2fid(&fid, inode);
2819         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2820         if (!rc)
2821                 rc = err;
2822         if (!err)
2823                 ptlrpc_req_finished(req);
2824
2825         if (data && lsm) {
2826                 struct obd_info *oinfo;
2827
2828                 OBD_ALLOC_PTR(oinfo);
2829                 if (!oinfo)
2830                         RETURN(rc ? rc : -ENOMEM);
2831                 OBDO_ALLOC(oinfo->oi_oa);
2832                 if (!oinfo->oi_oa) {
2833                         OBD_FREE_PTR(oinfo);
2834                         RETURN(rc ? rc : -ENOMEM);
2835                 }
2836                 oinfo->oi_oa->o_id = lsm->lsm_object_id;
2837                 oinfo->oi_oa->o_gr = lsm->lsm_object_gr;
2838                 oinfo->oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2839                 obdo_from_inode(oinfo->oi_oa, inode,
2840                                 OBD_MD_FLTYPE | OBD_MD_FLATIME |
2841                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2842                 oinfo->oi_md = lsm;
2843                 err = obd_sync_rqset(ll_i2sbi(inode)->ll_osc_exp, oinfo,
2844                                      0, OBD_OBJECT_EOF);
2845                 if (!rc)
2846                         rc = err;
2847                 OBDO_FREE(oinfo->oi_oa);
2848                 OBD_FREE_PTR(oinfo);
2849         }
2850
2851         RETURN(rc);
2852 }
2853
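/*
 * fcntl()/flock() locking: translate the VFS file_lock into an LDLM
 * flock enqueue against the MDS.  F_UNLCK is sent as an LCK_NL enqueue
 * (see the comment below), and F_GETLK uses LDLM_FL_TEST_LOCK so no lock
 * is actually taken.
 */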
2854 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2855 {
2856         struct inode *inode = file->f_dentry->d_inode;
2857         struct ll_sb_info *sbi = ll_i2sbi(inode);
2858         struct lu_fid *fid = ll_inode_lu_fid(inode);
2859         struct ldlm_res_id res_id =
2860                     { .name = { fid_seq(fid),
2861                                 fid_oid(fid),
2862                                 fid_ver(fid),
2863                                 LDLM_FLOCK} };
2864         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2865                 ldlm_flock_completion_ast, NULL, file_lock };
2866         struct lustre_handle lockh = {0};
2867         ldlm_policy_data_t flock;
2868         int flags = 0;
2869         int rc;
2870         ENTRY;
2871
2872         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2873                inode->i_ino, file_lock);
2874         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2875
2876         if (fid_is_igif(fid)) {
2877                 /* If this is an IGIF inode, we need to keep the 1.6-style
2878                  * flock mapping for compatibility.  If it is a proper FID
2879                  * then we know any other client accessing it must also be
2880                  * accessing it as a FID and can use the CMD-style flock. */
2881                 res_id.name[2] = LDLM_FLOCK;
2882                 res_id.name[3] = 0;
2883         }
2884
2885         if (file_lock->fl_flags & FL_FLOCK) {
2886                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2887                 /* set missing params for flock() calls */
2888                 file_lock->fl_end = OFFSET_MAX;
2889                 file_lock->fl_pid = current->tgid;
2890         }
2891         flock.l_flock.pid = file_lock->fl_pid;
2892         flock.l_flock.start = file_lock->fl_start;
2893         flock.l_flock.end = file_lock->fl_end;
2894
2895         switch (file_lock->fl_type) {
2896         case F_RDLCK:
2897                 einfo.ei_mode = LCK_PR;
2898                 break;
2899         case F_UNLCK:
2900                 /* An unlock request may or may not have any relation to
2901                  * existing locks so we may not be able to pass a lock handle
2902                  * via a normal ldlm_lock_cancel() request. The request may even
2903                  * unlock a byte range in the middle of an existing lock. In
2904                  * order to process an unlock request we need all of the same
2905                  * information that is given with a normal read or write record
2906                  * lock request. To avoid creating another ldlm unlock (cancel)
2907                  * message we'll treat a LCK_NL flock request as an unlock. */
2908                 einfo.ei_mode = LCK_NL;
2909                 break;
2910         case F_WRLCK:
2911                 einfo.ei_mode = LCK_PW;
2912                 break;
2913         default:
2914                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2915                 RETURN (-EINVAL);
2916         }
2917
2918         switch (cmd) {
2919         case F_SETLKW:
2920 #ifdef F_SETLKW64
2921         case F_SETLKW64:
2922 #endif
2923                 flags = 0;
2924                 break;
2925         case F_SETLK:
2926 #ifdef F_SETLK64
2927         case F_SETLK64:
2928 #endif
2929                 flags = LDLM_FL_BLOCK_NOWAIT;
2930                 break;
2931         case F_GETLK:
2932 #ifdef F_GETLK64
2933         case F_GETLK64:
2934 #endif
2935                 flags = LDLM_FL_TEST_LOCK;
2936                 /* Save the old mode so that if the mode in the lock changes we
2937                  * can decrement the appropriate reader or writer refcount. */
2938                 file_lock->fl_type = einfo.ei_mode;
2939                 break;
2940         default:
2941                 CERROR("unknown fcntl lock command: %d\n", cmd);
2942                 RETURN (-EINVAL);
2943         }
2944
2945         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2946                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2947                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2948
2949         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, &einfo, res_id,
2950                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2951         if ((file_lock->fl_flags & FL_FLOCK) &&
2952             (rc == 0 || file_lock->fl_type == F_UNLCK))
2953                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2954 #ifdef HAVE_F_OP_FLOCK
2955         if ((file_lock->fl_flags & FL_POSIX) &&
2956             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2957             !(flags & LDLM_FL_TEST_LOCK))
2958                 posix_lock_file_wait(file, file_lock);
2959 #endif
2960
2961         RETURN(rc);
2962 }
2963
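/* Used for -o noflock mounts: reject all file locking requests with
 * -ENOSYS. */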
2964 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2965 {
2966         ENTRY;
2967
2968         RETURN(-ENOSYS);
2969 }
2970
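/*
 * Check whether this client already holds a granted MDS inodebits lock
 * covering @bits on @inode.  The match uses LDLM_FL_TEST_LOCK, so no
 * reference is taken on a matched lock.  Returns 1 if such a lock is
 * found, 0 otherwise.
 */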
2971 int ll_have_md_lock(struct inode *inode, __u64 bits)
2972 {
2973         struct lustre_handle lockh;
2974         struct ldlm_res_id res_id;
2975         struct obd_device *obddev;
2976         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2977         int flags;
2978         ENTRY;
2979
2980         if (!inode)
2981                 RETURN(0);
2982
2983         obddev = ll_i2mdcexp(inode)->exp_obd;
2984         fid_build_reg_res_name(ll_inode_lu_fid(inode), &res_id);
2985
2986         CDEBUG(D_INFO, "trying to match res "LPU64":"LPU64":"LPU64"\n",
2987                 res_id.name[0],
2988                 res_id.name[1],
2989                 res_id.name[2]);
2990
2991         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2992         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
2993                             &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2994                 RETURN(1);
2995         }
2996
2997         RETURN(0);
2998 }
2999
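/* Post-process a revalidation failure: -ENOENT means the inode was
 * already unlinked, so clear nlink and, for anything but regular files
 * and directories, treat it as success; other errors are logged and
 * returned as a negative errno. */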
3000 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
3001         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
3002                               * and return success */
3003                 inode->i_nlink = 0;
3004                 /* This path cannot be hit for regular files unless in
3005                  * case of obscure races, so no need to validate
3006                  * size. */
3007                 if (!S_ISREG(inode->i_mode) &&
3008                     !S_ISDIR(inode->i_mode))
3009                         return 0;
3010         }
3011
3012         if (rc) {
3013                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
3014                 return -abs(rc);
3015
3016         }
3017
3018         return 0;
3019 }
3020
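/*
 * Revalidate dentry/inode attributes from the MDS.  If the server
 * supports getattr-by-fid (OBD_CONNECT_ATTRFID) this is done with an
 * IT_GETATTR intent lock; otherwise a plain mdc_getattr() is issued when
 * no UPDATE|LOOKUP lock is already cached.  The file size is then
 * refreshed with a glimpse unless the OST objects are not allocated yet.
 */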
3021 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
3022 {
3023         struct inode *inode = dentry->d_inode;
3024         struct ptlrpc_request *req = NULL;
3025         struct obd_export *exp;
3026         int rc;
3027         ENTRY;
3028
3029         if (!inode) {
3030                 CERROR("REPORT THIS LINE TO PETER\n");
3031                 RETURN(0);
3032         }
3033         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3034                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3035
3036         exp = ll_i2mdcexp(inode);
3037
3038         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
3039                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3040                 struct mdc_op_data op_data = { { 0 } };
3041
3042                 /* Call getattr by fid, so do not provide name at all. */
3043                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
3044                                        dentry->d_inode, NULL, 0, 0, NULL);
3045                 oit.it_flags |= O_CHECK_STALE;
3046                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
3047                                      /* we are not interested in name
3048                                         based lookup */
3049                                      &oit, 0, &req,
3050                                      ll_mdc_blocking_ast, 0);
3051                 oit.it_flags &= ~O_CHECK_STALE;
3052                 if (rc < 0) {
3053                         rc = ll_inode_revalidate_fini(inode, rc);
3054                         GOTO (out, rc);
3055                 }
3056
3057                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
3058                 if (rc != 0) {
3059                         ll_intent_release(&oit);
3060                         GOTO(out, rc);
3061                 }
3062
3063                 /* Unlinked? Unhash dentry, so it is not picked up later by
3064                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3065                    here to preserve get_cwd functionality on 2.6.
3066                    Bug 10503 */
3067                 if (!dentry->d_inode->i_nlink) {
3068                         spin_lock(&ll_lookup_lock);
3069                         spin_lock(&dcache_lock);
3070                         ll_drop_dentry(dentry);
3071                         spin_unlock(&dcache_lock);
3072                         spin_unlock(&ll_lookup_lock);
3073                 }
3074
3075                 ll_lookup_finish_locks(&oit, dentry);
3076         } else if (!ll_have_md_lock(dentry->d_inode,
3077                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
3078                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3079                 struct ll_fid fid;
3080                 obd_valid valid = OBD_MD_FLGETATTR;
3081                 int ealen = 0;
3082
3083                 if (S_ISREG(inode->i_mode)) {
3084                         rc = ll_get_max_mdsize(sbi, &ealen);
3085                         if (rc)
3086                                 RETURN(rc);
3087                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3088                 }
3089                 ll_inode2fid(&fid, inode);
3090                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
3091                 if (rc) {
3092                         rc = ll_inode_revalidate_fini(inode, rc);
3093                         RETURN(rc);
3094                 }
3095
3096                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
3097                                    NULL);
3098                 if (rc)
3099                         GOTO(out, rc);
3100         }
3101
3102         /* if object not yet allocated, don't validate size */
3103         if (ll_i2info(inode)->lli_smd == NULL) {
3104                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
3105                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
3106                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
3107                 GOTO(out, rc = 0);
3108         }
3109
3110         /* ll_glimpse_size will prefer locally cached writes if they extend
3111          * the file */
3112         rc = ll_glimpse_size(inode, 0);
3113
3114 out:
3115         ptlrpc_req_finished(req);
3116         RETURN(rc);
3117 }
3118
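/* Fill in a kstat for stat(2) after revalidating the inode with the
 * given intent; size and blocks are read under the inode size lock. */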
3119 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3120                   struct lookup_intent *it, struct kstat *stat)
3121 {
3122         struct inode *inode = de->d_inode;
3123         int res = 0;
3124
3125         res = ll_inode_revalidate_it(de, it);
3126         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3127
3128         if (res)
3129                 return res;
3130
3131         stat->dev = inode->i_sb->s_dev;
3132         stat->ino = inode->i_ino;
3133         stat->mode = inode->i_mode;
3134         stat->nlink = inode->i_nlink;
3135         stat->uid = inode->i_uid;
3136         stat->gid = inode->i_gid;
3137         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3138         stat->atime = inode->i_atime;
3139         stat->mtime = inode->i_mtime;
3140         stat->ctime = inode->i_ctime;
3141 #ifdef HAVE_INODE_BLKSIZE
3142         stat->blksize = inode->i_blksize;
3143 #else
3144         stat->blksize = 1<<inode->i_blkbits;
3145 #endif
3146
3147         ll_inode_size_lock(inode, 0);
3148         stat->size = i_size_read(inode);
3149         stat->blocks = inode->i_blocks;
3150         ll_inode_size_unlock(inode, 0);
3151
3152         return 0;
3153 }
3154 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3155 {
3156         struct lookup_intent it = { .it_op = IT_GETATTR };
3157
3158         return ll_getattr_it(mnt, de, &it, stat);
3159 }
3160
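/* Check @mask against the cached POSIX ACL of @inode, if any.  Returns
 * -EAGAIN when no ACL is cached (or ACL support is not configured) so
 * the caller falls back to the normal mode-bit checks. */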
3161 static
3162 int lustre_check_acl(struct inode *inode, int mask)
3163 {
3164 #ifdef CONFIG_FS_POSIX_ACL
3165         struct ll_inode_info *lli = ll_i2info(inode);
3166         struct posix_acl *acl;
3167         int rc;
3168         ENTRY;
3169
3170         spin_lock(&lli->lli_lock);
3171         acl = posix_acl_dup(lli->lli_posix_acl);
3172         spin_unlock(&lli->lli_lock);
3173
3174         if (!acl)
3175                 RETURN(-EAGAIN);
3176
3177         rc = posix_acl_permission(inode, acl, mask);
3178         posix_acl_release(acl);
3179
3180         RETURN(rc);
3181 #else
3182         return -EAGAIN;
3183 #endif
3184 }
3185
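/*
 * Permission check for Lustre inodes.  On 2.6.10+ kernels this is simply
 * generic_permission() with lustre_check_acl() as the ACL callback;
 * older kernels get an open-coded equivalent of the generic UNIX
 * owner/group/other, ACL and capability checks below.
 */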
3186 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3187 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3188 {
3189         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3190                inode->i_ino, inode->i_generation, inode, mask);
3191
3192         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3193         return generic_permission(inode, mask, lustre_check_acl);
3194 }
3195 #else
3196 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
3197 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3198 #else
3199 int ll_inode_permission(struct inode *inode, int mask)
3200 #endif
3201 {
3202         int mode = inode->i_mode;
3203         int rc;
3204
3205         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3206                inode->i_ino, inode->i_generation, inode, mask);
3207         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3208
3209         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3210             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3211                 return -EROFS;
3212         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3213                 return -EACCES;
3214         if (current->fsuid == inode->i_uid) {
3215                 mode >>= 6;
3216         } else if (1) {
3217                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3218                         goto check_groups;
3219                 rc = lustre_check_acl(inode, mask);
3220                 if (rc == -EAGAIN)
3221                         goto check_groups;
3222                 if (rc == -EACCES)
3223                         goto check_capabilities;
3224                 return rc;
3225         } else {
3226 check_groups:
3227                 if (in_group_p(inode->i_gid))
3228                         mode >>= 3;
3229         }
3230         if ((mode & mask & S_IRWXO) == mask)
3231                 return 0;
3232
3233 check_capabilities:
3234         if (!(mask & MAY_EXEC) ||
3235             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3236                 if (cfs_capable(CFS_CAP_DAC_OVERRIDE))
3237                         return 0;
3238
3239         if (cfs_capable(CFS_CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3240             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3241                 return 0;
3242
3243         return -EACCES;
3244 }
3245 #endif
3246
3247 /* -o localflock - only provides locally consistent flock locks */
3248 struct file_operations ll_file_operations = {
3249         .read           = ll_file_read,
3250 #ifdef HAVE_FILE_READV
3251         .readv          = ll_file_readv,
3252 #else
3253         .aio_read       = ll_file_aio_read,
3254 #endif
3255         .write          = ll_file_write,
3256 #ifdef HAVE_FILE_WRITEV
3257         .writev         = ll_file_writev,
3258 #else
3259         .aio_write      = ll_file_aio_write,
3260 #endif
3261         .ioctl          = ll_file_ioctl,
3262         .open           = ll_file_open,
3263         .release        = ll_file_release,
3264         .mmap           = ll_file_mmap,
3265         .llseek         = ll_file_seek,
3266         .sendfile       = ll_file_sendfile,
3267         .fsync          = ll_fsync,
3268 };
3269
3270 struct file_operations ll_file_operations_flock = {
3271         .read           = ll_file_read,
3272 #ifdef HAVE_FILE_READV
3273         .readv          = ll_file_readv,
3274 #else
3275         .aio_read       = ll_file_aio_read,
3276 #endif
3277         .write          = ll_file_write,
3278 #ifdef HAVE_FILE_WRITEV
3279         .writev         = ll_file_writev,
3280 #else
3281         .aio_write      = ll_file_aio_write,
3282 #endif
3283         .ioctl          = ll_file_ioctl,
3284         .open           = ll_file_open,
3285         .release        = ll_file_release,
3286         .mmap           = ll_file_mmap,
3287         .llseek         = ll_file_seek,
3288         .sendfile       = ll_file_sendfile,
3289         .fsync          = ll_fsync,
3290 #ifdef HAVE_F_OP_FLOCK
3291         .flock          = ll_file_flock,
3292 #endif
3293         .lock           = ll_file_flock
3294 };
3295
3296 /* These are for -o noflock - to return ENOSYS on flock calls */
3297 struct file_operations ll_file_operations_noflock = {
3298         .read           = ll_file_read,
3299 #ifdef HAVE_FILE_READV
3300         .readv          = ll_file_readv,
3301 #else
3302         .aio_read       = ll_file_aio_read,
3303 #endif
3304         .write          = ll_file_write,
3305 #ifdef HAVE_FILE_WRITEV
3306         .writev         = ll_file_writev,
3307 #else
3308         .aio_write      = ll_file_aio_write,
3309 #endif
3310         .ioctl          = ll_file_ioctl,
3311         .open           = ll_file_open,
3312         .release        = ll_file_release,
3313         .mmap           = ll_file_mmap,
3314         .llseek         = ll_file_seek,
3315         .sendfile       = ll_file_sendfile,
3316         .fsync          = ll_fsync,
3317 #ifdef HAVE_F_OP_FLOCK
3318         .flock          = ll_file_noflock,
3319 #endif
3320         .lock           = ll_file_noflock
3321 };
3322
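/* Inode operations shared by all three file_operations variants above. */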
3323 struct inode_operations ll_file_inode_operations = {
3324 #ifdef HAVE_VFS_INTENT_PATCHES
3325         .setattr_raw    = ll_setattr_raw,
3326 #endif
3327         .setattr        = ll_setattr,
3328         .truncate       = ll_truncate,
3329         .getattr        = ll_getattr,
3330         .permission     = ll_inode_permission,
3331         .setxattr       = ll_setxattr,
3332         .getxattr       = ll_getxattr,
3333         .listxattr      = ll_listxattr,
3334         .removexattr    = ll_removexattr,
3335 };
3336
3337 /* dynamic ioctl number support routines */
3338 static struct llioc_ctl_data {
3339         struct rw_semaphore ioc_sem;
3340         struct list_head    ioc_head;
3341 } llioc = {
3342         __RWSEM_INITIALIZER(llioc.ioc_sem),
3343         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3344 };
3345
3346
3347 struct llioc_data {
3348         struct list_head        iocd_list;
3349         unsigned int            iocd_size;
3350         llioc_callback_t        iocd_cb;
3351         unsigned int            iocd_count;
3352         unsigned int            iocd_cmd[0];
3353 };
3354
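/*
 * Register a dynamic ioctl handler: @cb will be called from
 * ll_file_ioctl() for each of the @count command numbers in @cmd.
 * Returns an opaque cookie to pass to ll_iocontrol_unregister(), or NULL
 * on bad arguments or allocation failure.
 */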
3355 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3356 {
3357         unsigned int size;
3358         struct llioc_data *in_data = NULL;
3359         ENTRY;
3360
3361         if (cb == NULL || cmd == NULL ||
3362             count > LLIOC_MAX_CMD || count < 0)
3363                 RETURN(NULL);
3364
3365         size = sizeof(*in_data) + count * sizeof(unsigned int);
3366         OBD_ALLOC(in_data, size);
3367         if (in_data == NULL)
3368                 RETURN(NULL);
3369
3370         memset(in_data, 0, sizeof(*in_data));
3371         in_data->iocd_size = size;
3372         in_data->iocd_cb = cb;
3373         in_data->iocd_count = count;
3374         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3375
3376         down_write(&llioc.ioc_sem);
3377         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3378         up_write(&llioc.ioc_sem);
3379
3380         RETURN(in_data);
3381 }
3382
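/* Remove a handler previously registered with ll_iocontrol_register();
 * @magic is the cookie that call returned. */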
3383 void ll_iocontrol_unregister(void *magic)
3384 {
3385         struct llioc_data *tmp;
3386
3387         if (magic == NULL)
3388                 return;
3389
3390         down_write(&llioc.ioc_sem);
3391         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3392                 if (tmp == magic) {
3393                         unsigned int size = tmp->iocd_size;
3394
3395                         list_del(&tmp->iocd_list);
3396                         up_write(&llioc.ioc_sem);
3397
3398                         OBD_FREE(tmp, size);
3399                         return;
3400                 }
3401         }
3402         up_write(&llioc.ioc_sem);
3403
3404         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3405 }
3406
3407 EXPORT_SYMBOL(ll_iocontrol_register);
3408 EXPORT_SYMBOL(ll_iocontrol_unregister);
3409
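/*
 * Run the registered dynamic ioctl handlers for @cmd, returning
 * LLIOC_STOP as soon as a handler claims the command and LLIOC_CONT
 * otherwise.  The handler's result code is passed back through @rcp; if
 * no handler claims the command, *rcp is left at -EINVAL.
 */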
3410 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3411                         unsigned int cmd, unsigned long arg, int *rcp)
3412 {
3413         enum llioc_iter ret = LLIOC_CONT;
3414         struct llioc_data *data;
3415         int rc = -EINVAL, i;
3416
3417         down_read(&llioc.ioc_sem);
3418         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3419                 for (i = 0; i < data->iocd_count; i++) {
3420                         if (cmd != data->iocd_cmd[i])
3421                                 continue;
3422
3423                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3424                         break;
3425                 }
3426
3427                 if (ret == LLIOC_STOP)
3428                         break;
3429         }
3430         up_read(&llioc.ioc_sem);
3431
3432         if (rcp)
3433                 *rcp = rc;
3434         return ret;
3435 }