Whamcloud - gitweb
b=18233
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 /* also used by llite/special.c:ll_special_open() */
52 struct ll_file_data *ll_file_data_get(void)
53 {
54         struct ll_file_data *fd;
55
56         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
57         return fd;
58 }
59
60 static void ll_file_data_put(struct ll_file_data *fd)
61 {
62         if (fd != NULL)
63                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
64 }
65
66 static int ll_close_inode_openhandle(struct inode *inode,
67                                      struct obd_client_handle *och)
68 {
69         struct ptlrpc_request *req = NULL;
70         struct obd_device *obd;
71         struct obdo *oa;
72         struct mdc_op_data data = { { 0 } };
73         int rc;
74         ENTRY;
75
76         obd = class_exp2obd(ll_i2mdcexp(inode));
77         if (obd == NULL) {
78                 CERROR("Invalid MDC connection handle "LPX64"\n",
79                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
80                 GOTO(out, rc = 0);
81         }
82
83         /*
84          * here we check if this is forced umount. If so this is called on
85          * canceling "open lock" and we do not call mdc_close() in this case, as
86          * it will not be successful, as import is already deactivated.
87          */
88         if (obd->obd_force)
89                 GOTO(out, rc = 0);
90
91         OBDO_ALLOC(oa);
92         if (!oa)
93                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
94
95         oa->o_id = inode->i_ino;
96         oa->o_valid = OBD_MD_FLID;
97         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
98                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
99                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
100                                    OBD_MD_FLCTIME);
101         if (ll_is_inode_dirty(inode)) {
102                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
103                 oa->o_valid |= OBD_MD_FLFLAGS;
104         }
105         ll_inode2fid(&data.fid1, inode);
106         rc = mdc_close(ll_i2mdcexp(inode), &data, oa, och, &req);
107         if (rc == EAGAIN) {
108                 /* We are the last writer, so the MDS has instructed us to get
109                  * the file size and any write cookies, then close again. */
110                 ll_queue_done_writing(inode);
111                 rc = 0;
112         } else if (rc) {
113                 CERROR("inode %lu mdc close failed: rc = %d\n",
114                        inode->i_ino, rc);
115         }
116
117         OBDO_FREE(oa);
118
119         if (rc == 0) {
120                 rc = ll_objects_destroy(req, inode);
121                 if (rc)
122                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
123                                inode->i_ino, rc);
124         }
125
126         ptlrpc_req_finished(req); /* This is close request */
127         EXIT;
128 out:
129         mdc_clear_open_replay_data(och);
130
131         return rc;
132 }
133
134 int ll_mdc_real_close(struct inode *inode, int flags)
135 {
136         struct ll_inode_info *lli = ll_i2info(inode);
137         int rc = 0;
138         struct obd_client_handle **och_p;
139         struct obd_client_handle *och;
140         __u64 *och_usecount;
141
142         ENTRY;
143
144         if (flags & FMODE_WRITE) {
145                 och_p = &lli->lli_mds_write_och;
146                 och_usecount = &lli->lli_open_fd_write_count;
147         } else if (flags & FMODE_EXEC) {
148                 och_p = &lli->lli_mds_exec_och;
149                 och_usecount = &lli->lli_open_fd_exec_count;
150          } else {
151                 LASSERT(flags & FMODE_READ);
152                 och_p = &lli->lli_mds_read_och;
153                 och_usecount = &lli->lli_open_fd_read_count;
154         }
155
156         down(&lli->lli_och_sem);
157         if (*och_usecount) { /* There are still users of this handle, so
158                                 skip freeing it. */
159                 up(&lli->lli_och_sem);
160                 RETURN(0);
161         }
162         och=*och_p;
163         *och_p = NULL;
164         up(&lli->lli_och_sem);
165
166         if (och) { /* There might be a race and somebody have freed this och
167                       already */
168                 rc = ll_close_inode_openhandle(inode, och);
169                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
170                 OBD_FREE(och, sizeof *och);
171         }
172
173         RETURN(rc);
174 }
175
176 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
177                         struct file *file)
178 {
179         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
180         struct ll_inode_info *lli = ll_i2info(inode);
181         int rc = 0;
182         ENTRY;
183
184         /* clear group lock, if present */
185         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
186                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
187                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
188                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
189                                       &fd->fd_cwlockh);
190         }
191
192         /* Let's see if we have good enough OPEN lock on the file and if
193            we can skip talking to MDS */
194         if (file->f_dentry->d_inode) { /* Can this ever be false? */
195                 int lockmode;
196                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
197                 struct lustre_handle lockh;
198                 struct inode *inode = file->f_dentry->d_inode;
199                 struct ldlm_res_id file_res_id;
200
201                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
202                 fid_build_reg_res_name(ll_inode_lu_fid(inode), &file_res_id);
203
204                 down(&lli->lli_och_sem);
205                 if (fd->fd_omode & FMODE_WRITE) {
206                         lockmode = LCK_CW;
207                         LASSERT(lli->lli_open_fd_write_count);
208                         lli->lli_open_fd_write_count--;
209                 } else if (fd->fd_omode & FMODE_EXEC) {
210                         lockmode = LCK_PR;
211                         LASSERT(lli->lli_open_fd_exec_count);
212                         lli->lli_open_fd_exec_count--;
213                 } else {
214                         lockmode = LCK_CR;
215                         LASSERT(lli->lli_open_fd_read_count);
216                         lli->lli_open_fd_read_count--;
217                 }
218                 up(&lli->lli_och_sem);
219
220                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
221                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
222                                      &lockh)) {
223                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
224                                                 fd->fd_omode);
225                 }
226         } else {
227                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
228                        file, file->f_dentry, file->f_dentry->d_name.name);
229         }
230
231         LUSTRE_FPRIVATE(file) = NULL;
232         ll_file_data_put(fd);
233
234         RETURN(rc);
235 }
236
237 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
238
239 /* While this returns an error code, fput() the caller does not, so we need
240  * to make every effort to clean up all of our state here.  Also, applications
241  * rarely check close errors and even if an error is returned they will not
242  * re-try the close call.
243  */
244 int ll_file_release(struct inode *inode, struct file *file)
245 {
246         struct ll_file_data *fd;
247         struct ll_sb_info *sbi = ll_i2sbi(inode);
248         struct ll_inode_info *lli = ll_i2info(inode);
249         struct lov_stripe_md *lsm = lli->lli_smd;
250         int rc;
251         ENTRY;
252
253         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
254                inode->i_generation, inode);
255
256
257         if (inode->i_sb->s_root != file->f_dentry)
258                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
259         fd = LUSTRE_FPRIVATE(file);
260         LASSERT(fd != NULL);
261
262         /* The last ref on @file, maybe not the the owner pid of statahead.
263          * Different processes can open the same dir, "ll_opendir_key" means:
264          * it is me that should stop the statahead thread. */
265         if (lli->lli_opendir_key == fd && lli->lli_opendir_pid != 0)
266                 ll_stop_statahead(inode, lli->lli_opendir_key);
267
268         if (inode->i_sb->s_root == file->f_dentry) {
269                 LUSTRE_FPRIVATE(file) = NULL;
270                 ll_file_data_put(fd);
271                 RETURN(0);
272         }
273
274         if (lsm)
275                 lov_test_and_clear_async_rc(lsm);
276         lli->lli_async_rc = 0;
277
278         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
279         RETURN(rc);
280 }
281
282 static int ll_intent_file_open(struct file *file, void *lmm,
283                                int lmmsize, struct lookup_intent *itp)
284 {
285         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
286         struct mdc_op_data data = { { 0 } };
287         struct dentry *parent = file->f_dentry->d_parent;
288         const char *name = file->f_dentry->d_name.name;
289         const int len = file->f_dentry->d_name.len;
290         struct inode *inode = file->f_dentry->d_inode;
291         struct ptlrpc_request *req;
292         int rc;
293         ENTRY;
294
295         if (!parent)
296                 RETURN(-ENOENT);
297
298         ll_prepare_mdc_op_data(&data, parent->d_inode, inode,
299                                name, len, O_RDWR, NULL);
300
301         /* Usually we come here only for NFSD, and we want open lock.
302            But we can also get here with pre 2.6.15 patchless kernels, and in
303            that case that lock is also ok */
304         /* We can also get here if there was cached open handle in revalidate_it
305          * but it disappeared while we were getting from there to ll_file_open.
306          * But this means this file was closed and immediatelly opened which
307          * makes a good candidate for using OPEN lock */
308         /* If lmmsize & lmm are not 0, we are just setting stripe info
309          * parameters. No need for the open lock */
310         if (!lmm && !lmmsize)
311                 itp->it_flags |= MDS_OPEN_LOCK;
312
313         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
314                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
315         if (rc == -ESTALE) {
316                 /* reason for keep own exit path - don`t flood log
317                 * with messages with -ESTALE errors.
318                 */
319                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
320                      it_open_error(DISP_OPEN_OPEN, itp))
321                         GOTO(out, rc);
322                 ll_release_openhandle(file->f_dentry, itp);
323                 GOTO(out, rc);
324         }
325
326         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
327                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
328                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
329                 GOTO(out, rc);
330         }
331
332         if (itp->d.lustre.it_lock_mode)
333                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
334                                   inode);
335
336         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
337                            req, DLM_REPLY_REC_OFF, NULL);
338 out:
339         ptlrpc_req_finished(itp->d.lustre.it_data);
340         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
341         ll_intent_drop_lock(itp);
342
343         RETURN(rc);
344 }
345
346
347 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
348                         struct obd_client_handle *och)
349 {
350         struct ptlrpc_request *req = it->d.lustre.it_data;
351         struct mds_body *body;
352
353         LASSERT(och);
354
355         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
356         LASSERT(body != NULL);                  /* reply already checked out */
357         /* and swabbed in mdc_enqueue */
358         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
359
360         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
361         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
362         lli->lli_io_epoch = body->io_epoch;
363
364         mdc_set_open_replay_data(och, it->d.lustre.it_data);
365 }
366
367 int ll_local_open(struct file *file, struct lookup_intent *it,
368                   struct ll_file_data *fd, struct obd_client_handle *och)
369 {
370         ENTRY;
371
372         LASSERT(!LUSTRE_FPRIVATE(file));
373
374         LASSERT(fd != NULL);
375
376         if (och)
377                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
378         LUSTRE_FPRIVATE(file) = fd;
379         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
380         fd->fd_omode = it->it_flags;
381
382         RETURN(0);
383 }
384
385 /* Open a file, and (for the very first open) create objects on the OSTs at
386  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
387  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
388  * lli_open_sem to ensure no other process will create objects, send the
389  * stripe MD to the MDS, or try to destroy the objects if that fails.
390  *
391  * If we already have the stripe MD locally then we don't request it in
392  * mdc_open(), by passing a lmm_size = 0.
393  *
394  * It is up to the application to ensure no other processes open this file
395  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
396  * used.  We might be able to avoid races of that sort by getting lli_open_sem
397  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
398  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
399  */
400 int ll_file_open(struct inode *inode, struct file *file)
401 {
402         struct ll_inode_info *lli = ll_i2info(inode);
403         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
404                                           .it_flags = file->f_flags };
405         struct lov_stripe_md *lsm;
406         struct ptlrpc_request *req = NULL;
407         struct obd_client_handle **och_p;
408         __u64 *och_usecount;
409         struct ll_file_data *fd;
410         int rc = 0, opendir_set = 0;
411         ENTRY;
412
413         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
414                inode->i_generation, inode, file->f_flags);
415
416 #ifdef HAVE_VFS_INTENT_PATCHES
417         it = file->f_it;
418 #else
419         it = file->private_data; /* XXX: compat macro */
420         file->private_data = NULL; /* prevent ll_local_open assertion */
421 #endif
422
423         fd = ll_file_data_get();
424         if (fd == NULL)
425                 RETURN(-ENOMEM);
426
427         if (S_ISDIR(inode->i_mode)) {
428 again:
429                 spin_lock(&lli->lli_lock);
430                 if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) {
431                         LASSERT(lli->lli_sai == NULL);
432                         lli->lli_opendir_key = fd;
433                         lli->lli_opendir_pid = cfs_curproc_pid();
434                         opendir_set = 1;
435                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid() &&
436                                     lli->lli_opendir_key != NULL)) {
437                         /* Two cases for this:
438                          * (1) The same process open such directory many times.
439                          * (2) The old process opened the directory, and exited
440                          *     before its children processes. Then new process
441                          *     with the same pid opens such directory before the
442                          *     old process's children processes exit.
443                          * reset stat ahead for such cases. */
444                         spin_unlock(&lli->lli_lock);
445                         CDEBUG(D_INFO, "Conflict statahead for %.*s %lu/%u"
446                                " reset it.\n", file->f_dentry->d_name.len,
447                                file->f_dentry->d_name.name,
448                                inode->i_ino, inode->i_generation);
449                         ll_stop_statahead(inode, lli->lli_opendir_key);
450                         goto again;
451                 }
452                 spin_unlock(&lli->lli_lock);
453         }
454
455         if (inode->i_sb->s_root == file->f_dentry) {
456                 LUSTRE_FPRIVATE(file) = fd;
457                 RETURN(0);
458         }
459
460         if (!it || !it->d.lustre.it_disposition) {
461                 /* Convert f_flags into access mode. We cannot use file->f_mode,
462                  * because everything but O_ACCMODE mask was stripped from it */
463                 if ((oit.it_flags + 1) & O_ACCMODE)
464                         oit.it_flags++;
465                 if (file->f_flags & O_TRUNC)
466                         oit.it_flags |= FMODE_WRITE;
467
468                 /* kernel only call f_op->open in dentry_open.  filp_open calls
469                  * dentry_open after call to open_namei that checks permissions.
470                  * Only nfsd_open call dentry_open directly without checking
471                  * permissions and because of that this code below is safe. */
472                 if (oit.it_flags & FMODE_WRITE)
473                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
474
475                 /* We do not want O_EXCL here, presumably we opened the file
476                  * already? XXX - NFS implications? */
477                 oit.it_flags &= ~O_EXCL;
478
479                 it = &oit;
480         }
481
482 restart:
483         /* Let's see if we have file open on MDS already. */
484         if (it->it_flags & FMODE_WRITE) {
485                 och_p = &lli->lli_mds_write_och;
486                 och_usecount = &lli->lli_open_fd_write_count;
487         } else if (it->it_flags & FMODE_EXEC) {
488                 och_p = &lli->lli_mds_exec_och;
489                 och_usecount = &lli->lli_open_fd_exec_count;
490          } else {
491                 och_p = &lli->lli_mds_read_och;
492                 och_usecount = &lli->lli_open_fd_read_count;
493         }
494
495         LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
496                  it->d.lustre.it_disposition);
497
498         down(&lli->lli_och_sem);
499         if (*och_p) { /* Open handle is present */
500                 if (it_disposition(it, DISP_OPEN_OPEN)) {
501                         /* Well, there's extra open request that we do not need,
502                            let's close it somehow. This will decref request. */
503                         rc = it_open_error(DISP_OPEN_OPEN, it);
504                         if (rc) {
505                                 up(&lli->lli_och_sem);
506                                 ll_file_data_put(fd);
507                                 GOTO(out_openerr, rc);
508                         }
509                         ll_release_openhandle(file->f_dentry, it);
510                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
511                                              LPROC_LL_OPEN);
512                 }
513                 (*och_usecount)++;
514
515                 rc = ll_local_open(file, it, fd, NULL);
516
517                 LASSERTF(rc == 0, "rc = %d\n", rc);
518         } else {
519                 LASSERT(*och_usecount == 0);
520                 if (!it->d.lustre.it_disposition) {
521                         /* We cannot just request lock handle now, new ELC code
522                            means that one of other OPEN locks for this file
523                            could be cancelled, and since blocking ast handler
524                            would attempt to grab och_sem as well, that would
525                            result in a deadlock */
526                         up(&lli->lli_och_sem);
527                         it->it_flags |= O_CHECK_STALE;
528                         rc = ll_intent_file_open(file, NULL, 0, it);
529                         it->it_flags &= ~O_CHECK_STALE;
530                         if (rc) {
531                                 ll_file_data_put(fd);
532                                 GOTO(out_openerr, rc);
533                         }
534
535                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
536                                           file->f_dentry->d_inode);
537                         goto restart;
538                 }
539
540                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
541                 if (!*och_p) {
542                         ll_file_data_put(fd);
543                         GOTO(out_och_free, rc = -ENOMEM);
544                 }
545                 (*och_usecount)++;
546                req = it->d.lustre.it_data;
547
548                 /* mdc_intent_lock() didn't get a request ref if there was an
549                  * open error, so don't do cleanup on the request here
550                  * (bug 3430) */
551                 /* XXX (green): Should not we bail out on any error here, not
552                  * just open error? */
553                 rc = it_open_error(DISP_OPEN_OPEN, it);
554                 if (rc) {
555                         ll_file_data_put(fd);
556                         GOTO(out_och_free, rc);
557                 }
558
559                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
560                 rc = ll_local_open(file, it, fd, *och_p);
561                 LASSERTF(rc == 0, "rc = %d\n", rc);
562         }
563         up(&lli->lli_och_sem);
564
565         /* Must do this outside lli_och_sem lock to prevent deadlock where
566            different kind of OPEN lock for this same inode gets cancelled
567            by ldlm_cancel_lru */
568         if (!S_ISREG(inode->i_mode))
569                 GOTO(out, rc);
570
571         lsm = lli->lli_smd;
572         if (lsm == NULL) {
573                 if (file->f_flags & O_LOV_DELAY_CREATE ||
574                     !(file->f_mode & FMODE_WRITE)) {
575                         CDEBUG(D_INODE, "object creation was delayed\n");
576                         GOTO(out, rc);
577                 }
578         }
579         file->f_flags &= ~O_LOV_DELAY_CREATE;
580         GOTO(out, rc);
581  out:
582         ptlrpc_req_finished(req);
583         if (req)
584                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
585         if (rc == 0) {
586                 ll_open_complete(inode);
587         } else {
588 out_och_free:
589                 if (*och_p) {
590                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
591                         *och_p = NULL; /* OBD_FREE writes some magic there */
592                         (*och_usecount)--;
593                 }
594                 up(&lli->lli_och_sem);
595 out_openerr:
596                 if (opendir_set != 0)
597                         ll_stop_statahead(inode, lli->lli_opendir_key);
598         }
599
600         return rc;
601 }
602
603 /* Fills the obdo with the attributes for the inode defined by lsm */
604 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
605                    struct obdo *oa)
606 {
607         struct ptlrpc_request_set *set;
608         struct obd_info oinfo = { { { 0 } } };
609         int rc;
610         ENTRY;
611
612         LASSERT(lsm != NULL);
613
614         memset(oa, 0, sizeof *oa);
615         oinfo.oi_md = lsm;
616         oinfo.oi_oa = oa;
617         oa->o_id = lsm->lsm_object_id;
618         oa->o_gr = lsm->lsm_object_gr;
619         oa->o_mode = S_IFREG;
620         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
621                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
622                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
623
624         set = ptlrpc_prep_set();
625         if (set == NULL) {
626                 rc = -ENOMEM;
627         } else {
628                 rc = obd_getattr_async(exp, &oinfo, set);
629                 if (rc == 0)
630                         rc = ptlrpc_set_wait(set);
631                 ptlrpc_set_destroy(set);
632         }
633         if (rc)
634                 RETURN(rc);
635
636         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
637                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
638         RETURN(0);
639 }
640
641 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
642 {
643         struct ll_inode_info *lli = ll_i2info(inode);
644         struct lov_stripe_md *lsm = lli->lli_smd;
645         struct obd_export *exp = ll_i2obdexp(inode);
646         struct {
647                 char name[16];
648                 struct ldlm_lock *lock;
649         } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
650         __u32 stripe, vallen = sizeof(stripe);
651         struct lov_oinfo *loinfo;
652         int rc;
653         ENTRY;
654
655         if (lsm->lsm_stripe_count == 1)
656                 GOTO(check, stripe = 0);
657
658         /* get our offset in the lov */
659         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
660         if (rc != 0) {
661                 CERROR("obd_get_info: rc = %d\n", rc);
662                 RETURN(rc);
663         }
664         LASSERT(stripe < lsm->lsm_stripe_count);
665
666 check:
667         loinfo = lsm->lsm_oinfo[stripe];
668         if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
669                             &lock->l_resource->lr_name)) {
670                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
671                            loinfo->loi_id, loinfo->loi_gr);
672                 RETURN(-ELDLM_NO_LOCK_DATA);
673         }
674
675         RETURN(stripe);
676 }
677
/* Take an extra reference on the page passed as @data so that it cannot go
 * away. */
void ll_pin_extent_cb(void *data)
{
        struct page *pinned = data;

        page_cache_get(pinned);
}
687 /* Flush the page from page cache for an extent as its canceled.
688  * Page to remove is delivered as @data.
689  *
690  * No one can dirty the extent until we've finished our work and they cannot
691  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
692  * but other kernel actors could have pages locked.
693  *
694  * If @discard is set, there is no need to write the page if it is dirty.
695  *
696  * Called with the DLM lock held. */
697 int ll_page_removal_cb(void *data, int discard)
698 {
699         int rc;
700         struct page *page = data;
701         struct address_space *mapping;
702
703         ENTRY;
704
705         /* We have page reference already from ll_pin_page */
706         lock_page(page);
707
708         /* Already truncated by somebody */
709         if (!page->mapping)
710                 GOTO(out, rc = 0);
711
712         mapping = page->mapping;
713
714         ll_teardown_mmaps(mapping,
715                           (__u64)page->index << PAGE_CACHE_SHIFT,
716                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
717                                                               ~PAGE_CACHE_MASK);
718         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
719         if (!discard && PageWriteback(page))
720                 wait_on_page_writeback(page);
721
722         if (!discard && clear_page_dirty_for_io(page)) {
723                 rc = ll_call_writepage(page->mapping->host, page);
724                 /* either waiting for io to complete or reacquiring
725                  * the lock that the failed writepage released */
726                 lock_page(page);
727                 wait_on_page_writeback(page);
728                 if (rc < 0) {
729                         CERROR("writepage inode %lu(%p) of page %p "
730                                "failed: %d\n", mapping->host->i_ino,
731                                mapping->host, page, rc);
732                         if (rc == -ENOSPC)
733                                 set_bit(AS_ENOSPC, &mapping->flags);
734                         else
735                                 set_bit(AS_EIO, &mapping->flags);
736                 }
737         }
738         if (page->mapping != NULL) {
739                 struct ll_async_page *llap = llap_cast_private(page);
740                 // checking again to account for writeback's lock_page()
741                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
742                 if (llap)
743                         ll_ra_accounting(llap, page->mapping);
744                 ll_truncate_complete_page(page);
745         }
746         EXIT;
747 out:
748         LASSERT(!PageWriteback(page));
749         unlock_page(page);
750         page_cache_release(page);
751
752         return 0;
753 }
754
755 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
756                              void *data, int flag)
757 {
758         struct inode *inode;
759         struct ll_inode_info *lli;
760         struct lov_stripe_md *lsm;
761         int stripe;
762         __u64 kms;
763
764         ENTRY;
765
766         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
767                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
768                 LBUG();
769         }
770
771         inode = ll_inode_from_lock(lock);
772         if (inode == NULL)
773                 RETURN(0);
774         lli = ll_i2info(inode);
775         if (lli == NULL)
776                 GOTO(iput, 0);
777         if (lli->lli_smd == NULL)
778                 GOTO(iput, 0);
779         lsm = lli->lli_smd;
780
781         stripe = ll_lock_to_stripe_offset(inode, lock);
782         if (stripe < 0)
783                 GOTO(iput, 0);
784
785         lov_stripe_lock(lsm);
786         lock_res_and_lock(lock);
787         kms = ldlm_extent_shift_kms(lock,
788                                     lsm->lsm_oinfo[stripe]->loi_kms);
789
790         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
791                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
792                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
793         lsm->lsm_oinfo[stripe]->loi_kms = kms;
794         unlock_res_and_lock(lock);
795         lov_stripe_unlock(lsm);
796         ll_try_done_writing(inode);
797         EXIT;
798 iput:
799         iput(inode);
800
801         return 0;
802 }
803
/*
 * NOTE(review): everything between #if 0 and #endif is compiled out.
 *
 * The disabled function indexes lsm_oinfo[] with '.' (e.g.
 * lsm_oinfo[stripe].loi_kms) while the live code in this file uses '->'
 * on the same array, so it presumably predates the current layout and
 * would need porting before it could be re-enabled -- TODO confirm or
 * remove.
 */
#if 0
int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lustre_handle lockh = { 0 };
        struct ost_lvb *lvb;
        int stripe;
        ENTRY;

        /* blocked async locks are not expected: LBUG() comes first, so the
         * statements after it only matter if LBUG() returns */
        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                     LDLM_FL_BLOCK_CONV)) {
                LBUG(); /* not expecting any blocked async locks yet */
                LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
                           "lock, returning");
                ldlm_lock_dump(D_OTHER, lock, 0);
                ldlm_reprocess_all(lock->l_resource);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");

        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                goto iput;

        /* when the lock carries an LVB, record its size as rss and raise
         * the stripe kms to at least that size */
        if (lock->l_lvb_len) {
                struct lov_stripe_md *lsm = lli->lli_smd;
                __u64 kms;
                lvb = lock->l_lvb_data;
                lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;

                lock_res_and_lock(lock);
                ll_inode_size_lock(inode, 1);
                kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                kms = ldlm_extent_shift_kms(NULL, kms);
                if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe].loi_kms, kms);
                lsm->lsm_oinfo[stripe].loi_kms = kms;
                ll_inode_size_unlock(inode, 1);
                unlock_res_and_lock(lock);
        }

iput:
        iput(inode);
        wake_up(&lock->l_waitq);

        /* drop the LCK_PR reference on the lock */
        ldlm_lock2handle(lock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        RETURN(0);
}
#endif
858
859 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
860 {
861         struct ptlrpc_request *req = reqp;
862         struct inode *inode = ll_inode_from_lock(lock);
863         struct ll_inode_info *lli;
864         struct lov_stripe_md *lsm;
865         struct ost_lvb *lvb;
866         int rc, stripe;
867         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
868         ENTRY;
869
870         if (inode == NULL)
871                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
872         lli = ll_i2info(inode);
873         if (lli == NULL)
874                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
875         lsm = lli->lli_smd;
876         if (lsm == NULL)
877                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
878
879         /* First, find out which stripe index this lock corresponds to. */
880         stripe = ll_lock_to_stripe_offset(inode, lock);
881         if (stripe < 0)
882                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
883
884         rc = lustre_pack_reply(req, 2, size, NULL);
885         if (rc)
886                 GOTO(iput, rc);
887
888         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
889         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
890         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
891         lvb->lvb_atime = LTIME_S(inode->i_atime);
892         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
893
894         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
895                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
896                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_atime,
897                    lvb->lvb_mtime, lvb->lvb_ctime);
898  iput:
899         iput(inode);
900
901  out:
902         /* These errors are normal races, so we don't want to fill the console
903          * with messages by calling ptlrpc_error() */
904         if (rc == -ELDLM_NO_LOCK_DATA)
905                 lustre_pack_reply(req, 1, NULL, NULL);
906
907         req->rq_status = rc;
908         return rc;
909 }
910
911 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
912                      lstat_t *st)
913 {
914         struct lustre_handle lockh = { 0 };
915         struct ldlm_enqueue_info einfo = { 0 };
916         struct obd_info oinfo = { { { 0 } } };
917         struct ost_lvb lvb;
918         int rc;
919
920         ENTRY;
921
922         einfo.ei_type = LDLM_EXTENT;
923         einfo.ei_mode = LCK_PR;
924         einfo.ei_cb_bl = osc_extent_blocking_cb;
925         einfo.ei_cb_cp = ldlm_completion_ast;
926         einfo.ei_cb_gl = ll_glimpse_callback;
927         einfo.ei_cbdata = NULL;
928
929         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
930         oinfo.oi_lockh = &lockh;
931         oinfo.oi_md = lsm;
932         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
933
934         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
935         if (rc == -ENOENT)
936                 RETURN(rc);
937         if (rc != 0) {
938                 CERROR("obd_enqueue returned rc %d, "
939                        "returning -EIO\n", rc);
940                 RETURN(rc > 0 ? -EIO : rc);
941         }
942
943         lov_stripe_lock(lsm);
944         memset(&lvb, 0, sizeof(lvb));
945         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
946         st->st_size = lvb.lvb_size;
947         st->st_blocks = lvb.lvb_blocks;
948         st->st_mtime = lvb.lvb_mtime;
949         st->st_atime = lvb.lvb_atime;
950         st->st_ctime = lvb.lvb_ctime;
951         lov_stripe_unlock(lsm);
952
953         RETURN(rc);
954 }
955
956 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
957  * file (because it prefers KMS over RSS when larger) */
958 int ll_glimpse_size(struct inode *inode, int ast_flags)
959 {
960         struct ll_inode_info *lli = ll_i2info(inode);
961         struct ll_sb_info *sbi = ll_i2sbi(inode);
962         struct lustre_handle lockh = { 0 };
963         struct ldlm_enqueue_info einfo = { 0 };
964         struct obd_info oinfo = { { { 0 } } };
965         struct ost_lvb lvb;
966         int rc;
967         ENTRY;
968
969         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
970
971         if (!lli->lli_smd) {
972                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
973                 RETURN(0);
974         }
975
976         /* NOTE: this looks like DLM lock request, but it may not be one. Due
977          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
978          *       won't revoke any conflicting DLM locks held. Instead,
979          *       ll_glimpse_callback() will be called on each client
980          *       holding a DLM lock against this file, and resulting size
981          *       will be returned for each stripe. DLM lock on [0, EOF] is
982          *       acquired only if there were no conflicting locks. */
983         einfo.ei_type = LDLM_EXTENT;
984         einfo.ei_mode = LCK_PR;
985         einfo.ei_cb_bl = osc_extent_blocking_cb;
986         einfo.ei_cb_cp = ldlm_completion_ast;
987         einfo.ei_cb_gl = ll_glimpse_callback;
988         einfo.ei_cbdata = inode;
989
990         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
991         oinfo.oi_lockh = &lockh;
992         oinfo.oi_md = lli->lli_smd;
993         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
994
995         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
996         if (rc == -ENOENT)
997                 RETURN(rc);
998         if (rc != 0) {
999                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1000                 RETURN(rc > 0 ? -EIO : rc);
1001         }
1002
1003         ll_inode_size_lock(inode, 1);
1004         inode_init_lvb(inode, &lvb);
1005         /* merge timestamps the most resently obtained from mds with
1006            timestamps obtained from osts */
1007         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
1008         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
1009         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
1010         rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1011         i_size_write(inode, lvb.lvb_size);
1012         inode->i_blocks = lvb.lvb_blocks;
1013         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1014         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1015         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1016         ll_inode_size_unlock(inode, 1);
1017
1018         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1019                i_size_read(inode), (long long)inode->i_blocks);
1020
1021         RETURN(rc);
1022 }
1023
1024 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1025                    struct lov_stripe_md *lsm, int mode,
1026                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1027                    int ast_flags)
1028 {
1029         struct ll_sb_info *sbi = ll_i2sbi(inode);
1030         struct ost_lvb lvb;
1031         struct ldlm_enqueue_info einfo = { 0 };
1032         struct obd_info oinfo = { { { 0 } } };
1033         int rc;
1034         ENTRY;
1035
1036         LASSERT(!lustre_handle_is_used(lockh));
1037         LASSERT(lsm != NULL);
1038
1039         /* don't drop the mmapped file to LRU */
1040         if (mapping_mapped(inode->i_mapping))
1041                 ast_flags |= LDLM_FL_NO_LRU;
1042
1043         /* XXX phil: can we do this?  won't it screw the file size up? */
1044         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1045             (sbi->ll_flags & LL_SBI_NOLCK))
1046                 RETURN(0);
1047
1048         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1049                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1050
1051         einfo.ei_type = LDLM_EXTENT;
1052         einfo.ei_mode = mode;
1053         einfo.ei_cb_bl = osc_extent_blocking_cb;
1054         einfo.ei_cb_cp = ldlm_completion_ast;
1055         einfo.ei_cb_gl = ll_glimpse_callback;
1056         einfo.ei_cbdata = inode;
1057
1058         oinfo.oi_policy = *policy;
1059         oinfo.oi_lockh = lockh;
1060         oinfo.oi_md = lsm;
1061         oinfo.oi_flags = ast_flags;
1062
1063         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL);
1064         *policy = oinfo.oi_policy;
1065         if (rc > 0)
1066                 rc = -EIO;
1067
1068         ll_inode_size_lock(inode, 1);
1069         inode_init_lvb(inode, &lvb);
1070         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1071
1072         if (policy->l_extent.start == 0 &&
1073             policy->l_extent.end == OBD_OBJECT_EOF) {
1074                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1075                  * the kms under both a DLM lock and the
1076                  * ll_inode_size_lock().  If we don't get the
1077                  * ll_inode_size_lock() here we can match the DLM lock and
1078                  * reset i_size from the kms before the truncating path has
1079                  * updated the kms.  generic_file_write can then trust the
1080                  * stale i_size when doing appending writes and effectively
1081                  * cancel the result of the truncate.  Getting the
1082                  * ll_inode_size_lock() after the enqueue maintains the DLM
1083                  * -> ll_inode_size_lock() acquiring order. */
1084                 i_size_write(inode, lvb.lvb_size);
1085                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1086                        inode->i_ino, i_size_read(inode));
1087         }
1088
1089         if (rc == 0) {
1090                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1091                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1092                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1093         }
1094         ll_inode_size_unlock(inode, 1);
1095
1096         RETURN(rc);
1097 }
1098
1099 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1100                      struct lov_stripe_md *lsm, int mode,
1101                      struct lustre_handle *lockh)
1102 {
1103         struct ll_sb_info *sbi = ll_i2sbi(inode);
1104         int rc;
1105         ENTRY;
1106
1107         /* XXX phil: can we do this?  won't it screw the file size up? */
1108         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1109             (sbi->ll_flags & LL_SBI_NOLCK))
1110                 RETURN(0);
1111
1112         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1113
1114         RETURN(rc);
1115 }
1116
1117 static void ll_set_file_contended(struct inode *inode)
1118 {
1119         struct ll_inode_info *lli = ll_i2info(inode);
1120
1121         lli->lli_contention_time = cfs_time_current();
1122         set_bit(LLI_F_CONTENDED, &lli->lli_flags);
1123 }
1124
1125 void ll_clear_file_contended(struct inode *inode)
1126 {
1127         struct ll_inode_info *lli = ll_i2info(inode);
1128
1129         clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
1130 }
1131
1132 static int ll_is_file_contended(struct file *file)
1133 {
1134         struct inode *inode = file->f_dentry->d_inode;
1135         struct ll_inode_info *lli = ll_i2info(inode);
1136         struct ll_sb_info *sbi = ll_i2sbi(inode);
1137         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1138         ENTRY;
1139
1140         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1141                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1142                        " osc connect flags = 0x"LPX64"\n",
1143                        sbi->ll_lco.lco_flags);
1144                 RETURN(0);
1145         }
1146         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1147                 RETURN(1);
1148         if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
1149                 cfs_time_t cur_time = cfs_time_current();
1150                 cfs_time_t retry_time;
1151
1152                 retry_time = cfs_time_add(
1153                         lli->lli_contention_time,
1154                         cfs_time_seconds(sbi->ll_contention_time));
1155                 if (cfs_time_after(cur_time, retry_time)) {
1156                         ll_clear_file_contended(inode);
1157                         RETURN(0);
1158                 }
1159                 RETURN(1);
1160         }
1161         RETURN(0);
1162 }
1163
1164 static int ll_file_get_tree_lock_iov(struct ll_lock_tree *tree,
1165                                      struct file *file, const struct iovec *iov,
1166                                      unsigned long nr_segs,
1167                                      obd_off start, obd_off end, int rw)
1168 {
1169         int append;
1170         int tree_locked = 0;
1171         int rc;
1172         struct inode * inode = file->f_dentry->d_inode;
1173         ENTRY;
1174
1175         append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
1176
1177         if (append || !ll_is_file_contended(file)) {
1178                 struct ll_lock_tree_node *node;
1179                 int ast_flags;
1180
1181                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1182                 if (file->f_flags & O_NONBLOCK)
1183                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1184                 node = ll_node_from_inode(inode, start, end,
1185                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1186                 if (IS_ERR(node)) {
1187                         rc = PTR_ERR(node);
1188                         GOTO(out, rc);
1189                 }
1190                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1191                 rc = ll_tree_lock_iov(tree, node, iov, nr_segs, ast_flags);
1192                 if (rc == 0)
1193                         tree_locked = 1;
1194                 else if (rc == -EUSERS)
1195                         ll_set_file_contended(inode);
1196                 else
1197                         GOTO(out, rc);
1198         }
1199         RETURN(tree_locked);
1200 out:
1201         return rc;
1202 }
1203
1204 /* XXX: exact copy from kernel code (__generic_file_aio_write_nolock from rhel4)
1205  */
1206 static size_t ll_file_get_iov_count(const struct iovec *iov,
1207                                      unsigned long *nr_segs)
1208 {
1209         size_t count = 0;
1210         unsigned long seg;
1211
1212         for (seg = 0; seg < *nr_segs; seg++) {
1213                 const struct iovec *iv = &iov[seg];
1214
1215                 /*
1216                  * If any segment has a negative length, or the cumulative
1217                  * length ever wraps negative then return -EINVAL.
1218                  */
1219                 count += iv->iov_len;
1220                 if (unlikely((ssize_t)(count|iv->iov_len) < 0))
1221                         return -EINVAL;
1222                 if (access_ok(VERIFY_WRITE, iv->iov_base, iv->iov_len))
1223                         continue;
1224                 if (seg == 0)
1225                         return -EFAULT;
1226                 *nr_segs = seg;
1227                 count -= iv->iov_len;   /* This segment is no good */
1228                 break;
1229         }
1230         return count;
1231 }
1232
/*
 * Carve the next @size bytes out of an iovec array.
 *
 * Segments are copied from *@iov_out into @iov_copy.  The first copied
 * segment is advanced by *@offset bytes (the part of it consumed by an
 * earlier call) and the last one is trimmed so that the copies add up to
 * @size bytes.
 *
 * On return:
 *   *@iov_out     points at the segment the next call must start from;
 *   *@nr_segs     is the number of segments left starting from there;
 *   *@nrsegs_copy is the number of valid entries in @iov_copy;
 *   *@offset      is the number of bytes already consumed from the
 *                 segment *@iov_out now points at.
 *
 * If the source runs out before @size bytes were gathered (including
 * *@nr_segs == 0), *@nrsegs_copy is the number of segments really copied,
 * *@nr_segs becomes 0 and *@offset is reset to 0.  Earlier code reported
 * one segment too many in that case.
 *
 * The caller must make sure @iov_copy has room for *@nr_segs entries and
 * that *@offset does not exceed the length of the first segment.
 *
 * Always returns 0.
 */
static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
                           unsigned long *nrsegs_copy,
                           struct iovec *iov_copy, size_t *offset,
                           size_t size)
{
        const struct iovec *iov = *iov_out;
        unsigned long i;

        for (i = 0; i < *nr_segs; i++) {
                struct iovec *ivc = &iov_copy[i];

                *ivc = iov[i];
                if (i == 0) {
                        /* skip what a previous call already handed out */
                        ivc->iov_len -= *offset;
                        ivc->iov_base += *offset;
                }

                if (ivc->iov_len >= size) {
                        /* this segment completes the request */
                        ivc->iov_len = size;
                        if (i == 0)
                                *offset += size;
                        else
                                *offset = size;

                        *iov_out += i;
                        *nr_segs -= i;
                        *nrsegs_copy = i + 1;
                        return 0;
                }
                size -= ivc->iov_len;
        }

        /* source exhausted: only i segments were filled in */
        *iov_out += i;
        *nr_segs -= i;
        *nrsegs_copy = i;
        *offset = 0;

        return 0;
}
1265
1266 static int ll_reget_short_lock(struct page *page, int rw,
1267                                obd_off start, obd_off end,
1268                                void **cookie)
1269 {
1270         struct ll_async_page *llap;
1271         struct obd_export *exp;
1272         struct inode *inode = page->mapping->host;
1273
1274         ENTRY;
1275
1276         exp = ll_i2obdexp(inode);
1277         if (exp == NULL)
1278                 RETURN(0);
1279
1280         llap = llap_cast_private(page);
1281         if (llap == NULL)
1282                 RETURN(0);
1283
1284         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1285                                     &llap->llap_cookie, rw, start, end,
1286                                     cookie));
1287 }
1288
1289 static void ll_release_short_lock(struct inode *inode, obd_off end,
1290                                   void *cookie, int rw)
1291 {
1292         struct obd_export *exp;
1293         int rc;
1294
1295         exp = ll_i2obdexp(inode);
1296         if (exp == NULL)
1297                 return;
1298
1299         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1300                                     cookie, rw);
1301         if (rc < 0)
1302                 CERROR("unlock failed (%d)\n", rc);
1303 }
1304
1305 static inline int ll_file_get_fast_lock(struct file *file,
1306                                         obd_off ppos, obd_off end,
1307                                         const struct iovec *iov,
1308                                         unsigned long nr_segs,
1309                                         void **cookie, int rw)
1310 {
1311         int rc = 0, seg;
1312         struct page *page;
1313
1314         ENTRY;
1315
1316         /* we would like this read request to be lockfree */
1317         for (seg = 0; seg < nr_segs; seg++) {
1318                 const struct iovec *iv = &iov[seg];
1319                 if (ll_region_mapped((unsigned long)iv->iov_base, iv->iov_len))
1320                         GOTO(out, rc);
1321         }
1322
1323         page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1324                               ppos >> CFS_PAGE_SHIFT);
1325         if (page) {
1326                 if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1327                         rc = 1;
1328
1329                 unlock_page(page);
1330                 page_cache_release(page);
1331         }
1332
1333 out:
1334         RETURN(rc);
1335 }
1336
1337 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1338                                          void *cookie, int rw)
1339 {
1340         ll_release_short_lock(inode, end, cookie, rw);
1341 }
1342
/*
 * How an I/O region ended up being protected, as reported by
 * ll_file_get_lock() and consumed by ll_file_put_lock().
 */
enum ll_lock_style {
        /* no lock was taken */
        LL_LOCK_STYLE_NOLOCK = 0,
        /* short lock from ll_file_get_fast_lock() */
        LL_LOCK_STYLE_FASTLOCK,
        /* tree lock from ll_file_get_tree_lock_iov() */
        LL_LOCK_STYLE_TREELOCK
};
1348
1349 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1350                                    obd_off end, const struct iovec *iov,
1351                                    unsigned long nr_segs, void **cookie,
1352                                    struct ll_lock_tree *tree, int rw)
1353 {
1354         int rc;
1355
1356         ENTRY;
1357
1358         if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, cookie, rw))
1359                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1360
1361         rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
1362                                        ppos, end, rw);
1363         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1364         switch (rc) {
1365         case 1:
1366                 RETURN(LL_LOCK_STYLE_TREELOCK);
1367         case 0:
1368                 RETURN(LL_LOCK_STYLE_NOLOCK);
1369         }
1370
1371         /* an error happened if we reached this point, rc = -errno here */
1372         RETURN(rc);
1373 }
1374
1375 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1376                                     enum ll_lock_style lock_style,
1377                                     void *cookie, struct ll_lock_tree *tree,
1378                                     int rw)
1379
1380 {
1381         switch (lock_style) {
1382         case LL_LOCK_STYLE_TREELOCK:
1383                 ll_tree_unlock(tree);
1384                 break;
1385         case LL_LOCK_STYLE_FASTLOCK:
1386                 ll_file_put_fast_lock(inode, end, cookie, rw);
1387                 break;
1388         default:
1389                 CERROR("invalid locking style (%d)\n", lock_style);
1390         }
1391 }
1392
1393 #ifdef HAVE_FILE_READV
1394 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
1395                               unsigned long nr_segs, loff_t *ppos)
1396 {
1397 #else
1398 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1399                                 unsigned long nr_segs, loff_t pos)
1400 {
1401         struct file *file = iocb->ki_filp;
1402         loff_t *ppos = &iocb->ki_pos;
1403 #endif
1404         struct inode *inode = file->f_dentry->d_inode;
1405         struct ll_inode_info *lli = ll_i2info(inode);
1406         struct lov_stripe_md *lsm = lli->lli_smd;
1407         struct ll_sb_info *sbi = ll_i2sbi(inode);
1408         struct ll_lock_tree tree;
1409         struct ost_lvb lvb;
1410         struct ll_ra_read bead;
1411         int ra = 0;
1412         obd_off end;
1413         ssize_t retval, chunk, sum = 0;
1414         int lock_style;
1415         struct iovec *iov_copy = NULL;
1416         unsigned long nrsegs_copy, nrsegs_orig = 0;
1417         size_t count, iov_offset = 0;
1418         __u64 kms;
1419         void *cookie;
1420         ENTRY;
1421
1422         count = ll_file_get_iov_count(iov, &nr_segs);
1423         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1424                inode->i_ino, inode->i_generation, inode, count, *ppos);
1425         /* "If nbyte is 0, read() will return 0 and have no other results."
1426          *                      -- Single Unix Spec */
1427         if (count == 0)
1428                 RETURN(0);
1429
1430         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1431
1432         if (!lsm) {
1433                 /* Read on file with no objects should return zero-filled
1434                  * buffers up to file size (we can get non-zero sizes with
1435                  * mknod + truncate, then opening file for read. This is a
1436                  * common pattern in NFS case, it seems). Bug 6243 */
1437                 int notzeroed;
1438                 /* Since there are no objects on OSTs, we have nothing to get
1439                  * lock on and so we are forced to access inode->i_size
1440                  * unguarded */
1441
1442                 /* Read beyond end of file */
1443                 if (*ppos >= i_size_read(inode))
1444                         RETURN(0);
1445
1446                 if (count > i_size_read(inode) - *ppos)
1447                         count = i_size_read(inode) - *ppos;
1448                 /* Make sure to correctly adjust the file pos pointer for
1449                  * EFAULT case */
1450                 for (nrsegs_copy = 0; nrsegs_copy < nr_segs; nrsegs_copy++) {
1451                         const struct iovec *iv = &iov[nrsegs_copy];
1452
1453                         if (count < iv->iov_len)
1454                                 chunk = count;
1455                         else
1456                                 chunk = iv->iov_len;
1457                         notzeroed = clear_user(iv->iov_base, chunk);
1458                         sum += (chunk - notzeroed);
1459                         count -= (chunk - notzeroed);
1460                         if (notzeroed || !count)
1461                                 break;
1462                 }
1463                 *ppos += sum;
1464                 if (!sum)
1465                         RETURN(-EFAULT);
1466                 RETURN(sum);
1467         }
1468
1469 repeat:
1470         if (sbi->ll_max_rw_chunk != 0) {
1471                 /* first, let's know the end of the current stripe */
1472                 end = *ppos;
1473                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,&end);
1474
1475                 /* correct, the end is beyond the request */
1476                 if (end > *ppos + count - 1)
1477                         end = *ppos + count - 1;
1478
1479                 /* and chunk shouldn't be too large even if striping is wide */
1480                 if (end - *ppos > sbi->ll_max_rw_chunk)
1481                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1482
1483                 chunk = end - *ppos + 1;
1484                 if ((count == chunk) && (iov_offset == 0)) {
1485                         if (iov_copy)
1486                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1487
1488                         iov_copy = (struct iovec *)iov;
1489                         nrsegs_copy = nr_segs;
1490                 } else {
1491                         if (!iov_copy) {
1492                                 nrsegs_orig = nr_segs;
1493                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1494                                 if (!iov_copy)
1495                                         GOTO(out, retval = -ENOMEM);
1496                         }
1497
1498                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1499                                         &iov_offset, chunk);
1500                 }
1501         } else {
1502                 end = *ppos + count - 1;
1503                 iov_copy = (struct iovec *)iov;
1504                 nrsegs_copy = nr_segs;
1505         }
1506
1507         down_read(&lli->lli_truncate_rwsem); /* Bug 18233 */
1508
1509         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1510                                       iov_copy, nrsegs_copy, &cookie, &tree,
1511                                       OBD_BRW_READ);
1512         if (lock_style < 0 || lock_style == LL_LOCK_STYLE_NOLOCK)
1513                 up_read(&lli->lli_truncate_rwsem);
1514         if (lock_style < 0)
1515                 GOTO(out, retval = lock_style);
1516
1517         ll_inode_size_lock(inode, 1);
1518         /*
1519          * Consistency guarantees: following possibilities exist for the
1520          * relation between region being read and real file size at this
1521          * moment:
1522          *
1523          *  (A): the region is completely inside of the file;
1524          *
1525          *  (B-x): x bytes of region are inside of the file, the rest is
1526          *  outside;
1527          *
1528          *  (C): the region is completely outside of the file.
1529          *
1530          * This classification is stable under DLM lock acquired by
1531          * ll_tree_lock() above, because to change class, other client has to
1532          * take DLM lock conflicting with our lock. Also, any updates to
1533          * ->i_size by other threads on this client are serialized by
1534          * ll_inode_size_lock(). This guarantees that short reads are handled
1535          * correctly in the face of concurrent writes and truncates.
1536          */
1537         inode_init_lvb(inode, &lvb);
1538         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1539         kms = lvb.lvb_size;
1540         if (*ppos + count - 1 > kms) {
1541                 /* A glimpse is necessary to determine whether we return a
1542                  * short read (B) or some zeroes at the end of the buffer (C) */
1543                 ll_inode_size_unlock(inode, 1);
1544                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1545                 if (retval) {
1546                         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1547                                 ll_file_put_lock(inode, end, lock_style,
1548                                                  cookie, &tree, OBD_BRW_READ);
1549                                 up_read(&lli->lli_truncate_rwsem);
1550                         }
1551                         goto out;
1552                 }
1553         } else {
1554                 /* region is within kms and, hence, within real file size (A).
1555                  * We need to increase i_size to cover the read region so that
1556                  * generic_file_read() will do its job, but that doesn't mean
1557                  * the kms size is _correct_, it is only the _minimum_ size.
1558                  * If someone does a stat they will get the correct size which
1559                  * will always be >= the kms value here.  b=11081 */
1560                 if (i_size_read(inode) < kms)
1561                         i_size_write(inode, kms);
1562                 ll_inode_size_unlock(inode, 1);
1563         }
1564
1565         chunk = end - *ppos + 1;
1566         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1567                inode->i_ino, chunk, *ppos, i_size_read(inode));
1568
1569         /* turn off the kernel's read-ahead */
1570         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1571                 /* read under locks
1572                  *
1573                  * 1. update inode's atime as long as concurrent stat
1574                  * (via ll_glimpse_size) might bring out-of-date ones
1575                  *
1576                  * 2. update lsm so that next stat (via
1577                  * ll_glimpse_size) could get correct values in lsm */
1578                 struct ost_lvb xtimes;
1579
1580                 lov_stripe_lock(lsm);
1581                 LTIME_S(inode->i_atime) = LTIME_S(CURRENT_TIME);
1582                 xtimes.lvb_atime = LTIME_S(inode->i_atime);
1583                 obd_update_lvb(sbi->ll_osc_exp, lsm, &xtimes,
1584                                OBD_MD_FLATIME);
1585                 lov_stripe_unlock(lsm);
1586
1587                 file->f_ra.ra_pages = 0;
1588                 /* initialize read-ahead window once per syscall */
1589                 if (ra == 0) {
1590                         ra = 1;
1591                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1592                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1593                         ll_ra_read_in(file, &bead);
1594                 }
1595
1596                 /* BUG: 5972 */
1597                 file_accessed(file);
1598 #ifdef HAVE_FILE_READV
1599                 retval = generic_file_readv(file, iov_copy, nrsegs_copy, ppos);
1600 #else
1601                 retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
1602                                                *ppos);
1603 #endif
1604                 ll_file_put_lock(inode, end, lock_style, cookie,
1605                                  &tree, OBD_BRW_READ);
1606                 up_read(&lli->lli_truncate_rwsem);
1607         } else {
1608                 /* lockless read
1609                  *
1610                  * current time will get into request as atime
1611                  * (lustre/osc/osc_request.c:osc_build_request())
1612                  */
1613                 retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
1614                                              READ, chunk);
1615         }
1616         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1617         if (retval > 0) {
1618                 count -= retval;
1619                 sum += retval;
1620                 if (retval == chunk && count > 0)
1621                         goto repeat;
1622         }
1623
1624  out:
1625         if (ra != 0)
1626                 ll_ra_read_ex(file, &bead);
1627         retval = (sum > 0) ? sum : retval;
1628
1629         if (iov_copy && iov_copy != iov)
1630                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1631
1632         RETURN(retval);
1633 }
1634
1635 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1636                             loff_t *ppos)
1637 {
1638         struct iovec local_iov = { .iov_base = (void __user *)buf,
1639                                    .iov_len = count };
1640 #ifdef HAVE_FILE_READV
1641         return ll_file_readv(file, &local_iov, 1, ppos);
1642 #else
1643         struct kiocb kiocb;
1644         ssize_t ret;
1645
1646         init_sync_kiocb(&kiocb, file);
1647         kiocb.ki_pos = *ppos;
1648         kiocb.ki_left = count;
1649
1650         ret = ll_file_aio_read(&kiocb, &local_iov, 1, kiocb.ki_pos);
1651         *ppos = kiocb.ki_pos;
1652         return ret;
1653 #endif
1654 }
1655
1656 /*
1657  * Write to a file (through the page cache).
1658  */
1659 #ifdef HAVE_FILE_WRITEV
1660 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1661                               unsigned long nr_segs, loff_t *ppos)
1662 {
1663 #else /* AIO stuff */
1664 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1665                                  unsigned long nr_segs, loff_t pos)
1666 {
1667         struct file *file = iocb->ki_filp;
1668         loff_t *ppos = &iocb->ki_pos;
1669 #endif
1670         struct inode *inode = file->f_dentry->d_inode;
1671         struct ll_sb_info *sbi = ll_i2sbi(inode);
1672         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1673         struct ll_lock_tree tree;
1674         loff_t maxbytes = ll_file_maxbytes(inode);
1675         loff_t lock_start, lock_end, end;
1676         ssize_t retval, chunk, sum = 0;
1677         int tree_locked;
1678         struct iovec *iov_copy = NULL;
1679         unsigned long nrsegs_copy, nrsegs_orig = 0;
1680         size_t count, iov_offset = 0;
1681         ENTRY;
1682
1683         count = ll_file_get_iov_count(iov, &nr_segs);
1684
1685         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1686                inode->i_ino, inode->i_generation, inode, count, *ppos);
1687
1688         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1689
1690         /* POSIX, but surprised the VFS doesn't check this already */
1691         if (count == 0)
1692                 RETURN(0);
1693
1694         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1695          * called on the file, don't fail the below assertion (bug 2388). */
1696         if (file->f_flags & O_LOV_DELAY_CREATE &&
1697             ll_i2info(inode)->lli_smd == NULL)
1698                 RETURN(-EBADF);
1699
1700         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1701
1702         down(&ll_i2info(inode)->lli_write_sem);
1703
1704 repeat:
1705         chunk = 0; /* just to fix gcc's warning */
1706         end = *ppos + count - 1;
1707
1708         if (file->f_flags & O_APPEND) {
1709                 lock_start = 0;
1710                 lock_end = OBD_OBJECT_EOF;
1711                 iov_copy = (struct iovec *)iov;
1712                 nrsegs_copy = nr_segs;
1713         } else if (sbi->ll_max_rw_chunk != 0) {
1714                 /* first, let's know the end of the current stripe */
1715                 end = *ppos;
1716                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1717                                 (obd_off *)&end);
1718
1719                 /* correct, the end is beyond the request */
1720                 if (end > *ppos + count - 1)
1721                         end = *ppos + count - 1;
1722
1723                 /* and chunk shouldn't be too large even if striping is wide */
1724                 if (end - *ppos > sbi->ll_max_rw_chunk)
1725                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1726                 lock_start = *ppos;
1727                 lock_end = end;
1728                 chunk = end - *ppos + 1;
1729                 if ((count == chunk) && (iov_offset == 0)) {
1730                         if (iov_copy)
1731                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1732
1733                         iov_copy = (struct iovec *)iov;
1734                         nrsegs_copy = nr_segs;
1735                 } else {
1736                         if (!iov_copy) {
1737                                 nrsegs_orig = nr_segs;
1738                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1739                                 if (!iov_copy)
1740                                         GOTO(out, retval = -ENOMEM);
1741                         }
1742                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1743                                         &iov_offset, chunk);
1744                 }
1745         } else {
1746                 lock_start = *ppos;
1747                 lock_end = end;
1748                 iov_copy = (struct iovec *)iov;
1749                 nrsegs_copy = nr_segs;
1750         }
1751
1752         tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
1753                                                 nrsegs_copy,
1754                                                 (obd_off)lock_start,
1755                                                 (obd_off)lock_end,
1756                                                 OBD_BRW_WRITE);
1757         if (tree_locked < 0)
1758                 GOTO(out, retval = tree_locked);
1759
1760         /* This is ok, g_f_w will overwrite this under i_sem if it races
1761          * with a local truncate, it just makes our maxbyte checking easier.
1762          * The i_size value gets updated in ll_extent_lock() as a consequence
1763          * of the [0,EOF] extent lock we requested above. */
1764         if (file->f_flags & O_APPEND) {
1765                 *ppos = i_size_read(inode);
1766                 end = *ppos + count - 1;
1767         }
1768
1769         if (*ppos >= maxbytes) {
1770                 send_sig(SIGXFSZ, current, 0);
1771                 GOTO(out_unlock, retval = -EFBIG);
1772         }
1773         if (end > maxbytes - 1)
1774                 end = maxbytes - 1;
1775
1776         /* generic_file_write handles O_APPEND after getting i_mutex */
1777         chunk = end - *ppos + 1;
1778         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1779                inode->i_ino, chunk, *ppos);
1780         if (tree_locked) {
1781                 /* write under locks
1782                  *
1783                  * 1. update inode's mtime and ctime as long as
1784                  * concurrent stat (via ll_glimpse_size) might bring
1785                  * out-of-date ones
1786                  *
1787                  * 2. update lsm so that next stat (via
1788                  * ll_glimpse_size) could get correct values in lsm */
1789                 struct ost_lvb xtimes;
1790
1791                 lov_stripe_lock(lsm);
1792                 LTIME_S(inode->i_mtime) = LTIME_S(CURRENT_TIME);
1793                 LTIME_S(inode->i_ctime) = LTIME_S(CURRENT_TIME);
1794                 xtimes.lvb_mtime = LTIME_S(inode->i_mtime);
1795                 xtimes.lvb_ctime = LTIME_S(inode->i_ctime);
1796                 obd_update_lvb(sbi->ll_osc_exp, lsm, &xtimes,
1797                                OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1798                 lov_stripe_unlock(lsm);
1799
1800 #ifdef HAVE_FILE_WRITEV
1801                 retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
1802 #else
1803                 retval = generic_file_aio_write(iocb, iov_copy, nrsegs_copy,
1804                                                 *ppos);
1805 #endif
1806         } else {
1807                 /* lockless write
1808                  *
1809                  * current time will get into request as mtime and
1810                  * ctime (lustre/osc/osc_request.c:osc_build_request())
1811                  */
1812                 retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy,
1813                                              ppos, WRITE, chunk);
1814         }
1815         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1816
1817 out_unlock:
1818         if (tree_locked)
1819                 ll_tree_unlock(&tree);
1820
1821 out:
1822         if (retval > 0) {
1823                 count -= retval;
1824                 sum += retval;
1825                 if (retval == chunk && count > 0)
1826                         goto repeat;
1827         }
1828
1829         up(&ll_i2info(inode)->lli_write_sem);
1830
1831         if (iov_copy && iov_copy != iov)
1832                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1833
1834         retval = (sum > 0) ? sum : retval;
1835         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1836                            retval > 0 ? retval : 0);
1837         RETURN(retval);
1838 }
1839
1840 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1841                              loff_t *ppos)
1842 {
1843         struct iovec local_iov = { .iov_base = (void __user *)buf,
1844                                    .iov_len = count };
1845
1846 #ifdef HAVE_FILE_WRITEV
1847         return ll_file_writev(file, &local_iov, 1, ppos);
1848 #else
1849         struct kiocb kiocb;
1850         ssize_t ret;
1851
1852         init_sync_kiocb(&kiocb, file);
1853         kiocb.ki_pos = *ppos;
1854         kiocb.ki_left = count;
1855
1856         ret = ll_file_aio_write(&kiocb, &local_iov, 1, kiocb.ki_pos);
1857         *ppos = kiocb.ki_pos;
1858
1859         return ret;
1860 #endif
1861 }
1862
#ifdef HAVE_KERNEL_SENDFILE
/*
 * Send file content (through pagecache) somewhere with helper
 *
 * Takes a PR extent lock over [*ppos, *ppos + count - 1], makes sure i_size
 * covers the region (from the merged kms, or via a glimpse when the region
 * extends past kms), then runs generic_file_sendfile() with a read-ahead
 * descriptor registered for the duration of the call.  Files without
 * objects (no lsm) go straight to generic_file_sendfile().
 *
 * Returns the result of generic_file_sendfile(), 0 for an empty request, or
 * the error from node creation, locking or the glimpse.
 */
static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,
                                size_t count, read_actor_t actor, void *target)
{
        struct inode *inode = in_file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        ssize_t rc;
        __u64 kms;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
        /* turn off the kernel's read-ahead */
        in_file->f_ra.ra_pages = 0;

        /* File with no objects, nothing to lock */
        if (!lsm) {
                rc = generic_file_sendfile(in_file, ppos, count, actor, target);
                RETURN(rc);
        }

        node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
        if (IS_ERR(node))
                RETURN(PTR_ERR(node));

        tree.lt_fd = LUSTRE_FPRIVATE(in_file);
        rc = ll_tree_lock(&tree, node, NULL, count,
                          in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
        if (rc != 0)
                RETURN(rc);

        ll_clear_file_contended(inode);
        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                rc = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (rc)
                        goto out;
        } else {
                /* region is within kms and, hence, within real file size (A).
                 * kms is only the _minimum_ size, so i_size may be raised to
                 * cover the region but must never be shrunk to it; this is
                 * the same rule the plain read path follows (b=11081). */
                if (i_size_read(inode) < kms)
                        i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, count, *ppos, i_size_read(inode));

        /* describe the pages this call will touch to the read-ahead code */
        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
        ll_ra_read_in(in_file, &bead);
        /* BUG: 5972 */
        file_accessed(in_file);
        rc = generic_file_sendfile(in_file, ppos, count, actor, target);
        ll_ra_read_ex(in_file, &bead);

 out:
        ll_tree_unlock(&tree);
        RETURN(rc);
}
#endif
1962
/* change based on
 * http://git.kernel.org/?p=linux/kernel/git/torvalds/linux-2.6.git;a=commit;h=f0930fffa99e7fe0a0c4b6c7d9a244dc88288c27
 */
#ifdef HAVE_KERNEL_SPLICE_READ
/*
 * Splice file content (through pagecache) into a pipe.
 *
 * Takes a PR extent lock over [*ppos, *ppos + count - 1], makes sure i_size
 * covers the region (from the merged kms, or via a glimpse when the region
 * extends past kms), then runs generic_file_splice_read() with a read-ahead
 * descriptor registered for the duration of the call.  Files without
 * objects (no lsm) go straight to generic_file_splice_read().
 *
 * Returns the result of generic_file_splice_read(), 0 for an empty request,
 * or the error from node creation, locking or the glimpse.
 */
static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
                                   struct pipe_inode_info *pipe, size_t count,
                                   unsigned int flags)
{
        struct inode *inode = in_file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        ssize_t rc;
        __u64 kms;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
        /* turn off the kernel's read-ahead */
        in_file->f_ra.ra_pages = 0;

        /* File with no objects, nothing to lock */
        if (!lsm) {
                rc = generic_file_splice_read(in_file, ppos, pipe, count, flags);
                RETURN(rc);
        }

        node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
        if (IS_ERR(node))
                RETURN(PTR_ERR(node));

        tree.lt_fd = LUSTRE_FPRIVATE(in_file);
        rc = ll_tree_lock(&tree, node, NULL, count,
                          in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
        if (rc != 0)
                RETURN(rc);

        ll_clear_file_contended(inode);
        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                rc = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (rc)
                        goto out;
        } else {
                /* region is within kms and, hence, within real file size (A).
                 * kms is only the _minimum_ size, so i_size may be raised to
                 * cover the region but must never be shrunk to it; this is
                 * the same rule the plain read path follows (b=11081). */
                if (i_size_read(inode) < kms)
                        i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, count, *ppos, i_size_read(inode));

        /* describe the pages this call will touch to the read-ahead code */
        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
        ll_ra_read_in(in_file, &bead);
        /* BUG: 5972 */
        file_accessed(in_file);
        rc = generic_file_splice_read(in_file, ppos, pipe, count, flags);
        ll_ra_read_ex(in_file, &bead);

 out:
        ll_tree_unlock(&tree);
        RETURN(rc);
}
#endif
2063
2064 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
2065                                unsigned long arg)
2066 {
2067         struct ll_inode_info *lli = ll_i2info(inode);
2068         struct obd_export *exp = ll_i2obdexp(inode);
2069         struct ll_recreate_obj ucreatp;
2070         struct obd_trans_info oti = { 0 };
2071         struct obdo *oa = NULL;
2072         int lsm_size;
2073         int rc = 0;
2074         struct lov_stripe_md *lsm, *lsm2;
2075         ENTRY;
2076
2077         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2078                 RETURN(-EPERM);
2079
2080         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
2081                             sizeof(struct ll_recreate_obj));
2082         if (rc) {
2083                 RETURN(-EFAULT);
2084         }
2085         OBDO_ALLOC(oa);
2086         if (oa == NULL)
2087                 RETURN(-ENOMEM);
2088
2089         down(&lli->lli_size_sem);
2090         lsm = lli->lli_smd;
2091         if (lsm == NULL)
2092                 GOTO(out, rc = -ENOENT);
2093         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
2094                    (lsm->lsm_stripe_count));
2095
2096         OBD_ALLOC(lsm2, lsm_size);
2097         if (lsm2 == NULL)
2098                 GOTO(out, rc = -ENOMEM);
2099
2100         oa->o_id = ucreatp.lrc_id;
2101         oa->o_nlink = ucreatp.lrc_ost_idx;
2102         oa->o_flags |= OBD_FL_RECREATE_OBJS;
2103         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
2104         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2105                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2106
2107         memcpy(lsm2, lsm, lsm_size);
2108         rc = obd_create(exp, oa, &lsm2, &oti);
2109
2110         OBD_FREE(lsm2, lsm_size);
2111         GOTO(out, rc);
2112 out:
2113         up(&lli->lli_size_sem);
2114         OBDO_FREE(oa);
2115         return rc;
2116 }
2117
2118 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
2119                                     int flags, struct lov_user_md *lum,
2120                                     int lum_size)
2121 {
2122         struct ll_inode_info *lli = ll_i2info(inode);
2123         struct lov_stripe_md *lsm;
2124         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
2125         int rc = 0;
2126         ENTRY;
2127
2128         down(&lli->lli_size_sem);
2129         lsm = lli->lli_smd;
2130         if (lsm) {
2131                 up(&lli->lli_size_sem);
2132                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
2133                        inode->i_ino);
2134                 RETURN(-EEXIST);
2135         }
2136
2137         rc = ll_intent_file_open(file, lum, lum_size, &oit);
2138         if (rc)
2139                 GOTO(out, rc);
2140         if (it_disposition(&oit, DISP_LOOKUP_NEG))
2141                 GOTO(out_req_free, rc = -ENOENT);
2142         rc = oit.d.lustre.it_status;
2143         if (rc < 0)
2144                 GOTO(out_req_free, rc);
2145
2146         ll_release_openhandle(file->f_dentry, &oit);
2147
2148  out:
2149         up(&lli->lli_size_sem);
2150         ll_intent_release(&oit);
2151         RETURN(rc);
2152 out_req_free:
2153         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2154         goto out;
2155 }
2156
2157 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
2158                              struct lov_mds_md **lmmp, int *lmm_size,
2159                              struct ptlrpc_request **request)
2160 {
2161         struct ll_sb_info *sbi = ll_i2sbi(inode);
2162         struct ll_fid  fid;
2163         struct mds_body  *body;
2164         struct lov_mds_md *lmm = NULL;
2165         struct ptlrpc_request *req = NULL;
2166         int rc, lmmsize;
2167
2168         ll_inode2fid(&fid, inode);
2169
2170         rc = ll_get_max_mdsize(sbi, &lmmsize);
2171         if (rc)
2172                 RETURN(rc);
2173
2174         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
2175                         filename, strlen(filename) + 1,
2176                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
2177                         lmmsize, &req);
2178         if (rc < 0) {
2179                 CDEBUG(D_INFO, "mdc_getattr_name failed "
2180                                 "on %s: rc %d\n", filename, rc);
2181                 GOTO(out, rc);
2182         }
2183
2184         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
2185                         sizeof(*body));
2186         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2187         /* swabbed by mdc_getattr_name */
2188         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
2189
2190         lmmsize = body->eadatasize;
2191
2192         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2193                         lmmsize == 0) {
2194                 GOTO(out, rc = -ENODATA);
2195         }
2196
2197         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
2198                         lmmsize);
2199         LASSERT(lmm != NULL);
2200         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
2201
2202         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
2203             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) &&
2204             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
2205                 GOTO(out, rc = -EPROTO);
2206         }
2207         /*
2208          * This is coming from the MDS, so is probably in
2209          * little endian.  We convert it to host endian before
2210          * passing it to userspace.
2211          */
2212         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
2213                 /* if function called for directory - we should
2214                  * avoid swab not existent lsm objects */
2215                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
2216                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
2217                         if (S_ISREG(body->mode))
2218                                 lustre_swab_lov_user_md_objects(
2219                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
2220                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
2221                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
2222                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
2223                         if (S_ISREG(body->mode))
2224                                 lustre_swab_lov_user_md_objects(
2225                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
2226                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
2227                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
2228                         lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2229                 }
2230         }
2231
2232         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2233                 struct lov_stripe_md *lsm;
2234                 struct lov_user_md_join *lmj;
2235                 int lmj_size, i, aindex = 0;
2236
2237                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
2238                 if (rc < 0)
2239                         GOTO(out, rc = -ENOMEM);
2240                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
2241                 if (rc)
2242                         GOTO(out_free_memmd, rc);
2243
2244                 lmj_size = sizeof(struct lov_user_md_join) +
2245                         lsm->lsm_stripe_count *
2246                         sizeof(struct lov_user_ost_data_join);
2247                 OBD_ALLOC(lmj, lmj_size);
2248                 if (!lmj)
2249                         GOTO(out_free_memmd, rc = -ENOMEM);
2250
2251                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2252                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2253                         struct lov_extent *lex =
2254                                 &lsm->lsm_array->lai_ext_array[aindex];
2255
2256                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2257                                 aindex ++;
2258                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2259                                         LPU64" len %d\n", aindex, i,
2260                                         lex->le_start, (int)lex->le_len);
2261                         lmj->lmm_objects[i].l_extent_start =
2262                                 lex->le_start;
2263
2264                         if ((int)lex->le_len == -1)
2265                                 lmj->lmm_objects[i].l_extent_end = -1;
2266                         else
2267                                 lmj->lmm_objects[i].l_extent_end =
2268                                         lex->le_start + lex->le_len;
2269                         lmj->lmm_objects[i].l_object_id =
2270                                 lsm->lsm_oinfo[i]->loi_id;
2271                         lmj->lmm_objects[i].l_object_gr =
2272                                 lsm->lsm_oinfo[i]->loi_gr;
2273                         lmj->lmm_objects[i].l_ost_gen =
2274                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2275                         lmj->lmm_objects[i].l_ost_idx =
2276                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2277                 }
2278                 lmm = (struct lov_mds_md *)lmj;
2279                 lmmsize = lmj_size;
2280 out_free_memmd:
2281                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
2282         }
2283 out:
2284         *lmmp = lmm;
2285         *lmm_size = lmmsize;
2286         *request = req;
2287         return rc;
2288 }
2289 static int ll_lov_setea(struct inode *inode, struct file *file,
2290                             unsigned long arg)
2291 {
2292         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2293         struct lov_user_md  *lump;
2294         int lum_size = sizeof(struct lov_user_md) +
2295                        sizeof(struct lov_user_ost_data);
2296         int rc;
2297         ENTRY;
2298
2299         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2300                 RETURN(-EPERM);
2301
2302         OBD_ALLOC(lump, lum_size);
2303         if (lump == NULL) {
2304                 RETURN(-ENOMEM);
2305         }
2306         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2307         if (rc) {
2308                 OBD_FREE(lump, lum_size);
2309                 RETURN(-EFAULT);
2310         }
2311
2312         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2313
2314         OBD_FREE(lump, lum_size);
2315         RETURN(rc);
2316 }
2317
2318 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2319                             unsigned long arg)
2320 {
2321         struct lov_user_md_v3 lumv3;
2322         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
2323         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
2324         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
2325         int lum_size;
2326         int rc;
2327         int flags = FMODE_WRITE;
2328         ENTRY;
2329
2330         /* first try with v1 which is smaller than v3 */
2331         lum_size = sizeof(struct lov_user_md_v1);
2332         rc = copy_from_user(lumv1, lumv1p, lum_size);
2333         if (rc)
2334                 RETURN(-EFAULT);
2335
2336         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
2337                 lum_size = sizeof(struct lov_user_md_v3);
2338                 rc = copy_from_user(&lumv3, lumv3p, lum_size);
2339                 if (rc)
2340                         RETURN(-EFAULT);
2341         }
2342
2343         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
2344         if (rc == 0) {
2345                  put_user(0, &lumv1p->lmm_stripe_count);
2346                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
2347                                     0, ll_i2info(inode)->lli_smd,
2348                                     (void *)arg);
2349         }
2350         RETURN(rc);
2351 }
2352
2353 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2354 {
2355         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2356
2357         if (!lsm)
2358                 RETURN(-ENODATA);
2359
2360         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
2361                             (void *)arg);
2362 }
2363
2364 static int ll_get_grouplock(struct inode *inode, struct file *file,
2365                             unsigned long arg)
2366 {
2367         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2368         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2369                                                     .end = OBD_OBJECT_EOF}};
2370         struct lustre_handle lockh = { 0 };
2371         struct ll_inode_info *lli = ll_i2info(inode);
2372         struct lov_stripe_md *lsm = lli->lli_smd;
2373         int flags = 0, rc;
2374         ENTRY;
2375
2376         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2377                 RETURN(-EINVAL);
2378         }
2379
2380         policy.l_extent.gid = arg;
2381         if (file->f_flags & O_NONBLOCK)
2382                 flags = LDLM_FL_BLOCK_NOWAIT;
2383
2384         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2385         if (rc)
2386                 RETURN(rc);
2387
2388         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2389         fd->fd_gid = arg;
2390         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2391
2392         RETURN(0);
2393 }
2394
2395 static int ll_put_grouplock(struct inode *inode, struct file *file,
2396                             unsigned long arg)
2397 {
2398         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2399         struct ll_inode_info *lli = ll_i2info(inode);
2400         struct lov_stripe_md *lsm = lli->lli_smd;
2401         int rc;
2402         ENTRY;
2403
2404         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2405                 /* Ugh, it's already unlocked. */
2406                 RETURN(-EINVAL);
2407         }
2408
2409         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2410                 RETURN(-EINVAL);
2411
2412         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2413
2414         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2415         if (rc)
2416                 RETURN(rc);
2417
2418         fd->fd_gid = 0;
2419         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2420
2421         RETURN(0);
2422 }
2423
2424 #if LUSTRE_FIX >= 50
2425 static int join_sanity_check(struct inode *head, struct inode *tail)
2426 {
2427         ENTRY;
2428         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2429                 CERROR("server do not support join \n");
2430                 RETURN(-EINVAL);
2431         }
2432         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2433                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2434                        head->i_ino, tail->i_ino);
2435                 RETURN(-EINVAL);
2436         }
2437         if (head->i_ino == tail->i_ino) {
2438                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2439                 RETURN(-EINVAL);
2440         }
2441         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2442                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2443                 RETURN(-EINVAL);
2444         }
2445         RETURN(0);
2446 }
2447
2448 static int join_file(struct inode *head_inode, struct file *head_filp,
2449                      struct file *tail_filp)
2450 {
2451         struct dentry *tail_dentry = tail_filp->f_dentry;
2452         struct lookup_intent oit = {.it_op = IT_OPEN,
2453                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2454         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_PW,
2455                 ll_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL };
2456
2457         struct lustre_handle lockh;
2458         struct mdc_op_data *op_data;
2459         int    rc;
2460         loff_t data;
2461         ENTRY;
2462
2463         tail_dentry = tail_filp->f_dentry;
2464
2465         OBD_ALLOC_PTR(op_data);
2466         if (op_data == NULL) {
2467                 RETURN(-ENOMEM);
2468         }
2469
2470         data = i_size_read(head_inode);
2471         ll_prepare_mdc_op_data(op_data, head_inode,
2472                                tail_dentry->d_parent->d_inode,
2473                                tail_dentry->d_name.name,
2474                                tail_dentry->d_name.len, 0, &data);
2475         rc = mdc_enqueue(ll_i2mdcexp(head_inode), &einfo, &oit,
2476                          op_data, &lockh, NULL, 0, 0);
2477
2478         if (rc < 0)
2479                 GOTO(out, rc);
2480
2481         rc = oit.d.lustre.it_status;
2482
2483         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2484                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2485                 ptlrpc_req_finished((struct ptlrpc_request *)
2486                                     oit.d.lustre.it_data);
2487                 GOTO(out, rc);
2488         }
2489
2490         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2491                                            * away */
2492                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2493                 oit.d.lustre.it_lock_mode = 0;
2494         }
2495         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
2496         it_clear_disposition(&oit, DISP_ENQ_COMPLETE);
2497         ll_release_openhandle(head_filp->f_dentry, &oit);
2498 out:
2499         if (op_data)
2500                 OBD_FREE_PTR(op_data);
2501         ll_intent_release(&oit);
2502         RETURN(rc);
2503 }
2504
2505 static int ll_file_join(struct inode *head, struct file *filp,
2506                         char *filename_tail)
2507 {
2508         struct inode *tail = NULL, *first = NULL, *second = NULL;
2509         struct dentry *tail_dentry;
2510         struct file *tail_filp, *first_filp, *second_filp;
2511         struct ll_lock_tree first_tree, second_tree;
2512         struct ll_lock_tree_node *first_node, *second_node;
2513         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2514         int rc = 0, cleanup_phase = 0;
2515         ENTRY;
2516
2517         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2518                head->i_ino, head->i_generation, head, filename_tail);
2519
2520         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2521         if (IS_ERR(tail_filp)) {
2522                 CERROR("Can not open tail file %s", filename_tail);
2523                 rc = PTR_ERR(tail_filp);
2524                 GOTO(cleanup, rc);
2525         }
2526         tail = igrab(tail_filp->f_dentry->d_inode);
2527
2528         tlli = ll_i2info(tail);
2529         tail_dentry = tail_filp->f_dentry;
2530         LASSERT(tail_dentry);
2531         cleanup_phase = 1;
2532
2533         /*reorder the inode for lock sequence*/
2534         first = head->i_ino > tail->i_ino ? head : tail;
2535         second = head->i_ino > tail->i_ino ? tail : head;
2536         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2537         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2538
2539         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2540                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2541         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2542         if (IS_ERR(first_node)){
2543                 rc = PTR_ERR(first_node);
2544                 GOTO(cleanup, rc);
2545         }
2546         first_tree.lt_fd = first_filp->private_data;
2547         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2548         if (rc != 0)
2549                 GOTO(cleanup, rc);
2550         cleanup_phase = 2;
2551
2552         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2553         if (IS_ERR(second_node)){
2554                 rc = PTR_ERR(second_node);
2555                 GOTO(cleanup, rc);
2556         }
2557         second_tree.lt_fd = second_filp->private_data;
2558         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2559         if (rc != 0)
2560                 GOTO(cleanup, rc);
2561         cleanup_phase = 3;
2562
2563         rc = join_sanity_check(head, tail);
2564         if (rc)
2565                 GOTO(cleanup, rc);
2566
2567         rc = join_file(head, filp, tail_filp);
2568         if (rc)
2569                 GOTO(cleanup, rc);
2570 cleanup:
2571         switch (cleanup_phase) {
2572         case 3:
2573                 ll_tree_unlock(&second_tree);
2574                 obd_cancel_unused(ll_i2obdexp(second),
2575                                   ll_i2info(second)->lli_smd, 0, NULL);
2576         case 2:
2577                 ll_tree_unlock(&first_tree);
2578                 obd_cancel_unused(ll_i2obdexp(first),
2579                                   ll_i2info(first)->lli_smd, 0, NULL);
2580         case 1:
2581                 filp_close(tail_filp, 0);
2582                 if (tail)
2583                         iput(tail);
2584                 if (head && rc == 0) {
2585                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2586                                        &hlli->lli_smd);
2587                         hlli->lli_smd = NULL;
2588                 }
2589         case 0:
2590                 break;
2591         default:
2592                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2593                 LBUG();
2594         }
2595         RETURN(rc);
2596 }
2597 #endif  /* LUSTRE_FIX >= 50 */
2598
2599 /**
2600  * Close inode open handle
2601  *
2602  * \param dentry [in]     dentry which contains the inode
2603  * \param it     [in,out] intent which contains open info and result
2604  *
2605  * \retval 0     success
2606  * \retval <0    failure
2607  */
2608 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2609 {
2610         struct inode *inode = dentry->d_inode;
2611         struct obd_client_handle *och;
2612         int rc;
2613         ENTRY;
2614
2615         LASSERT(inode);
2616
2617         /* Root ? Do nothing. */
2618         if (dentry->d_inode->i_sb->s_root == dentry)
2619                 RETURN(0);
2620
2621         /* No open handle to close? Move away */
2622         if (!it_disposition(it, DISP_OPEN_OPEN))
2623                 RETURN(0);
2624
2625         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2626
2627         OBD_ALLOC(och, sizeof(*och));
2628         if (!och)
2629                 GOTO(out, rc = -ENOMEM);
2630
2631         ll_och_fill(ll_i2info(inode), it, och);
2632
2633         rc = ll_close_inode_openhandle(inode, och);
2634
2635         OBD_FREE(och, sizeof(*och));
2636  out:
2637         /* this one is in place of ll_file_open */
2638         if (it_disposition(it, DISP_ENQ_OPEN_REF))
2639                 ptlrpc_req_finished(it->d.lustre.it_data);
2640         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2641         RETURN(rc);
2642 }
2643
2644 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
2645               int num_bytes)
2646 {
2647         struct obd_export *exp = ll_i2obdexp(inode);
2648         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2649         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
2650         int vallen = num_bytes;
2651         int rc;
2652         ENTRY;
2653
2654         /* If the stripe_count > 1 and the application does not understand
2655          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
2656          */
2657         if (lsm->lsm_stripe_count > 1 &&
2658             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
2659                 return -EOPNOTSUPP;
2660
2661         fm_key.oa.o_id = lsm->lsm_object_id;
2662         fm_key.oa.o_valid = OBD_MD_FLID;
2663
2664         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLSIZE);
2665
2666         /* If filesize is 0, then there would be no objects for mapping */
2667         if (fm_key.oa.o_size == 0) {
2668                 fiemap->fm_mapped_extents = 0;
2669                 RETURN(0);
2670         }
2671
2672         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
2673
2674         rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
2675         if (rc)
2676                 CERROR("obd_get_info failed: rc = %d\n", rc);
2677
2678         RETURN(rc);
2679 }
2680
2681 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2682                   unsigned long arg)
2683 {
2684         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2685         int flags;
2686         ENTRY;
2687
2688         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2689                inode->i_generation, inode, cmd);
2690         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2691
2692         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2693         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2694                 RETURN(-ENOTTY);
2695
2696         switch(cmd) {
2697         case LL_IOC_GETFLAGS:
2698                 /* Get the current value of the file flags */
2699                 return put_user(fd->fd_flags, (int *)arg);
2700         case LL_IOC_SETFLAGS:
2701         case LL_IOC_CLRFLAGS:
2702                 /* Set or clear specific file flags */
2703                 /* XXX This probably needs checks to ensure the flags are
2704                  *     not abused, and to handle any flag side effects.
2705                  */
2706                 if (get_user(flags, (int *) arg))
2707                         RETURN(-EFAULT);
2708
2709                 if (cmd == LL_IOC_SETFLAGS) {
2710                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2711                             !(file->f_flags & O_DIRECT)) {
2712                                 CERROR("%s: unable to disable locking on "
2713                                        "non-O_DIRECT file\n", current->comm);
2714                                 RETURN(-EINVAL);
2715                         }
2716
2717                         fd->fd_flags |= flags;
2718                 } else {
2719                         fd->fd_flags &= ~flags;
2720                 }
2721                 RETURN(0);
2722         case LL_IOC_LOV_SETSTRIPE:
2723                 RETURN(ll_lov_setstripe(inode, file, arg));
2724         case LL_IOC_LOV_SETEA:
2725                 RETURN(ll_lov_setea(inode, file, arg));
2726         case LL_IOC_LOV_GETSTRIPE:
2727                 RETURN(ll_lov_getstripe(inode, arg));
2728         case LL_IOC_RECREATE_OBJ:
2729                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2730         case EXT3_IOC_FIEMAP: {
2731                 struct ll_user_fiemap *fiemap_s;
2732                 size_t num_bytes, ret_bytes;
2733                 unsigned int extent_count;
2734                 int rc = 0;
2735
2736                 /* Get the extent count so we can calculate the size of
2737                  * required fiemap buffer */
2738                 if (get_user(extent_count,
2739                     &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
2740                         RETURN(-EFAULT);
2741                 num_bytes = sizeof(*fiemap_s) + (extent_count *
2742                                                  sizeof(struct ll_fiemap_extent));
2743                 OBD_VMALLOC(fiemap_s, num_bytes);
2744                 if (fiemap_s == NULL)
2745                         RETURN(-ENOMEM);
2746
2747                 if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
2748                                    sizeof(*fiemap_s)))
2749                         GOTO(error, rc = -EFAULT);
2750
2751                 if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
2752                         fiemap_s->fm_flags = fiemap_s->fm_flags &
2753                                                     ~LUSTRE_FIEMAP_FLAGS_COMPAT;
2754                         if (copy_to_user((char *)arg, fiemap_s,
2755                                          sizeof(*fiemap_s)))
2756                                 GOTO(error, rc = -EFAULT);
2757
2758                         GOTO(error, rc = -EBADR);
2759                 }
2760
2761                 /* If fm_extent_count is non-zero, read the first extent since
2762                  * it is used to calculate end_offset and device from previous
2763                  * fiemap call. */
2764                 if (extent_count) {
2765                         if (copy_from_user(&fiemap_s->fm_extents[0],
2766                             (char __user *)arg + sizeof(*fiemap_s),
2767                             sizeof(struct ll_fiemap_extent)))
2768                                 GOTO(error, rc = -EFAULT);
2769                 }
2770
2771                 if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
2772                         int rc;
2773
2774                         rc = filemap_fdatawrite(inode->i_mapping);
2775                         if (rc)
2776                                 GOTO(error, rc);
2777                 }
2778
2779                 rc = ll_fiemap(inode, fiemap_s, num_bytes);
2780                 if (rc)
2781                         GOTO(error, rc);
2782
2783                 ret_bytes = sizeof(struct ll_user_fiemap);
2784
2785                 if (extent_count != 0)
2786                         ret_bytes += (fiemap_s->fm_mapped_extents *
2787                                          sizeof(struct ll_fiemap_extent));
2788
2789                 if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
2790                         rc = -EFAULT;
2791
2792 error:
2793                 OBD_VFREE(fiemap_s, num_bytes);
2794                 RETURN(rc);
2795         }
2796         case EXT3_IOC_GETFLAGS:
2797         case EXT3_IOC_SETFLAGS:
2798                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2799         case EXT3_IOC_GETVERSION_OLD:
2800         case EXT3_IOC_GETVERSION:
2801                 RETURN(put_user(inode->i_generation, (int *)arg));
2802         case LL_IOC_JOIN: {
2803 #if LUSTRE_FIX >= 50
2804                 /* Allow file join in beta builds to allow debuggging */
2805                 char *ftail;
2806                 int rc;
2807
2808                 ftail = getname((const char *)arg);
2809                 if (IS_ERR(ftail))
2810                         RETURN(PTR_ERR(ftail));
2811                 rc = ll_file_join(inode, file, ftail);
2812                 putname(ftail);
2813                 RETURN(rc);
2814 #else
2815                 CWARN("file join is not supported in this version of Lustre\n");
2816                 RETURN(-ENOTTY);
2817 #endif
2818         }
2819         case LL_IOC_GROUP_LOCK:
2820                 RETURN(ll_get_grouplock(inode, file, arg));
2821         case LL_IOC_GROUP_UNLOCK:
2822                 RETURN(ll_put_grouplock(inode, file, arg));
2823         case IOC_OBD_STATFS:
2824                 RETURN(ll_obd_statfs(inode, (void *)arg));
2825         case OBD_IOC_GETNAME_OLD:
2826         case OBD_IOC_GETNAME: {
2827                 struct obd_device *obd =
2828                         class_exp2obd(ll_i2sbi(inode)->ll_osc_exp);
2829                 if (!obd)
2830                         RETURN(-EFAULT);
2831                 if (copy_to_user((void *)arg, obd->obd_name,
2832                                 strlen(obd->obd_name) + 1))
2833                         RETURN (-EFAULT);
2834                 RETURN(0);
2835         }
2836
2837         /* We need to special case any other ioctls we want to handle,
2838          * to send them to the MDS/OST as appropriate and to properly
2839          * network encode the arg field.
2840         case EXT3_IOC_SETVERSION_OLD:
2841         case EXT3_IOC_SETVERSION:
2842         */
2843         default: {
2844                 int err;
2845
2846                 if (LLIOC_STOP ==
2847                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2848                         RETURN(err);
2849
2850                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2851                                      (void *)arg));
2852         }
2853         }
2854 }
2855
2856 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2857 {
2858         struct inode *inode = file->f_dentry->d_inode;
2859         struct ll_inode_info *lli = ll_i2info(inode);
2860         struct lov_stripe_md *lsm = lli->lli_smd;
2861         loff_t retval;
2862         ENTRY;
2863         retval = offset + ((origin == 2) ? i_size_read(inode) :
2864                            (origin == 1) ? file->f_pos : 0);
2865         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2866                inode->i_ino, inode->i_generation, inode, retval, retval,
2867                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2868         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2869
2870         if (origin == 2) { /* SEEK_END */
2871                 int nonblock = 0, rc;
2872
2873                 if (file->f_flags & O_NONBLOCK)
2874                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2875
2876                 if (lsm != NULL) {
2877                         rc = ll_glimpse_size(inode, nonblock);
2878                         if (rc != 0)
2879                                 RETURN(rc);
2880                 }
2881
2882                 ll_inode_size_lock(inode, 0);
2883                 offset += i_size_read(inode);
2884                 ll_inode_size_unlock(inode, 0);
2885         } else if (origin == 1) { /* SEEK_CUR */
2886                 offset += file->f_pos;
2887         }
2888
2889         retval = -EINVAL;
2890         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2891                 if (offset != file->f_pos) {
2892                         file->f_pos = offset;
2893                         file->f_version = 0;
2894                 }
2895                 retval = offset;
2896         }
2897
2898         RETURN(retval);
2899 }
2900
2901 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2902 {
2903         struct inode *inode = dentry->d_inode;
2904         struct ll_inode_info *lli = ll_i2info(inode);
2905         struct lov_stripe_md *lsm = lli->lli_smd;
2906         struct ll_fid fid;
2907         struct ptlrpc_request *req;
2908         int rc, err;
2909         ENTRY;
2910         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2911                inode->i_generation, inode);
2912         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2913
2914         /* fsync's caller has already called _fdata{sync,write}, we want
2915          * that IO to finish before calling the osc and mdc sync methods */
2916         rc = filemap_fdatawait(inode->i_mapping);
2917
2918         /* catch async errors that were recorded back when async writeback
2919          * failed for pages in this mapping. */
2920         err = lli->lli_async_rc;
2921         lli->lli_async_rc = 0;
2922         if (rc == 0)
2923                 rc = err;
2924         if (lsm) {
2925                 err = lov_test_and_clear_async_rc(lsm);
2926                 if (rc == 0)
2927                         rc = err;
2928         }
2929
2930         ll_inode2fid(&fid, inode);
2931         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2932         if (!rc)
2933                 rc = err;
2934         if (!err)
2935                 ptlrpc_req_finished(req);
2936
2937         if (data && lsm) {
2938                 struct obd_info *oinfo;
2939
2940                 OBD_ALLOC_PTR(oinfo);
2941                 if (!oinfo)
2942                         RETURN(rc ? rc : -ENOMEM);
2943                 OBDO_ALLOC(oinfo->oi_oa);
2944                 if (!oinfo->oi_oa) {
2945                         OBD_FREE_PTR(oinfo);
2946                         RETURN(rc ? rc : -ENOMEM);
2947                 }
2948                 oinfo->oi_oa->o_id = lsm->lsm_object_id;
2949                 oinfo->oi_oa->o_gr = lsm->lsm_object_gr;
2950                 oinfo->oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2951                 obdo_from_inode(oinfo->oi_oa, inode,
2952                                 OBD_MD_FLTYPE | OBD_MD_FLATIME |
2953                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2954                 oinfo->oi_md = lsm;
2955                 err = obd_sync_rqset(ll_i2sbi(inode)->ll_osc_exp, oinfo,
2956                                      0, OBD_OBJECT_EOF);
2957                 if (!rc)
2958                         rc = err;
2959                 OBDO_FREE(oinfo->oi_oa);
2960                 OBD_FREE_PTR(oinfo);
2961         }
2962
2963         RETURN(rc);
2964 }
2965
2966 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2967 {
2968         struct inode *inode = file->f_dentry->d_inode;
2969         struct ll_sb_info *sbi = ll_i2sbi(inode);
2970         struct lu_fid *fid = ll_inode_lu_fid(inode);
2971         struct ldlm_res_id res_id =
2972                     { .name = { fid_seq(fid),
2973                                 fid_oid(fid),
2974                                 fid_ver(fid),
2975                                 LDLM_FLOCK} };
2976         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2977                 ldlm_flock_completion_ast, NULL, file_lock };
2978         struct lustre_handle lockh = {0};
2979         ldlm_policy_data_t flock;
2980         int flags = 0;
2981         int rc;
2982         ENTRY;
2983
2984         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2985                inode->i_ino, file_lock);
2986         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2987
2988         if (fid_is_igif(fid)) {
2989                 /* If this is an IGIF inode, we need to keep the 1.6-style
2990                  * flock mapping for compatibility.  If it is a proper FID
2991                  * then we know any other client accessing it must also be
2992                  * accessing it as a FID and can use the CMD-style flock. */
2993                 res_id.name[2] = LDLM_FLOCK;
2994                 res_id.name[3] = 0;
2995         }
2996
2997         if (file_lock->fl_flags & FL_FLOCK) {
2998                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2999                 /* set missing params for flock() calls */
3000                 file_lock->fl_end = OFFSET_MAX;
3001                 file_lock->fl_pid = current->tgid;
3002         }
3003         flock.l_flock.pid = file_lock->fl_pid;
3004         flock.l_flock.start = file_lock->fl_start;
3005         flock.l_flock.end = file_lock->fl_end;
3006
3007         switch (file_lock->fl_type) {
3008         case F_RDLCK:
3009                 einfo.ei_mode = LCK_PR;
3010                 break;
3011         case F_UNLCK:
3012                 /* An unlock request may or may not have any relation to
3013                  * existing locks so we may not be able to pass a lock handle
3014                  * via a normal ldlm_lock_cancel() request. The request may even
3015                  * unlock a byte range in the middle of an existing lock. In
3016                  * order to process an unlock request we need all of the same
3017                  * information that is given with a normal read or write record
3018                  * lock request. To avoid creating another ldlm unlock (cancel)
3019                  * message we'll treat a LCK_NL flock request as an unlock. */
3020                 einfo.ei_mode = LCK_NL;
3021                 break;
3022         case F_WRLCK:
3023                 einfo.ei_mode = LCK_PW;
3024                 break;
3025         default:
3026                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
3027                 RETURN (-EINVAL);
3028         }
3029
3030         switch (cmd) {
3031         case F_SETLKW:
3032 #ifdef F_SETLKW64
3033         case F_SETLKW64:
3034 #endif
3035                 flags = 0;
3036                 break;
3037         case F_SETLK:
3038 #ifdef F_SETLK64
3039         case F_SETLK64:
3040 #endif
3041                 flags = LDLM_FL_BLOCK_NOWAIT;
3042                 break;
3043         case F_GETLK:
3044 #ifdef F_GETLK64
3045         case F_GETLK64:
3046 #endif
3047                 flags = LDLM_FL_TEST_LOCK;
3048                 /* Save the old mode so that if the mode in the lock changes we
3049                  * can decrement the appropriate reader or writer refcount. */
3050                 file_lock->fl_type = einfo.ei_mode;
3051                 break;
3052         default:
3053                 CERROR("unknown fcntl lock command: %d\n", cmd);
3054                 RETURN (-EINVAL);
3055         }
3056
3057         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
3058                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
3059                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
3060
3061         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, &einfo, res_id,
3062                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
3063         if ((file_lock->fl_flags & FL_FLOCK) &&
3064             (rc == 0 || file_lock->fl_type == F_UNLCK))
3065                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
3066 #ifdef HAVE_F_OP_FLOCK
3067         if ((file_lock->fl_flags & FL_POSIX) &&
3068             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
3069             !(flags & LDLM_FL_TEST_LOCK))
3070                 posix_lock_file_wait(file, file_lock);
3071 #endif
3072
3073         RETURN(rc);
3074 }
3075
3076 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
3077 {
3078         ENTRY;
3079
3080         RETURN(-ENOSYS);
3081 }
3082
3083 int ll_have_md_lock(struct inode *inode, __u64 bits)
3084 {
3085         struct lustre_handle lockh;
3086         struct ldlm_res_id res_id;
3087         struct obd_device *obddev;
3088         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
3089         int flags;
3090         ENTRY;
3091
3092         if (!inode)
3093                RETURN(0);
3094
3095         obddev = ll_i2mdcexp(inode)->exp_obd;
3096         fid_build_reg_res_name(ll_inode_lu_fid(inode), &res_id);
3097
3098         CDEBUG(D_INFO, "trying to match res "LPU64":"LPU64":"LPU64"\n",
3099                 res_id.name[0],
3100                 res_id.name[1],
3101                 res_id.name[2]);
3102
3103         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
3104         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
3105                             &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
3106                 RETURN(1);
3107         }
3108
3109         RETURN(0);
3110 }
3111
3112 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
3113         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
3114                               * and return success */
3115                 inode->i_nlink = 0;
3116                 /* This path cannot be hit for regular files unless in
3117                  * case of obscure races, so no need to to validate
3118                  * size. */
3119                 if (!S_ISREG(inode->i_mode) &&
3120                     !S_ISDIR(inode->i_mode))
3121                         return 0;
3122         }
3123
3124         if (rc) {
3125                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
3126                 return -abs(rc);
3127
3128         }
3129
3130         return 0;
3131 }
3132
3133 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
3134 {
3135         struct inode *inode = dentry->d_inode;
3136         struct ptlrpc_request *req = NULL;
3137         struct obd_export *exp;
3138         int rc;
3139         ENTRY;
3140
3141         if (!inode) {
3142                 CERROR("REPORT THIS LINE TO PETER\n");
3143                 RETURN(0);
3144         }
3145         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
3146                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
3147
3148         exp = ll_i2mdcexp(inode);
3149
3150         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
3151                 struct lookup_intent oit = { .it_op = IT_GETATTR };
3152                 struct mdc_op_data op_data = { { 0 } };
3153
3154                 /* Call getattr by fid, so do not provide name at all. */
3155                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
3156                                        dentry->d_inode, NULL, 0, 0, NULL);
3157                 oit.it_flags |= O_CHECK_STALE;
3158                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
3159                                      /* we are not interested in name
3160                                         based lookup */
3161                                      &oit, 0, &req,
3162                                      ll_mdc_blocking_ast, 0);
3163                 oit.it_flags &= ~O_CHECK_STALE;
3164                 if (rc < 0) {
3165                         rc = ll_inode_revalidate_fini(inode, rc);
3166                         GOTO (out, rc);
3167                 }
3168
3169                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
3170                 if (rc != 0) {
3171                         ll_intent_release(&oit);
3172                         GOTO(out, rc);
3173                 }
3174
3175                 /* Unlinked? Unhash dentry, so it is not picked up later by
3176                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
3177                    here to preserve get_cwd functionality on 2.6.
3178                    Bug 10503 */
3179                 if (!dentry->d_inode->i_nlink) {
3180                         spin_lock(&ll_lookup_lock);
3181                         spin_lock(&dcache_lock);
3182                         ll_drop_dentry(dentry);
3183                         spin_unlock(&dcache_lock);
3184                         spin_unlock(&ll_lookup_lock);
3185                 }
3186
3187                 ll_lookup_finish_locks(&oit, dentry);
3188         } else if (!ll_have_md_lock(dentry->d_inode,
3189                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
3190                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
3191                 struct ll_fid fid;
3192                 obd_valid valid = OBD_MD_FLGETATTR;
3193                 int ealen = 0;
3194
3195                 if (S_ISREG(inode->i_mode)) {
3196                         rc = ll_get_max_mdsize(sbi, &ealen);
3197                         if (rc)
3198                                 RETURN(rc);
3199                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
3200                 }
3201                 ll_inode2fid(&fid, inode);
3202                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
3203                 if (rc) {
3204                         rc = ll_inode_revalidate_fini(inode, rc);
3205                         RETURN(rc);
3206                 }
3207
3208                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
3209                                    NULL);
3210                 if (rc)
3211                         GOTO(out, rc);
3212         }
3213
3214         /* if object not yet allocated, don't validate size */
3215         if (ll_i2info(inode)->lli_smd == NULL) {
3216                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
3217                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
3218                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
3219                 GOTO(out, rc = 0);
3220         }
3221
3222         /* ll_glimpse_size will prefer locally cached writes if they extend
3223          * the file */
3224         rc = ll_glimpse_size(inode, 0);
3225
3226 out:
3227         ptlrpc_req_finished(req);
3228         RETURN(rc);
3229 }
3230
3231 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
3232                   struct lookup_intent *it, struct kstat *stat)
3233 {
3234         struct inode *inode = de->d_inode;
3235         int res = 0;
3236
3237         res = ll_inode_revalidate_it(de, it);
3238         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
3239
3240         if (res)
3241                 return res;
3242
3243         stat->dev = inode->i_sb->s_dev;
3244         stat->ino = inode->i_ino;
3245         stat->mode = inode->i_mode;
3246         stat->nlink = inode->i_nlink;
3247         stat->uid = inode->i_uid;
3248         stat->gid = inode->i_gid;
3249         stat->rdev = kdev_t_to_nr(inode->i_rdev);
3250         stat->atime = inode->i_atime;
3251         stat->mtime = inode->i_mtime;
3252         stat->ctime = inode->i_ctime;
3253 #ifdef HAVE_INODE_BLKSIZE
3254         stat->blksize = inode->i_blksize;
3255 #else
3256         stat->blksize = 1<<inode->i_blkbits;
3257 #endif
3258
3259         ll_inode_size_lock(inode, 0);
3260         stat->size = i_size_read(inode);
3261         stat->blocks = inode->i_blocks;
3262         ll_inode_size_unlock(inode, 0);
3263
3264         return 0;
3265 }
3266 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3267 {
3268         struct lookup_intent it = { .it_op = IT_GETATTR };
3269
3270         return ll_getattr_it(mnt, de, &it, stat);
3271 }
3272
/*
 * ACL check callback used by ll_inode_permission().
 *
 * With CONFIG_FS_POSIX_ACL a reference to the ACL stored in the inode info
 * is taken under lli_lock, evaluated with posix_acl_permission() and
 * released again.
 *
 * \retval -EAGAIN no ACL is attached, or ACL support is compiled out
 * \retval other   the result of posix_acl_permission()
 */
static
int lustre_check_acl(struct inode *inode, int mask)
{
#ifdef CONFIG_FS_POSIX_ACL
        struct ll_inode_info *lli = ll_i2info(inode);
        struct posix_acl *acl;
        int rc = -EAGAIN;
        ENTRY;

        /* Take the reference under the lock; evaluate outside of it. */
        spin_lock(&lli->lli_lock);
        acl = posix_acl_dup(lli->lli_posix_acl);
        spin_unlock(&lli->lli_lock);

        if (acl != NULL) {
                rc = posix_acl_permission(inode, acl, mask);
                posix_acl_release(acl);
        }

        RETURN(rc);
#else
        return -EAGAIN;
#endif
}
3297
3298 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
3299 #ifndef HAVE_INODE_PERMISION_2ARGS
3300 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3301 #else
3302 int ll_inode_permission(struct inode *inode, int mask)
3303 #endif
3304 {
3305         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3306                inode->i_ino, inode->i_generation, inode, mask);
3307
3308         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3309         return generic_permission(inode, mask, lustre_check_acl);
3310 }
3311 #else
3312 #ifndef HAVE_INODE_PERMISION_2ARGS
3313 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
3314 #else
3315 int ll_inode_permission(struct inode *inode, int mask)
3316 #endif
3317 {
3318         int mode = inode->i_mode;
3319         int rc;
3320
3321         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
3322                inode->i_ino, inode->i_generation, inode, mask);
3323         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3324
3325         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
3326             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3327                 return -EROFS;
3328         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3329                 return -EACCES;
3330         if (current->fsuid == inode->i_uid) {
3331                 mode >>= 6;
3332         } else if (1) {
3333                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3334                         goto check_groups;
3335                 rc = lustre_check_acl(inode, mask);
3336                 if (rc == -EAGAIN)
3337                         goto check_groups;
3338                 if (rc == -EACCES)
3339                         goto check_capabilities;
3340                 return rc;
3341         } else {
3342 check_groups:
3343                 if (in_group_p(inode->i_gid))
3344                         mode >>= 3;
3345         }
3346         if ((mode & mask & S_IRWXO) == mask)
3347                 return 0;
3348
3349 check_capabilities:
3350         if (!(mask & MAY_EXEC) ||
3351             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3352                 if (cfs_capable(CFS_CAP_DAC_OVERRIDE))
3353                         return 0;
3354
3355         if (cfs_capable(CFS_CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3356             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3357                 return 0;
3358
3359         return -EACCES;
3360 }
3361 #endif
3362
3363 /* -o localflock - only provides locally consistent flock locks */
3364 struct file_operations ll_file_operations = {
3365         .read           = ll_file_read,
3366 #ifdef HAVE_FILE_READV
3367         .readv          = ll_file_readv,
3368 #else
3369         .aio_read       = ll_file_aio_read,
3370 #endif
3371         .write          = ll_file_write,
3372 #ifdef HAVE_FILE_WRITEV
3373         .writev         = ll_file_writev,
3374 #else
3375         .aio_write      = ll_file_aio_write,
3376 #endif
3377         .ioctl          = ll_file_ioctl,
3378         .open           = ll_file_open,
3379         .release        = ll_file_release,
3380         .mmap           = ll_file_mmap,
3381         .llseek         = ll_file_seek,
3382 #ifdef HAVE_KERNEL_SPLICE_READ
3383         .splice_read    = ll_file_splice_read,
3384 #endif
3385 #ifdef HAVE_KERNEL_SENDFILE
3386         .sendfile       = ll_file_sendfile,
3387 #endif
3388         .fsync          = ll_fsync,
3389 };
3390
3391 struct file_operations ll_file_operations_flock = {
3392         .read           = ll_file_read,
3393 #ifdef HAVE_FILE_READV
3394         .readv          = ll_file_readv,
3395 #else
3396         .aio_read       = ll_file_aio_read,
3397 #endif
3398         .write          = ll_file_write,
3399 #ifdef HAVE_FILE_WRITEV
3400         .writev         = ll_file_writev,
3401 #else
3402         .aio_write      = ll_file_aio_write,
3403 #endif
3404         .ioctl          = ll_file_ioctl,
3405         .open           = ll_file_open,
3406         .release        = ll_file_release,
3407         .mmap           = ll_file_mmap,
3408         .llseek         = ll_file_seek,
3409 #ifdef HAVE_KERNEL_SPLICE_READ
3410         .splice_read    = ll_file_splice_read,
3411 #endif
3412 #ifdef HAVE_KERNEL_SENDFILE
3413         .sendfile       = ll_file_sendfile,
3414 #endif
3415         .fsync          = ll_fsync,
3416 #ifdef HAVE_F_OP_FLOCK
3417         .flock          = ll_file_flock,
3418 #endif
3419         .lock           = ll_file_flock
3420 };
3421
3422 /* These are for -o noflock - to return ENOSYS on flock calls */
3423 struct file_operations ll_file_operations_noflock = {
3424         .read           = ll_file_read,
3425 #ifdef HAVE_FILE_READV
3426         .readv          = ll_file_readv,
3427 #else
3428         .aio_read       = ll_file_aio_read,
3429 #endif
3430         .write          = ll_file_write,
3431 #ifdef HAVE_FILE_WRITEV
3432         .writev         = ll_file_writev,
3433 #else
3434         .aio_write      = ll_file_aio_write,
3435 #endif
3436         .ioctl          = ll_file_ioctl,
3437         .open           = ll_file_open,
3438         .release        = ll_file_release,
3439         .mmap           = ll_file_mmap,
3440         .llseek         = ll_file_seek,
3441 #ifdef HAVE_KERNEL_SPLICE_READ
3442         .splice_read    = ll_file_splice_read,
3443 #endif
3444 #ifdef HAVE_KERNEL_SENDFILE
3445         .sendfile       = ll_file_sendfile,
3446 #endif
3447         .fsync          = ll_fsync,
3448 #ifdef HAVE_F_OP_FLOCK
3449         .flock          = ll_file_noflock,
3450 #endif
3451         .lock           = ll_file_noflock
3452 };
3453
3454 struct inode_operations ll_file_inode_operations = {
3455 #ifdef HAVE_VFS_INTENT_PATCHES
3456         .setattr_raw    = ll_setattr_raw,
3457 #endif
3458         .setattr        = ll_setattr,
3459         .truncate       = ll_truncate,
3460         .getattr        = ll_getattr,
3461         .permission     = ll_inode_permission,
3462         .setxattr       = ll_setxattr,
3463         .getxattr       = ll_getxattr,
3464         .listxattr      = ll_listxattr,
3465         .removexattr    = ll_removexattr,
3466 };
3467
3468 /* dynamic ioctl number support routins */
3469 static struct llioc_ctl_data {
3470         struct rw_semaphore ioc_sem;
3471         struct list_head    ioc_head;
3472 } llioc = {
3473         __RWSEM_INITIALIZER(llioc.ioc_sem),
3474         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3475 };
3476
3477
3478 struct llioc_data {
3479         struct list_head        iocd_list;
3480         unsigned int            iocd_size;
3481         llioc_callback_t        iocd_cb;
3482         unsigned int            iocd_count;
3483         unsigned int            iocd_cmd[0];
3484 };
3485
3486 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3487 {
3488         unsigned int size;
3489         struct llioc_data *in_data = NULL;
3490         ENTRY;
3491
3492         if (cb == NULL || cmd == NULL ||
3493             count > LLIOC_MAX_CMD || count < 0)
3494                 RETURN(NULL);
3495
3496         size = sizeof(*in_data) + count * sizeof(unsigned int);
3497         OBD_ALLOC(in_data, size);
3498         if (in_data == NULL)
3499                 RETURN(NULL);
3500
3501         memset(in_data, 0, sizeof(*in_data));
3502         in_data->iocd_size = size;
3503         in_data->iocd_cb = cb;
3504         in_data->iocd_count = count;
3505         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3506
3507         down_write(&llioc.ioc_sem);
3508         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3509         up_write(&llioc.ioc_sem);
3510
3511         RETURN(in_data);
3512 }
3513
3514 void ll_iocontrol_unregister(void *magic)
3515 {
3516         struct llioc_data *tmp;
3517
3518         if (magic == NULL)
3519                 return;
3520
3521         down_write(&llioc.ioc_sem);
3522         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3523                 if (tmp == magic) {
3524                         unsigned int size = tmp->iocd_size;
3525
3526                         list_del(&tmp->iocd_list);
3527                         up_write(&llioc.ioc_sem);
3528
3529                         OBD_FREE(tmp, size);
3530                         return;
3531                 }
3532         }
3533         up_write(&llioc.ioc_sem);
3534
3535         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3536 }
3537
3538 EXPORT_SYMBOL(ll_iocontrol_register);
3539 EXPORT_SYMBOL(ll_iocontrol_unregister);
3540
3541 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
3542                         unsigned int cmd, unsigned long arg, int *rcp)
3543 {
3544         enum llioc_iter ret = LLIOC_CONT;
3545         struct llioc_data *data;
3546         int rc = -EINVAL, i;
3547
3548         down_read(&llioc.ioc_sem);
3549         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3550                 for (i = 0; i < data->iocd_count; i++) {
3551                         if (cmd != data->iocd_cmd[i])
3552                                 continue;
3553
3554                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3555                         break;
3556                 }
3557
3558                 if (ret == LLIOC_STOP)
3559                         break;
3560         }
3561         up_read(&llioc.ioc_sem);
3562
3563         if (rcp)
3564                 *rcp = rc;
3565         return ret;
3566 }