Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 /* also used by llite/special.c:ll_special_open() */
36 struct ll_file_data *ll_file_data_get(void)
37 {
38         struct ll_file_data *fd;
39
40         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
41         return fd;
42 }
43
44 static void ll_file_data_put(struct ll_file_data *fd)
45 {
46         if (fd != NULL)
47                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
48 }
49
50 static int ll_close_inode_openhandle(struct inode *inode,
51                                      struct obd_client_handle *och)
52 {
53         struct ptlrpc_request *req = NULL;
54         struct obd_device *obd;
55         struct obdo *oa;
56         int rc;
57         ENTRY;
58
59         obd = class_exp2obd(ll_i2mdcexp(inode));
60         if (obd == NULL) {
61                 CERROR("Invalid MDC connection handle "LPX64"\n",
62                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
63                 GOTO(out, rc = 0);
64         }
65
66         /*
67          * here we check if this is forced umount. If so this is called on
68          * canceling "open lock" and we do not call mdc_close() in this case, as
69          * it will not be successful, as import is already deactivated.
70          */
71         if (obd->obd_force)
72                 GOTO(out, rc = 0);
73
74         OBDO_ALLOC(oa);
75         if (!oa)
76                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
77
78         oa->o_id = inode->i_ino;
79         oa->o_valid = OBD_MD_FLID;
80         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
81                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
82                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
83                                    OBD_MD_FLCTIME);
84         if (ll_is_inode_dirty(inode)) {
85                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
86                 oa->o_valid |= OBD_MD_FLFLAGS;
87         }
88
89         rc = mdc_close(ll_i2mdcexp(inode), oa, och, &req);
90         if (rc == EAGAIN) {
91                 /* We are the last writer, so the MDS has instructed us to get
92                  * the file size and any write cookies, then close again. */
93                 ll_queue_done_writing(inode);
94                 rc = 0;
95         } else if (rc) {
96                 CERROR("inode %lu mdc close failed: rc = %d\n",
97                        inode->i_ino, rc);
98         }
99
100         OBDO_FREE(oa);
101
102         if (rc == 0) {
103                 rc = ll_objects_destroy(req, inode);
104                 if (rc)
105                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
106                                inode->i_ino, rc);
107         }
108
109         ptlrpc_req_finished(req); /* This is close request */
110         EXIT;
111 out:
112         mdc_clear_open_replay_data(och);
113
114         return rc;
115 }
116
117 int ll_mdc_real_close(struct inode *inode, int flags)
118 {
119         struct ll_inode_info *lli = ll_i2info(inode);
120         int rc = 0;
121         struct obd_client_handle **och_p;
122         struct obd_client_handle *och;
123         __u64 *och_usecount;
124
125         ENTRY;
126
127         if (flags & FMODE_WRITE) {
128                 och_p = &lli->lli_mds_write_och;
129                 och_usecount = &lli->lli_open_fd_write_count;
130         } else if (flags & FMODE_EXEC) {
131                 och_p = &lli->lli_mds_exec_och;
132                 och_usecount = &lli->lli_open_fd_exec_count;
133          } else {
134                 LASSERT(flags & FMODE_READ);
135                 och_p = &lli->lli_mds_read_och;
136                 och_usecount = &lli->lli_open_fd_read_count;
137         }
138
139         down(&lli->lli_och_sem);
140         if (*och_usecount) { /* There are still users of this handle, so
141                                 skip freeing it. */
142                 up(&lli->lli_och_sem);
143                 RETURN(0);
144         }
145         och=*och_p;
146         *och_p = NULL;
147         up(&lli->lli_och_sem);
148
149         if (och) { /* There might be a race and somebody have freed this och
150                       already */
151                 rc = ll_close_inode_openhandle(inode, och);
152                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
153                 OBD_FREE(och, sizeof *och);
154         }
155
156         RETURN(rc);
157 }
158
159 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
160                         struct file *file)
161 {
162         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
163         struct ll_inode_info *lli = ll_i2info(inode);
164         int rc = 0;
165         ENTRY;
166
167         /* clear group lock, if present */
168         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
169                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
170                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
171                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
172                                       &fd->fd_cwlockh);
173         }
174
175         /* Let's see if we have good enough OPEN lock on the file and if
176            we can skip talking to MDS */
177         if (file->f_dentry->d_inode) { /* Can this ever be false? */
178                 int lockmode;
179                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
180                 struct lustre_handle lockh;
181                 struct inode *inode = file->f_dentry->d_inode;
182                 struct ldlm_res_id file_res_id = {.name={inode->i_ino,
183                                                          inode->i_generation}};
184                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
185
186                 down(&lli->lli_och_sem);
187                 if (fd->fd_omode & FMODE_WRITE) {
188                         lockmode = LCK_CW;
189                         LASSERT(lli->lli_open_fd_write_count);
190                         lli->lli_open_fd_write_count--;
191                 } else if (fd->fd_omode & FMODE_EXEC) {
192                         lockmode = LCK_PR;
193                         LASSERT(lli->lli_open_fd_exec_count);
194                         lli->lli_open_fd_exec_count--;
195                 } else {
196                         lockmode = LCK_CR;
197                         LASSERT(lli->lli_open_fd_read_count);
198                         lli->lli_open_fd_read_count--;
199                 }
200                 up(&lli->lli_och_sem);
201
202                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
203                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
204                                      &lockh)) {
205                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
206                                                 fd->fd_omode);
207                 }
208         } else {
209                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
210                        file, file->f_dentry, file->f_dentry->d_name.name);
211         }
212
213         LUSTRE_FPRIVATE(file) = NULL;
214         ll_file_data_put(fd);
215
216         RETURN(rc);
217 }
218
219 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
220
221 /* While this returns an error code, fput() the caller does not, so we need
222  * to make every effort to clean up all of our state here.  Also, applications
223  * rarely check close errors and even if an error is returned they will not
224  * re-try the close call.
225  */
226 int ll_file_release(struct inode *inode, struct file *file)
227 {
228         struct ll_file_data *fd;
229         struct ll_sb_info *sbi = ll_i2sbi(inode);
230         struct ll_inode_info *lli = ll_i2info(inode);
231         struct lov_stripe_md *lsm = lli->lli_smd;
232         int rc;
233
234         ENTRY;
235         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
236                inode->i_generation, inode);
237
238
239         if (inode->i_sb->s_root != file->f_dentry)
240                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
241         fd = LUSTRE_FPRIVATE(file);
242         LASSERT(fd != NULL);
243
244         /*
245          * The last ref on @file, maybe not the the owner pid of statahead.
246          * Different processes can open the same dir, "ll_opendir_key" means:
247          * it is me that should stop the statahead thread.
248          */
249         if (lli->lli_opendir_key == fd)
250                 ll_stop_statahead(inode, fd);
251
252         if (inode->i_sb->s_root == file->f_dentry) {
253                 LUSTRE_FPRIVATE(file) = NULL;
254                 ll_file_data_put(fd);
255                 RETURN(0);
256         }
257         
258         if (lsm)
259                 lov_test_and_clear_async_rc(lsm);
260         lli->lli_async_rc = 0;
261
262         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
263         RETURN(rc);
264 }
265
266 static int ll_intent_file_open(struct file *file, void *lmm,
267                                int lmmsize, struct lookup_intent *itp)
268 {
269         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
270         struct mdc_op_data data;
271         struct dentry *parent = file->f_dentry->d_parent;
272         const char *name = file->f_dentry->d_name.name;
273         const int len = file->f_dentry->d_name.len;
274         struct inode *inode = file->f_dentry->d_inode;
275         struct ptlrpc_request *req;
276         int rc;
277         ENTRY;
278
279         if (!parent)
280                 RETURN(-ENOENT);
281
282         ll_prepare_mdc_op_data(&data, parent->d_inode, inode,
283                                name, len, O_RDWR, NULL);
284
285         /* Usually we come here only for NFSD, and we want open lock.
286            But we can also get here with pre 2.6.15 patchless kernels, and in
287            that case that lock is also ok */
288         /* We can also get here if there was cached open handle in revalidate_it
289          * but it disappeared while we were getting from there to ll_file_open.
290          * But this means this file was closed and immediatelly opened which
291          * makes a good candidate for using OPEN lock */
292         /* If lmmsize & lmm are not 0, we are just setting stripe info
293          * parameters. No need for the open lock */
294         if (!lmm && !lmmsize)
295                 itp->it_flags |= MDS_OPEN_LOCK;
296
297         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
298                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
299         if (rc == -ESTALE) {
300                 /* reason for keep own exit path - don`t flood log
301                 * with messages with -ESTALE errors.
302                 */
303                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
304                      it_open_error(DISP_OPEN_OPEN, itp))
305                         GOTO(out, rc);
306                 ll_release_openhandle(file->f_dentry, itp);
307                 GOTO(out_stale, rc);
308         }
309
310         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
311                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
312                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
313                 GOTO(out, rc);
314         }
315
316         if (itp->d.lustre.it_lock_mode)
317                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
318                                   inode);
319
320         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
321                            req, DLM_REPLY_REC_OFF, NULL);
322 out:
323         ptlrpc_req_finished(itp->d.lustre.it_data);
324
325 out_stale:
326         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
327         ll_intent_drop_lock(itp);
328
329         RETURN(rc);
330 }
331
332
333 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
334                         struct obd_client_handle *och)
335 {
336         struct ptlrpc_request *req = it->d.lustre.it_data;
337         struct mds_body *body;
338
339         LASSERT(och);
340
341         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
342         LASSERT(body != NULL);                  /* reply already checked out */
343         /* and swabbed in mdc_enqueue */
344         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
345
346         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
347         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
348         lli->lli_io_epoch = body->io_epoch;
349
350         mdc_set_open_replay_data(och, it->d.lustre.it_data);
351 }
352
353 int ll_local_open(struct file *file, struct lookup_intent *it,
354                   struct ll_file_data *fd, struct obd_client_handle *och)
355 {
356         ENTRY;
357
358         LASSERT(!LUSTRE_FPRIVATE(file));
359
360         LASSERT(fd != NULL);
361
362         if (och)
363                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
364         LUSTRE_FPRIVATE(file) = fd;
365         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
366         fd->fd_omode = it->it_flags;
367
368         RETURN(0);
369 }
370
371 /* Open a file, and (for the very first open) create objects on the OSTs at
372  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
373  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
374  * lli_open_sem to ensure no other process will create objects, send the
375  * stripe MD to the MDS, or try to destroy the objects if that fails.
376  *
377  * If we already have the stripe MD locally then we don't request it in
378  * mdc_open(), by passing a lmm_size = 0.
379  *
380  * It is up to the application to ensure no other processes open this file
381  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
382  * used.  We might be able to avoid races of that sort by getting lli_open_sem
383  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
384  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
385  */
386 int ll_file_open(struct inode *inode, struct file *file)
387 {
388         struct ll_inode_info *lli = ll_i2info(inode);
389         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
390                                           .it_flags = file->f_flags };
391         struct lov_stripe_md *lsm;
392         struct ptlrpc_request *req = NULL;
393         struct obd_client_handle **och_p;
394         __u64 *och_usecount;
395         struct ll_file_data *fd;
396         int rc = 0, opendir_set = 0;
397         ENTRY;
398
399         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
400                inode->i_generation, inode, file->f_flags);
401
402 #ifdef HAVE_VFS_INTENT_PATCHES
403         it = file->f_it;
404 #else
405         it = file->private_data; /* XXX: compat macro */
406         file->private_data = NULL; /* prevent ll_local_open assertion */
407 #endif
408
409         fd = ll_file_data_get();
410         if (fd == NULL)
411                 RETURN(-ENOMEM);
412
413         if (S_ISDIR(inode->i_mode)) {
414                 spin_lock(&lli->lli_lock);
415                 /*
416                  * "lli->lli_opendir_pid != 0" means someone has set it.
417                  * "lli->lli_sai != NULL" means the previous statahead has not
418                  *                        been cleanup.
419                  */ 
420                 if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
421                         opendir_set = 1;
422                         lli->lli_opendir_pid = cfs_curproc_pid();
423                         lli->lli_opendir_key = fd;
424                 } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
425                         /* Two cases for this:
426                          * (1) The same process open such directory many times.
427                          * (2) The old process opened the directory, and exited
428                          *     before its children processes. Then new process
429                          *     with the same pid opens such directory before the
430                          *     old process's children processes exit.
431                          * Change the owner to the latest one.
432                          */
433                         opendir_set = 2;
434                         lli->lli_opendir_key = fd;
435                 }
436                 spin_unlock(&lli->lli_lock);
437         }
438
439         if (inode->i_sb->s_root == file->f_dentry) {
440                 LUSTRE_FPRIVATE(file) = fd;
441                 RETURN(0);
442         }
443
444         if (!it || !it->d.lustre.it_disposition) {
445                 /* Convert f_flags into access mode. We cannot use file->f_mode,
446                  * because everything but O_ACCMODE mask was stripped from it */
447                 if ((oit.it_flags + 1) & O_ACCMODE)
448                         oit.it_flags++;
449                 if (file->f_flags & O_TRUNC)
450                         oit.it_flags |= FMODE_WRITE;
451
452                 /* kernel only call f_op->open in dentry_open.  filp_open calls
453                  * dentry_open after call to open_namei that checks permissions.
454                  * Only nfsd_open call dentry_open directly without checking
455                  * permissions and because of that this code below is safe. */
456                 if (oit.it_flags & FMODE_WRITE)
457                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
458
459                 /* We do not want O_EXCL here, presumably we opened the file
460                  * already? XXX - NFS implications? */
461                 oit.it_flags &= ~O_EXCL;
462
463                 it = &oit;
464         }
465
466 restart:
467         /* Let's see if we have file open on MDS already. */
468         if (it->it_flags & FMODE_WRITE) {
469                 och_p = &lli->lli_mds_write_och;
470                 och_usecount = &lli->lli_open_fd_write_count;
471         } else if (it->it_flags & FMODE_EXEC) {
472                 och_p = &lli->lli_mds_exec_och;
473                 och_usecount = &lli->lli_open_fd_exec_count;
474          } else {
475                 och_p = &lli->lli_mds_read_och;
476                 och_usecount = &lli->lli_open_fd_read_count;
477         }
478
479         LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
480                  it->d.lustre.it_disposition);
481
482         down(&lli->lli_och_sem);
483         if (*och_p) { /* Open handle is present */
484                 if (it_disposition(it, DISP_OPEN_OPEN)) {
485                         /* Well, there's extra open request that we do not need,
486                            let's close it somehow. This will decref request. */
487                         rc = it_open_error(DISP_OPEN_OPEN, it);
488                         if (rc) {
489                                 up(&lli->lli_och_sem);
490                                 ll_file_data_put(fd);
491                                 GOTO(out_openerr, rc);
492                         }       
493                         ll_release_openhandle(file->f_dentry, it);
494                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
495                                              LPROC_LL_OPEN);
496                 }
497                 (*och_usecount)++;
498
499                 rc = ll_local_open(file, it, fd, NULL);
500
501                 LASSERTF(rc == 0, "rc = %d\n", rc);
502         } else {
503                 LASSERT(*och_usecount == 0);
504                 if (!it->d.lustre.it_disposition) {
505                         /* We cannot just request lock handle now, new ELC code
506                            means that one of other OPEN locks for this file
507                            could be cancelled, and since blocking ast handler
508                            would attempt to grab och_sem as well, that would
509                            result in a deadlock */
510                         up(&lli->lli_och_sem);
511                         rc = ll_intent_file_open(file, NULL, 0, it);
512                         if (rc) {
513                                 ll_file_data_put(fd);
514                                 GOTO(out_openerr, rc);
515                         }
516
517                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
518                                           file->f_dentry->d_inode);
519                         goto restart;
520                 }
521  
522                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
523                 if (!*och_p) {
524                         ll_file_data_put(fd);
525                         GOTO(out_och_free, rc = -ENOMEM);
526                 }
527                 (*och_usecount)++;
528                req = it->d.lustre.it_data;
529
530                 /* mdc_intent_lock() didn't get a request ref if there was an
531                  * open error, so don't do cleanup on the request here
532                  * (bug 3430) */
533                 /* XXX (green): Should not we bail out on any error here, not
534                  * just open error? */
535                 rc = it_open_error(DISP_OPEN_OPEN, it);
536                 if (rc) {
537                         ll_file_data_put(fd);
538                         GOTO(out_och_free, rc);
539                 }
540
541                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
542                 rc = ll_local_open(file, it, fd, *och_p);
543                 LASSERTF(rc == 0, "rc = %d\n", rc);
544         }
545         up(&lli->lli_och_sem);
546
547         /* Must do this outside lli_och_sem lock to prevent deadlock where
548            different kind of OPEN lock for this same inode gets cancelled
549            by ldlm_cancel_lru */
550         if (!S_ISREG(inode->i_mode))
551                 GOTO(out, rc);
552
553         lsm = lli->lli_smd;
554         if (lsm == NULL) {
555                 if (file->f_flags & O_LOV_DELAY_CREATE ||
556                     !(file->f_mode & FMODE_WRITE)) {
557                         CDEBUG(D_INODE, "object creation was delayed\n");
558                         GOTO(out, rc);
559                 }
560         }
561         file->f_flags &= ~O_LOV_DELAY_CREATE;
562         GOTO(out, rc);
563  out:
564         ptlrpc_req_finished(req);
565         if (req)
566                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
567         if (rc == 0) {
568                 ll_open_complete(inode);
569         } else {
570 out_och_free:
571                 if (*och_p) {
572                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
573                         *och_p = NULL; /* OBD_FREE writes some magic there */
574                         (*och_usecount)--;
575                 }
576                 up(&lli->lli_och_sem);
577 out_openerr:
578                 if (opendir_set) {
579                         lli->lli_opendir_key = NULL;
580                         lli->lli_opendir_pid = 0;
581                 } else if (unlikely(opendir_set == 2)) {
582                         ll_stop_statahead(inode, fd);
583                 }
584         }
585         return rc;
586 }
587
588 /* Fills the obdo with the attributes for the inode defined by lsm */
589 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
590                    struct obdo *oa)
591 {
592         struct ptlrpc_request_set *set;
593         struct obd_info oinfo = { { { 0 } } };
594         int rc;
595         ENTRY;
596
597         LASSERT(lsm != NULL);
598
599         memset(oa, 0, sizeof *oa);
600         oinfo.oi_md = lsm;
601         oinfo.oi_oa = oa;
602         oa->o_id = lsm->lsm_object_id;
603         oa->o_mode = S_IFREG;
604         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
605                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
606                 OBD_MD_FLCTIME;
607
608         set = ptlrpc_prep_set();
609         if (set == NULL) {
610                 rc = -ENOMEM;
611         } else {
612                 rc = obd_getattr_async(exp, &oinfo, set);
613                 if (rc == 0)
614                         rc = ptlrpc_set_wait(set);
615                 ptlrpc_set_destroy(set);
616         }
617         if (rc)
618                 RETURN(rc);
619
620         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
621                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
622         RETURN(0);
623 }
624
625 static inline void ll_remove_suid(struct inode *inode)
626 {
627         unsigned int mode;
628
629         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
630         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
631
632         /* was any of the uid bits set? */
633         mode &= inode->i_mode;
634         if (mode && !capable(CAP_FSETID)) {
635                 inode->i_mode &= ~mode;
636                 // XXX careful here - we cannot change the size
637         }
638 }
639
640 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
641 {
642         struct ll_inode_info *lli = ll_i2info(inode);
643         struct lov_stripe_md *lsm = lli->lli_smd;
644         struct obd_export *exp = ll_i2obdexp(inode);
645         struct {
646                 char name[16];
647                 struct ldlm_lock *lock;
648                 struct lov_stripe_md *lsm;
649         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
650         __u32 stripe, vallen = sizeof(stripe);
651         int rc;
652         ENTRY;
653
654         if (lsm->lsm_stripe_count == 1)
655                 GOTO(check, stripe = 0);
656
657         /* get our offset in the lov */
658         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
659         if (rc != 0) {
660                 CERROR("obd_get_info: rc = %d\n", rc);
661                 RETURN(rc);
662         }
663         LASSERT(stripe < lsm->lsm_stripe_count);
664
665 check:
666         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
667             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[1]){
668                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
669                            lsm->lsm_oinfo[stripe]->loi_id,
670                            lsm->lsm_oinfo[stripe]->loi_gr);
671                 RETURN(-ELDLM_NO_LOCK_DATA);
672         }
673
674         RETURN(stripe);
675 }
676
/*
 * Take an extra reference on the page passed as @data so that it cannot go
 * away.
 */
void ll_pin_extent_cb(void *data)
{
        struct page *pinned = data;

        page_cache_get(pinned);
}
686 /* Flush the page from page cache for an extent as its canceled.
687  * Page to remove is delivered as @data.
688  *
689  * No one can dirty the extent until we've finished our work and they cannot
690  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
691  * but other kernel actors could have pages locked.
692  *
693  * If @discard is set, there is no need to write the page if it is dirty.
694  *
695  * Called with the DLM lock held. */
696 int ll_page_removal_cb(void *data, int discard)
697 {
698         int rc;
699         struct page *page = data;
700         struct address_space *mapping;
701
702         ENTRY;
703
704         /* We have page reference already from ll_pin_page */
705         lock_page(page);
706
707         /* Already truncated by somebody */
708         if (!page->mapping)
709                 GOTO(out, rc = 0);
710
711         mapping = page->mapping;
712
713         ll_teardown_mmaps(mapping,
714                           (__u64)page->index << PAGE_CACHE_SHIFT,
715                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
716                                                               ~PAGE_CACHE_MASK);
717         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
718         if (!discard && PageWriteback(page))
719                 wait_on_page_writeback(page);
720
721         if (!discard && clear_page_dirty_for_io(page)) {
722                 rc = ll_call_writepage(page->mapping->host, page);
723                 /* either waiting for io to complete or reacquiring
724                  * the lock that the failed writepage released */
725                 lock_page(page);
726                 wait_on_page_writeback(page);
727                 if (rc < 0) {
728                         CERROR("writepage inode %lu(%p) of page %p "
729                                "failed: %d\n", mapping->host->i_ino,
730                                mapping->host, page, rc);
731 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
732                         if (rc == -ENOSPC)
733                                 set_bit(AS_ENOSPC, &mapping->flags);
734                         else
735                                 set_bit(AS_EIO, &mapping->flags);
736 #else
737                         mapping->gfp_mask |= AS_EIO_MASK;
738 #endif
739                 }
740         }
741         if (page->mapping != NULL) {
742                 struct ll_async_page *llap = llap_cast_private(page);
743                 // checking again to account for writeback's lock_page()
744                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
745                 if (llap)
746                         ll_ra_accounting(llap, page->mapping);
747                 ll_truncate_complete_page(page);
748         }
749         EXIT;
750 out:
751         LASSERT(!PageWriteback(page));
752         unlock_page(page);
753         page_cache_release(page);
754
755         return 0;
756 }
757
758 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
759                              void *data, int flag)
760 {
761         struct inode *inode;
762         struct ll_inode_info *lli;
763         struct lov_stripe_md *lsm;
764         int stripe;
765         __u64 kms;
766
767         ENTRY;
768
769         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
770                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
771                 LBUG();
772         }
773
774         inode = ll_inode_from_lock(lock);
775         if (inode == NULL)
776                 RETURN(0);
777         lli = ll_i2info(inode);
778         if (lli == NULL)
779                 GOTO(iput, 0);
780         if (lli->lli_smd == NULL)
781                 GOTO(iput, 0);
782         lsm = lli->lli_smd;
783
784         stripe = ll_lock_to_stripe_offset(inode, lock);
785         if (stripe < 0)
786                 GOTO(iput, 0);
787
788         lov_stripe_lock(lsm);
789         lock_res_and_lock(lock);
790         kms = ldlm_extent_shift_kms(lock,
791                                     lsm->lsm_oinfo[stripe]->loi_kms);
792
793         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
794                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
795                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
796         lsm->lsm_oinfo[stripe]->loi_kms = kms;
797         unlock_res_and_lock(lock);
798         lov_stripe_unlock(lsm);
799         ll_try_done_writing(inode);
800         EXIT;
801 iput:
802         iput(inode);
803
804         return 0;
805 }
806
/*
 * Disabled completion AST for asynchronous enqueues; everything down to the
 * matching #endif is compiled out.
 *
 * NOTE(review): this body has not been kept in step with the live code in
 * this file.  It accesses lsm_oinfo[stripe] with '.', while the compiled
 * functions dereference lsm_oinfo[stripe] with '->'.  It also passes the
 * result of ll_inode_from_lock() to ll_i2info() and iput() without the NULL
 * check the live callbacks perform.  Do not re-enable it as is.
 */
807 #if 0
808 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
809 {
810         /* XXX ALLOCATE - 160 bytes */
811         struct inode *inode = ll_inode_from_lock(lock);
812         struct ll_inode_info *lli = ll_i2info(inode);
813         struct lustre_handle lockh = { 0 };
814         struct ost_lvb *lvb;
815         int stripe;
816         ENTRY;
817
818         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
819                      LDLM_FL_BLOCK_CONV)) {
820                 LBUG(); /* not expecting any blocked async locks yet */
821                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
822                            "lock, returning");
823                 ldlm_lock_dump(D_OTHER, lock, 0);
824                 ldlm_reprocess_all(lock->l_resource);
825                 RETURN(0);
826         }
827
828         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
829
830         stripe = ll_lock_to_stripe_offset(inode, lock);
831         if (stripe < 0)
832                 goto iput;
833
834         if (lock->l_lvb_len) {
835                 struct lov_stripe_md *lsm = lli->lli_smd;
836                 __u64 kms;
837                 lvb = lock->l_lvb_data;
838                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
839
840                 lock_res_and_lock(lock);
841                 ll_inode_size_lock(inode, 1);
842                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
843                 kms = ldlm_extent_shift_kms(NULL, kms);
844                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
845                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
846                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
847                 lsm->lsm_oinfo[stripe].loi_kms = kms;
848                 ll_inode_size_unlock(inode, 1);
849                 unlock_res_and_lock(lock);
850         }
851
852 iput:
853         iput(inode);
854         wake_up(&lock->l_waitq);
855
856         ldlm_lock2handle(lock, &lockh);
857         ldlm_lock_decref(&lockh, LCK_PR);
858         RETURN(0);
859 }
860 #endif
861
862 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
863 {
864         struct ptlrpc_request *req = reqp;
865         struct inode *inode = ll_inode_from_lock(lock);
866         struct ll_inode_info *lli;
867         struct lov_stripe_md *lsm;
868         struct ost_lvb *lvb;
869         int rc, stripe;
870         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
871         ENTRY;
872
873         if (inode == NULL)
874                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
875         lli = ll_i2info(inode);
876         if (lli == NULL)
877                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
878         lsm = lli->lli_smd;
879         if (lsm == NULL)
880                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
881
882         /* First, find out which stripe index this lock corresponds to. */
883         stripe = ll_lock_to_stripe_offset(inode, lock);
884         if (stripe < 0)
885                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
886
887         rc = lustre_pack_reply(req, 2, size, NULL);
888         if (rc)
889                 GOTO(iput, rc);
890
891         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
892         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
893         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
894         lvb->lvb_atime = LTIME_S(inode->i_atime);
895         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
896
897         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
898                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
899                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
900                    lvb->lvb_atime, lvb->lvb_ctime);
901  iput:
902         iput(inode);
903
904  out:
905         /* These errors are normal races, so we don't want to fill the console
906          * with messages by calling ptlrpc_error() */
907         if (rc == -ELDLM_NO_LOCK_DATA)
908                 lustre_pack_reply(req, 1, NULL, NULL);
909
910         req->rq_status = rc;
911         return rc;
912 }
913
914 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
915                      lstat_t *st)
916 {
917         struct lustre_handle lockh = { 0 };
918         struct ldlm_enqueue_info einfo = { 0 };
919         struct obd_info oinfo = { { { 0 } } };
920         struct ost_lvb lvb;
921         int rc;
922         
923         ENTRY;
924         
925         einfo.ei_type = LDLM_EXTENT;
926         einfo.ei_mode = LCK_PR;
927         einfo.ei_cb_bl = osc_extent_blocking_cb;
928         einfo.ei_cb_cp = ldlm_completion_ast;
929         einfo.ei_cb_gl = ll_glimpse_callback;
930         einfo.ei_cbdata = NULL;
931
932         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
933         oinfo.oi_lockh = &lockh;
934         oinfo.oi_md = lsm;
935         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
936
937         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
938         if (rc == -ENOENT)
939                 RETURN(rc);
940         if (rc != 0) {
941                 CERROR("obd_enqueue returned rc %d, "
942                        "returning -EIO\n", rc);
943                 RETURN(rc > 0 ? -EIO : rc);
944         }
945         
946         lov_stripe_lock(lsm);
947         memset(&lvb, 0, sizeof(lvb));
948         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
949         st->st_size = lvb.lvb_size;
950         st->st_blocks = lvb.lvb_blocks;
951         st->st_mtime = lvb.lvb_mtime;
952         st->st_atime = lvb.lvb_atime;
953         st->st_ctime = lvb.lvb_ctime;
954         lov_stripe_unlock(lsm);
955         
956         RETURN(rc);
957 }
958
959 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
960  * file (because it prefers KMS over RSS when larger) */
961 int ll_glimpse_size(struct inode *inode, int ast_flags)
962 {
963         struct ll_inode_info *lli = ll_i2info(inode);
964         struct ll_sb_info *sbi = ll_i2sbi(inode);
965         struct lustre_handle lockh = { 0 };
966         struct ldlm_enqueue_info einfo = { 0 };
967         struct obd_info oinfo = { { { 0 } } };
968         struct ost_lvb lvb;
969         int rc;
970         ENTRY;
971
972         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
973
974         if (!lli->lli_smd) {
975                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
976                 RETURN(0);
977         }
978
979         /* NOTE: this looks like DLM lock request, but it may not be one. Due
980          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
981          *       won't revoke any conflicting DLM locks held. Instead,
982          *       ll_glimpse_callback() will be called on each client
983          *       holding a DLM lock against this file, and resulting size
984          *       will be returned for each stripe. DLM lock on [0, EOF] is
985          *       acquired only if there were no conflicting locks. */
986         einfo.ei_type = LDLM_EXTENT;
987         einfo.ei_mode = LCK_PR;
988         einfo.ei_cb_bl = osc_extent_blocking_cb;
989         einfo.ei_cb_cp = ldlm_completion_ast;
990         einfo.ei_cb_gl = ll_glimpse_callback;
991         einfo.ei_cbdata = inode;
992
993         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
994         oinfo.oi_lockh = &lockh;
995         oinfo.oi_md = lli->lli_smd;
996         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
997
998         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
999         if (rc == -ENOENT)
1000                 RETURN(rc);
1001         if (rc != 0) {
1002                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1003                 RETURN(rc > 0 ? -EIO : rc);
1004         }
1005
1006         ll_inode_size_lock(inode, 1);
1007         inode_init_lvb(inode, &lvb);
1008         rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1009         i_size_write(inode, lvb.lvb_size);
1010         inode->i_blocks = lvb.lvb_blocks;
1011         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1012         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1013         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1014         ll_inode_size_unlock(inode, 1);
1015
1016         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1017                i_size_read(inode), (long long)inode->i_blocks);
1018
1019         RETURN(rc);
1020 }
1021
/*
 * Enqueue an extent lock of @mode on @lsm for the range in @policy and
 * refresh the inode's size/timestamps from the merged lvb.
 *
 * @lockh must be unused on entry and @lsm must be non-NULL (both asserted).
 * @policy is overwritten with the policy obd_enqueue() hands back.
 * LDLM_FL_NO_LRU is added to @ast_flags when the inode's mapping is mmapped.
 *
 * Returns 0 without enqueuing when @fd has LL_FILE_IGNORE_LOCK set or the
 * superblock has LL_SBI_NOLCK set.  Otherwise returns the obd_enqueue()
 * result, with positive values mapped to -EIO.
 *
 * NOTE(review): the lvb merge and the i_size update for a [0, EOF] policy
 * run even when the enqueue failed; only the timestamp update is guarded by
 * rc == 0.
 */
1022 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1023                    struct lov_stripe_md *lsm, int mode,
1024                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1025                    int ast_flags)
1026 {
1027         struct ll_sb_info *sbi = ll_i2sbi(inode);
1028         struct ost_lvb lvb;
1029         struct ldlm_enqueue_info einfo = { 0 };
1030         struct obd_info oinfo = { { { 0 } } };
1031         int rc;
1032         ENTRY;
1033
1034         LASSERT(!lustre_handle_is_used(lockh));
1035         LASSERT(lsm != NULL);
1036
1037         /* don't drop the mmapped file to LRU */
1038         if (mapping_mapped(inode->i_mapping))
1039                 ast_flags |= LDLM_FL_NO_LRU;
1040
1041         /* XXX phil: can we do this?  won't it screw the file size up? */
1042         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1043             (sbi->ll_flags & LL_SBI_NOLCK))
1044                 RETURN(0);
1045
1046         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1047                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1048
1049         einfo.ei_type = LDLM_EXTENT;
1050         einfo.ei_mode = mode;
1051         einfo.ei_cb_bl = osc_extent_blocking_cb;
1052         einfo.ei_cb_cp = ldlm_completion_ast;
1053         einfo.ei_cb_gl = ll_glimpse_callback;
1054         einfo.ei_cbdata = inode;
1055
1056         oinfo.oi_policy = *policy;
1057         oinfo.oi_lockh = lockh;
1058         oinfo.oi_md = lsm;
1059         oinfo.oi_flags = ast_flags;
1060
1061         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL);
/* hand the policy used by the enqueue back to the caller */
1062         *policy = oinfo.oi_policy;
1063         if (rc > 0)
1064                 rc = -EIO;
1065
1066         ll_inode_size_lock(inode, 1);
1067         inode_init_lvb(inode, &lvb);
1068         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1069
1070         if (policy->l_extent.start == 0 &&
1071             policy->l_extent.end == OBD_OBJECT_EOF) {
1072                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1073                  * the kms under both a DLM lock and the
1074                  * ll_inode_size_lock().  If we don't get the
1075                  * ll_inode_size_lock() here we can match the DLM lock and
1076                  * reset i_size from the kms before the truncating path has
1077                  * updated the kms.  generic_file_write can then trust the
1078                  * stale i_size when doing appending writes and effectively
1079                  * cancel the result of the truncate.  Getting the
1080                  * ll_inode_size_lock() after the enqueue maintains the DLM
1081                  * -> ll_inode_size_lock() acquiring order. */
1082                 i_size_write(inode, lvb.lvb_size);
1083                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1084                        inode->i_ino, i_size_read(inode));
1085         }
1086
1087         if (rc == 0) {
1088                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1089                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1090                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1091         }
1092         ll_inode_size_unlock(inode, 1);
1093
1094         RETURN(rc);
1095 }
1096
1097 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1098                      struct lov_stripe_md *lsm, int mode,
1099                      struct lustre_handle *lockh)
1100 {
1101         struct ll_sb_info *sbi = ll_i2sbi(inode);
1102         int rc;
1103         ENTRY;
1104
1105         /* XXX phil: can we do this?  won't it screw the file size up? */
1106         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1107             (sbi->ll_flags & LL_SBI_NOLCK))
1108                 RETURN(0);
1109
1110         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1111
1112         RETURN(rc);
1113 }
1114
1115 static void ll_set_file_contended(struct inode *inode)
1116 {
1117         struct ll_inode_info *lli = ll_i2info(inode);
1118
1119         lli->lli_contention_time = cfs_time_current();
1120         set_bit(LLI_F_CONTENDED, &lli->lli_flags);
1121 }
1122
1123 void ll_clear_file_contended(struct inode *inode)
1124 {
1125         struct ll_inode_info *lli = ll_i2info(inode);
1126
1127         clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
1128 }
1129
1130 static int ll_is_file_contended(struct file *file)
1131 {
1132         struct inode *inode = file->f_dentry->d_inode;
1133         struct ll_inode_info *lli = ll_i2info(inode);
1134         struct ll_sb_info *sbi = ll_i2sbi(inode);
1135         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1136         ENTRY;
1137
1138         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1139                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1140                        " osc connect flags = 0x"LPX64"\n",
1141                        sbi->ll_lco.lco_flags);
1142                 RETURN(0);
1143         }
1144         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1145                 RETURN(1);
1146         if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
1147                 cfs_time_t cur_time = cfs_time_current();
1148                 cfs_time_t retry_time;
1149
1150                 retry_time = cfs_time_add(
1151                         lli->lli_contention_time,
1152                         cfs_time_seconds(sbi->ll_contention_time));
1153                 if (cfs_time_after(cur_time, retry_time)) {
1154                         ll_clear_file_contended(inode);
1155                         RETURN(0);
1156                 }
1157                 RETURN(1);
1158         }
1159         RETURN(0);
1160 }
1161
1162 static int ll_file_get_tree_lock_iov(struct ll_lock_tree *tree,
1163                                      struct file *file, const struct iovec *iov,
1164                                      unsigned long nr_segs,
1165                                      loff_t start, loff_t end, int rw)
1166 {
1167         int append;
1168         int tree_locked = 0;
1169         int rc;
1170         struct inode * inode = file->f_dentry->d_inode;
1171
1172         append = (rw == WRITE) && (file->f_flags & O_APPEND);
1173
1174         if (append || !ll_is_file_contended(file)) {
1175                 struct ll_lock_tree_node *node;
1176                 int ast_flags;
1177
1178                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1179                 if (file->f_flags & O_NONBLOCK)
1180                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1181                 node = ll_node_from_inode(inode, start, end,
1182                                           (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
1183                 if (IS_ERR(node)) {
1184                         rc = PTR_ERR(node);
1185                         GOTO(out, rc);
1186                 }
1187                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1188                 rc = ll_tree_lock_iov(tree, node, iov, nr_segs, ast_flags);
1189                 if (rc == 0)
1190                         tree_locked = 1;
1191                 else if (rc == -EUSERS)
1192                         ll_set_file_contended(inode);
1193                 else
1194                         GOTO(out, rc);
1195         }
1196         RETURN(tree_locked);
1197 out:
1198         return rc;
1199 }
1200
1201 /* XXX: exact copy from kernel code (__generic_file_aio_write_nolock from rhel4)
1202  */
1203 static size_t ll_file_get_iov_count(const struct iovec *iov, 
1204                                      unsigned long *nr_segs)
1205 {
1206         size_t count = 0;
1207         unsigned long seg;
1208
1209         for (seg = 0; seg < *nr_segs; seg++) {
1210                 const struct iovec *iv = &iov[seg];
1211
1212                 /*
1213                  * If any segment has a negative length, or the cumulative
1214                  * length ever wraps negative then return -EINVAL.
1215                  */
1216                 count += iv->iov_len;
1217                 if (unlikely((ssize_t)(count|iv->iov_len) < 0))
1218                         return -EINVAL;
1219                 if (access_ok(VERIFY_WRITE, iv->iov_base, iv->iov_len))
1220                         continue;
1221                 if (seg == 0)
1222                         return -EFAULT;
1223                 *nr_segs = seg;
1224                 count -= iv->iov_len;   /* This segment is no good */
1225                 break;
1226         }
1227         return count;
1228 }
1229
/*
 * Build in @iov_copy the segment list for the next @size bytes of the
 * vector at *@iov_out, starting *@offset bytes into its first segment.
 *
 * On return *@nrsegs_copy is the number of entries filled in @iov_copy;
 * *@iov_out and *@nr_segs have been advanced to the segment in which the
 * copy ended, and *@offset is the number of bytes of that segment consumed
 * so far.
 *
 * NOTE(review): the caller must ensure @size does not exceed the bytes left
 * in the vector; otherwise the loop ends without a break and *@nrsegs_copy
 * is one larger than the number of entries written.
 *
 * Always returns 0.
 */
static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
                           unsigned long *nrsegs_copy,
                           struct iovec *iov_copy, size_t *offset,
                           size_t size)
{
        const struct iovec *src = *iov_out;
        int idx = 0;

        while (idx < *nr_segs) {
                struct iovec *dst = &iov_copy[idx];

                *dst = src[idx];
                if (idx == 0) {
                        /* skip what earlier chunks already consumed */
                        dst->iov_base = (char *)dst->iov_base + *offset;
                        dst->iov_len -= *offset;
                }

                if (dst->iov_len >= size) {
                        /* the chunk ends inside this segment */
                        dst->iov_len = size;
                        *offset = (idx == 0) ? *offset + size : size;
                        break;
                }

                size -= dst->iov_len;
                idx++;
        }

        *iov_out += idx;
        *nr_segs -= idx;
        *nrsegs_copy = idx + 1;

        return 0;
}
1262
1263 static int ll_reget_short_lock(struct page *page, int rw,
1264                                obd_off start, obd_off end,
1265                                void **cookie)
1266 {
1267         struct ll_async_page *llap;
1268         struct obd_export *exp;
1269         struct inode *inode = page->mapping->host;
1270
1271         ENTRY;
1272
1273         exp = ll_i2obdexp(inode);
1274         if (exp == NULL)
1275                 RETURN(0);
1276
1277         llap = llap_cast_private(page);
1278         if (llap == NULL)
1279                 RETURN(0);
1280
1281         RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
1282                                     &llap->llap_cookie, rw, start, end,
1283                                     cookie));
1284 }
1285
1286 static void ll_release_short_lock(struct inode *inode, obd_off end,
1287                                   void *cookie, int rw)
1288 {
1289         struct obd_export *exp;
1290         int rc;
1291
1292         exp = ll_i2obdexp(inode);
1293         if (exp == NULL)
1294                 return;
1295
1296         rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
1297                                     cookie, rw);
1298         if (rc < 0)
1299                 CERROR("unlock failed (%d)\n", rc);
1300 }
1301
1302 static inline int ll_file_get_fast_lock(struct file *file,
1303                                         obd_off ppos, obd_off end,
1304                                         const struct iovec *iov,
1305                                         unsigned long nr_segs,
1306                                         void **cookie, int rw)
1307 {
1308         int rc = 0, seg;
1309         struct page *page;
1310
1311         ENTRY;
1312
1313         /* we would like this read request to be lockfree */
1314         for (seg = 0; seg < nr_segs; seg++) {
1315                 const struct iovec *iv = &iov[seg];
1316                 if (ll_region_mapped((unsigned long)iv->iov_base, iv->iov_len))
1317                         GOTO(out, rc);
1318         }
1319
1320         page = find_lock_page(file->f_dentry->d_inode->i_mapping,
1321                               ppos >> CFS_PAGE_SHIFT);
1322         if (page) {
1323                 if (ll_reget_short_lock(page, rw, ppos, end, cookie))
1324                         rc = 1;
1325
1326                 unlock_page(page);
1327                 page_cache_release(page);
1328         }
1329
1330 out:
1331         RETURN(rc);
1332 }
1333
/*
 * Drop a fast lock obtained by ll_file_get_fast_lock(); simply forwards to
 * ll_release_short_lock() with the same arguments.
 */
1334 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
1335                                          void *cookie, int rw)
1336 {
1337         ll_release_short_lock(inode, end, cookie, rw);
1338 }
1339
/*
 * Kind of lock protecting one I/O chunk, as reported by ll_file_get_lock()
 * and consumed by ll_file_put_lock().
 */
1340 enum ll_lock_style {
/* no lock was taken; ll_file_put_lock() must not be called for it */
1341         LL_LOCK_STYLE_NOLOCK   = 0,
/* short lock from ll_file_get_fast_lock(), released via its cookie */
1342         LL_LOCK_STYLE_FASTLOCK = 1,
/* tree lock from ll_file_get_tree_lock_iov(), released by ll_tree_unlock() */
1343         LL_LOCK_STYLE_TREELOCK = 2
1344 };
1345
1346 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
1347                                    obd_off end, const struct iovec *iov,
1348                                    unsigned long nr_segs, void **cookie,
1349                                    struct ll_lock_tree *tree, int rw)
1350 {
1351         int rc;
1352
1353         ENTRY;
1354
1355         if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, cookie, rw))
1356                 RETURN(LL_LOCK_STYLE_FASTLOCK);
1357
1358         rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
1359                                        ppos, end, rw);
1360         /* rc: 1 for tree lock, 0 for no lock, <0 for error */
1361         switch (rc) {
1362         case 1:
1363                 RETURN(LL_LOCK_STYLE_TREELOCK);
1364         case 0:
1365                 RETURN(LL_LOCK_STYLE_NOLOCK);
1366         }
1367
1368         /* an error happened if we reached this point, rc = -errno here */
1369         RETURN(rc);
1370 }
1371
1372 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
1373                                     enum ll_lock_style lock_style,
1374                                     void *cookie, struct ll_lock_tree *tree,
1375                                     int rw)
1376
1377 {
1378         switch (lock_style) {
1379         case LL_LOCK_STYLE_TREELOCK:
1380                 ll_tree_unlock(tree);
1381                 break;
1382         case LL_LOCK_STYLE_FASTLOCK:
1383                 ll_file_put_fast_lock(inode, end, cookie, rw);
1384                 break;
1385         default:
1386                 CERROR("invalid locking style (%d)\n", lock_style);
1387         }
1388 }
1389
/*
 * Vectored read entry point.  Built as ll_file_readv() when HAVE_FILE_READV
 * is defined and as ll_file_aio_read() otherwise; both share the body below.
 *
 * Outline:
 *  - a zero-length request returns 0;
 *  - a file without objects (no lsm) is served by zero-filling the user
 *    buffers up to i_size;
 *  - otherwise the request is processed in chunks (label "repeat").  When
 *    sbi->ll_max_rw_chunk is non-zero a chunk ends at the stripe end or
 *    after ll_max_rw_chunk bytes, and a private iovec copy describes it;
 *  - each chunk is locked through ll_file_get_lock().  If the whole
 *    remaining request reaches past the merged kms, a glimpse refreshes
 *    i_size; otherwise i_size is raised to the kms if it is smaller;
 *  - locked chunks go through the generic kernel read path, unlocked ones
 *    through ll_file_lockless_io().
 *
 * Returns the number of bytes read when any were read, otherwise the last
 * result (0 or a negative errno).
 */
1390 #ifdef HAVE_FILE_READV
1391 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
1392                               unsigned long nr_segs, loff_t *ppos)
1393 {
1394 #else
1395 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1396                                 unsigned long nr_segs, loff_t pos)
1397 {
1398         struct file *file = iocb->ki_filp;
1399         loff_t *ppos = &iocb->ki_pos;
1400 #endif
1401         struct inode *inode = file->f_dentry->d_inode;
1402         struct ll_inode_info *lli = ll_i2info(inode);
1403         struct lov_stripe_md *lsm = lli->lli_smd;
1404         struct ll_sb_info *sbi = ll_i2sbi(inode);
1405         struct ll_lock_tree tree;
1406         struct ost_lvb lvb;
1407         struct ll_ra_read bead;
1408         int ra = 0;
1409         obd_off end;
1410         ssize_t retval, chunk, sum = 0;
1411         int lock_style;
1412         struct iovec *iov_copy = NULL;
1413         unsigned long nrsegs_copy, nrsegs_orig = 0;
1414         size_t count, iov_offset = 0;
1415         __u64 kms;
1416         void *cookie;
1417         ENTRY;
1418
/* NOTE(review): ll_file_get_iov_count() reports -EINVAL/-EFAULT through its
 * size_t return value and nothing below tests for that, so a failure shows
 * up here as a very large count rather than as an error. */
1419         count = ll_file_get_iov_count(iov, &nr_segs);
1420         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1421                inode->i_ino, inode->i_generation, inode, count, *ppos);
1422         /* "If nbyte is 0, read() will return 0 and have no other results."
1423          *                      -- Single Unix Spec */
1424         if (count == 0)
1425                 RETURN(0);
1426
1427         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1428
1429         if (!lsm) {
1430                 /* Read on file with no objects should return zero-filled
1431                  * buffers up to file size (we can get non-zero sizes with
1432                  * mknod + truncate, then opening file for read. This is a
1433                  * common pattern in NFS case, it seems). Bug 6243 */
1434                 int notzeroed;
1435                 /* Since there are no objects on OSTs, we have nothing to get
1436                  * lock on and so we are forced to access inode->i_size
1437                  * unguarded */
1438
1439                 /* Read beyond end of file */
1440                 if (*ppos >= i_size_read(inode))
1441                         RETURN(0);
1442
1443                 if (count > i_size_read(inode) - *ppos)
1444                         count = i_size_read(inode) - *ppos;
1445                 /* Make sure to correctly adjust the file pos pointer for
1446                  * EFAULT case */
1447                 for (nrsegs_copy = 0; nrsegs_copy < nr_segs; nrsegs_copy++) {
1448                         const struct iovec *iv = &iov[nrsegs_copy];
1449
1450                         if (count < iv->iov_len)
1451                                 chunk = count;
1452                         else
1453                                 chunk = iv->iov_len;
/* clear_user()'s result is treated as the number of bytes NOT zeroed */
1454                         notzeroed = clear_user(iv->iov_base, chunk);
1455                         sum += (chunk - notzeroed);
1456                         count -= (chunk - notzeroed);
1457                         if (notzeroed || !count)
1458                                 break;
1459                 }
1460                 *ppos += sum;
1461                 if (!sum)
1462                         RETURN(-EFAULT);
1463                 RETURN(sum);
1464         }
1465
/* One pass per chunk: work out [*ppos, end] and the iovec describing it. */
1466 repeat:
1467         if (sbi->ll_max_rw_chunk != 0) {
1468                 /* first, let's know the end of the current stripe */
1469                 end = *ppos;
1470                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, &end);
1471
1472                 /* correct, the end is beyond the request */
1473                 if (end > *ppos + count - 1)
1474                         end = *ppos + count - 1;
1475
1476                 /* and chunk shouldn't be too large even if striping is wide */
1477                 if (end - *ppos > sbi->ll_max_rw_chunk)
1478                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1479
1480                 chunk = end - *ppos + 1;
/* The chunk covers everything that is left and starts on a segment
 * boundary: use the caller's iovec directly and drop any private copy. */
1481                 if ((count == chunk) && (iov_offset == 0)) {
1482                         if (iov_copy)
1483                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1484
1485                         iov_copy = (struct iovec *)iov;
1486                         nrsegs_copy = nr_segs;
1487                 } else {
/* Allocate the private copy once, sized for the original segment count;
 * nrsegs_orig is remembered so the same size can be passed to OBD_FREE. */
1488                         if (!iov_copy) {
1489                                 nrsegs_orig = nr_segs;
1490                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1491                                 if (!iov_copy)
1492                                         GOTO(out, retval = -ENOMEM); 
1493                         }
1494
1495                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1496                                         &iov_offset, chunk);
1497                 }
1498         } else {
1499                 end = *ppos + count - 1;
1500                 iov_copy = (struct iovec *)iov;
1501                 nrsegs_copy = nr_segs;
1502         }
1503
/* lock_style is an enum ll_lock_style value, or a negative errno */
1504         lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
1505                                       iov_copy, nrsegs_copy, &cookie, &tree,
1506                                       OBD_BRW_READ);
1507         if (lock_style < 0)
1508                 GOTO(out, retval = lock_style);
1509
1510         ll_inode_size_lock(inode, 1);
1511         /*
1512          * Consistency guarantees: following possibilities exist for the
1513          * relation between region being read and real file size at this
1514          * moment:
1515          *
1516          *  (A): the region is completely inside of the file;
1517          *
1518          *  (B-x): x bytes of region are inside of the file, the rest is
1519          *  outside;
1520          *
1521          *  (C): the region is completely outside of the file.
1522          *
1523          * This classification is stable under DLM lock acquired by
1524          * ll_tree_lock() above, because to change class, other client has to
1525          * take DLM lock conflicting with our lock. Also, any updates to
1526          * ->i_size by other threads on this client are serialized by
1527          * ll_inode_size_lock(). This guarantees that short reads are handled
1528          * correctly in the face of concurrent writes and truncates.
1529          */
1530         inode_init_lvb(inode, &lvb);
1531         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1532         kms = lvb.lvb_size;
1533         if (*ppos + count - 1 > kms) {
1534                 /* A glimpse is necessary to determine whether we return a
1535                  * short read (B) or some zeroes at the end of the buffer (C) */
1536                 ll_inode_size_unlock(inode, 1);
1537                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1538                 if (retval) {
/* ll_file_put_lock() only handles real locks, hence the NOLOCK test */
1539                         if (lock_style != LL_LOCK_STYLE_NOLOCK)
1540                                 ll_file_put_lock(inode, end, lock_style,
1541                                                  cookie, &tree, OBD_BRW_READ);
1542                         goto out;
1543                 }
1544         } else {
1545                 /* region is within kms and, hence, within real file size (A).
1546                  * We need to increase i_size to cover the read region so that
1547                  * generic_file_read() will do its job, but that doesn't mean
1548                  * the kms size is _correct_, it is only the _minimum_ size.
1549                  * If someone does a stat they will get the correct size which
1550                  * will always be >= the kms value here.  b=11081 */
1551                 if (i_size_read(inode) < kms)
1552                         i_size_write(inode, kms);
1553                 ll_inode_size_unlock(inode, 1);
1554         }
1555
1556         chunk = end - *ppos + 1;
1557         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1558                inode->i_ino, chunk, *ppos, i_size_read(inode));
1559
1560         /* turn off the kernel's read-ahead */
1561         if (lock_style != LL_LOCK_STYLE_NOLOCK) {
1562 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1563                 file->f_ramax = 0;
1564 #else
1565                 file->f_ra.ra_pages = 0;
1566 #endif
1567                 /* initialize read-ahead window once per syscall */
1568                 if (ra == 0) {
1569                         ra = 1;
1570                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1571                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1572                         ll_ra_read_in(file, &bead);
1573                 }
1574
1575                 /* BUG: 5972 */
1576                 file_accessed(file);
1577 #ifdef HAVE_FILE_READV
1578                 retval = generic_file_readv(file, iov_copy, nrsegs_copy, ppos);
1579 #else
1580                 retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
1581                                                *ppos);
1582 #endif
1583                 ll_file_put_lock(inode, end, lock_style, cookie,
1584                                  &tree, OBD_BRW_READ);
1585         } else {
1586                 retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
1587                                              READ, chunk);
1588         }
1589         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
/* Loop again only after a complete chunk with bytes still outstanding; a
 * short read or an error ends the request. */
1590         if (retval > 0) {
1591                 count -= retval;
1592                 sum += retval;
1593                 if (retval == chunk && count > 0)
1594                         goto repeat;
1595         }
1596
1597  out:
1598         if (ra != 0)
1599                 ll_ra_read_ex(file, &bead);
/* bytes already transferred take precedence over a later error */
1600         retval = (sum > 0) ? sum : retval;
1601
/* free the private copy unless iov_copy aliases the caller's vector */
1602         if (iov_copy && iov_copy != iov)
1603                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1604
1605         RETURN(retval);
1606 }
1607
1608 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1609                             loff_t *ppos)
1610 {
1611         struct iovec local_iov = { .iov_base = (void __user *)buf,
1612                                    .iov_len = count };
1613 #ifdef HAVE_FILE_READV
1614         return ll_file_readv(file, &local_iov, 1, ppos);
1615 #else
1616         struct kiocb kiocb;
1617         ssize_t ret;
1618
1619         init_sync_kiocb(&kiocb, file);
1620         kiocb.ki_pos = *ppos;
1621         kiocb.ki_left = count;
1622
1623         ret = ll_file_aio_read(&kiocb, &local_iov, 1, kiocb.ki_pos);
1624         *ppos = kiocb.ki_pos;
1625         return ret;
1626 #endif
1627 }
1628
1629 /*
1630  * Write to a file (through the page cache).
1631  */
1632 #ifdef HAVE_FILE_WRITEV
1633 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1634                               unsigned long nr_segs, loff_t *ppos)
1635 {
1636 #else /* AIO stuff */
1637 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1638                                  unsigned long nr_segs, loff_t pos)
1639 {
1640         struct file *file = iocb->ki_filp;
1641         loff_t *ppos = &iocb->ki_pos;
1642 #endif
1643         struct inode *inode = file->f_dentry->d_inode;
1644         struct ll_sb_info *sbi = ll_i2sbi(inode);
1645         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1646         struct ll_lock_tree tree;
1647         loff_t maxbytes = ll_file_maxbytes(inode);
1648         loff_t lock_start, lock_end, end;
1649         ssize_t retval, chunk, sum = 0;
1650         int tree_locked;
1651         struct iovec *iov_copy = NULL;
1652         unsigned long nrsegs_copy, nrsegs_orig = 0;
1653         size_t count, iov_offset = 0;
1654         ENTRY;
1655
1656         count = ll_file_get_iov_count(iov, &nr_segs);
1657
1658         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1659                inode->i_ino, inode->i_generation, inode, count, *ppos);
1660         
1661         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1662
1663         /* POSIX, but surprised the VFS doesn't check this already */
1664         if (count == 0)
1665                 RETURN(0);
1666
1667         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1668          * called on the file, don't fail the below assertion (bug 2388). */
1669         if (file->f_flags & O_LOV_DELAY_CREATE &&
1670             ll_i2info(inode)->lli_smd == NULL)
1671                 RETURN(-EBADF);
1672
1673         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1674
1675         down(&ll_i2info(inode)->lli_write_sem);
1676
1677 repeat:
1678         chunk = 0; /* just to fix gcc's warning */
1679         end = *ppos + count - 1;
1680
1681         if (file->f_flags & O_APPEND) {
1682                 lock_start = 0;
1683                 lock_end = OBD_OBJECT_EOF;
1684                 iov_copy = (struct iovec *)iov;
1685                 nrsegs_copy = nr_segs;
1686         } else if (sbi->ll_max_rw_chunk != 0) {
1687                 /* first, let's know the end of the current stripe */
1688                 end = *ppos;
1689                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, 
1690                                 (obd_off *)&end);
1691
1692                 /* correct, the end is beyond the request */
1693                 if (end > *ppos + count - 1)
1694                         end = *ppos + count - 1;
1695
1696                 /* and chunk shouldn't be too large even if striping is wide */
1697                 if (end - *ppos > sbi->ll_max_rw_chunk)
1698                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1699                 lock_start = *ppos;
1700                 lock_end = end;
1701                 chunk = end - *ppos + 1;
1702                 if ((count == chunk) && (iov_offset == 0)) {
1703                         if (iov_copy)
1704                                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1705
1706                         iov_copy = (struct iovec *)iov;
1707                         nrsegs_copy = nr_segs;
1708                 } else {
1709                         if (!iov_copy) {
1710                                 nrsegs_orig = nr_segs;
1711                                 OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
1712                                 if (!iov_copy)
1713                                         GOTO(out, retval = -ENOMEM); 
1714                         }
1715                         iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
1716                                         &iov_offset, chunk);
1717                 }
1718         } else {
1719                 lock_start = *ppos;
1720                 lock_end = end;
1721                 iov_copy = (struct iovec *)iov;
1722                 nrsegs_copy = nr_segs;
1723         }
1724
1725         tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
1726                                                 nrsegs_copy,
1727                                                 (obd_off)lock_start,
1728                                                 (obd_off)lock_end,
1729                                                 OBD_BRW_WRITE);
1730         if (tree_locked < 0)
1731                 GOTO(out, retval = tree_locked);
1732
1733         /* This is ok, g_f_w will overwrite this under i_sem if it races
1734          * with a local truncate, it just makes our maxbyte checking easier.
1735          * The i_size value gets updated in ll_extent_lock() as a consequence
1736          * of the [0,EOF] extent lock we requested above. */
1737         if (file->f_flags & O_APPEND) {
1738                 *ppos = i_size_read(inode);
1739                 end = *ppos + count - 1;
1740         }
1741
1742         if (*ppos >= maxbytes) {
1743                 send_sig(SIGXFSZ, current, 0);
1744                 GOTO(out_unlock, retval = -EFBIG);
1745         }
1746         if (end > maxbytes - 1)
1747                 end = maxbytes - 1;
1748
1749         /* generic_file_write handles O_APPEND after getting i_mutex */
1750         chunk = end - *ppos + 1;
1751         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1752                inode->i_ino, chunk, *ppos);
1753         if (tree_locked)
1754 #ifdef HAVE_FILE_WRITEV
1755                 retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
1756 #else
1757                 retval = generic_file_aio_write(iocb, iov_copy, nrsegs_copy,
1758                                                 *ppos);
1759 #endif
1760         else
1761                 retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy,
1762                                              ppos, WRITE, chunk);
1763         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1764
1765 out_unlock:
1766         if (tree_locked)
1767                 ll_tree_unlock(&tree);
1768
1769 out:
1770         if (retval > 0) {
1771                 count -= retval;
1772                 sum += retval;
1773                 if (retval == chunk && count > 0)
1774                         goto repeat;
1775         }
1776
1777         up(&ll_i2info(inode)->lli_write_sem);
1778
1779         if (iov_copy && iov_copy != iov)
1780                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
1781
1782         retval = (sum > 0) ? sum : retval;
1783         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1784                            retval > 0 ? retval : 0);
1785         RETURN(retval);
1786 }
1787
1788 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1789                              loff_t *ppos)
1790 {
1791         struct iovec local_iov = { .iov_base = (void __user *)buf,
1792                                    .iov_len = count };
1793
1794 #ifdef HAVE_FILE_WRITEV
1795         return ll_file_writev(file, &local_iov, 1, ppos);
1796 #else
1797         struct kiocb kiocb;
1798         ssize_t ret;
1799
1800         init_sync_kiocb(&kiocb, file);
1801         kiocb.ki_pos = *ppos;
1802         kiocb.ki_left = count;
1803
1804         ret = ll_file_aio_write(&kiocb, &local_iov, 1, kiocb.ki_pos);
1805         *ppos = kiocb.ki_pos;
1806
1807         return ret;
1808 #endif
1809 }
1810
1811 /*
1812  * Send file content (through pagecache) somewhere with helper
1813  */
1814 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1815 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1816                                 read_actor_t actor, void *target)
1817 {
1818         struct inode *inode = in_file->f_dentry->d_inode;
1819         struct ll_inode_info *lli = ll_i2info(inode);
1820         struct lov_stripe_md *lsm = lli->lli_smd;
1821         struct ll_lock_tree tree;
1822         struct ll_lock_tree_node *node;
1823         struct ost_lvb lvb;
1824         struct ll_ra_read bead;
1825         int rc;
1826         ssize_t retval;
1827         __u64 kms;
1828         ENTRY;
1829         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1830                inode->i_ino, inode->i_generation, inode, count, *ppos);
1831
1832         /* "If nbyte is 0, read() will return 0 and have no other results."
1833          *                      -- Single Unix Spec */
1834         if (count == 0)
1835                 RETURN(0);
1836
1837         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1838         /* turn off the kernel's read-ahead */
1839         in_file->f_ra.ra_pages = 0;
1840
1841         /* File with no objects, nothing to lock */
1842         if (!lsm)
1843                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1844
1845         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1846         if (IS_ERR(node))
1847                 RETURN(PTR_ERR(node));
1848
1849         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1850         rc = ll_tree_lock(&tree, node, NULL, count,
1851                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1852         if (rc != 0)
1853                 RETURN(rc);
1854
1855         ll_clear_file_contended(inode);
1856         ll_inode_size_lock(inode, 1);
1857         /*
1858          * Consistency guarantees: following possibilities exist for the
1859          * relation between region being read and real file size at this
1860          * moment:
1861          *
1862          *  (A): the region is completely inside of the file;
1863          *
1864          *  (B-x): x bytes of region are inside of the file, the rest is
1865          *  outside;
1866          *
1867          *  (C): the region is completely outside of the file.
1868          *
1869          * This classification is stable under DLM lock acquired by
1870          * ll_tree_lock() above, because to change class, other client has to
1871          * take DLM lock conflicting with our lock. Also, any updates to
1872          * ->i_size by other threads on this client are serialized by
1873          * ll_inode_size_lock(). This guarantees that short reads are handled
1874          * correctly in the face of concurrent writes and truncates.
1875          */
1876         inode_init_lvb(inode, &lvb);
1877         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1878         kms = lvb.lvb_size;
1879         if (*ppos + count - 1 > kms) {
1880                 /* A glimpse is necessary to determine whether we return a
1881                  * short read (B) or some zeroes at the end of the buffer (C) */
1882                 ll_inode_size_unlock(inode, 1);
1883                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1884                 if (retval)
1885                         goto out;
1886         } else {
1887                 /* region is within kms and, hence, within real file size (A) */
1888                 i_size_write(inode, kms);
1889                 ll_inode_size_unlock(inode, 1);
1890         }
1891
1892         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1893                inode->i_ino, count, *ppos, i_size_read(inode));
1894
1895         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1896         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1897         ll_ra_read_in(in_file, &bead);
1898         /* BUG: 5972 */
1899         file_accessed(in_file);
1900         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1901         ll_ra_read_ex(in_file, &bead);
1902
1903  out:
1904         ll_tree_unlock(&tree);
1905         RETURN(retval);
1906 }
1907 #endif
1908
1909 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1910                                unsigned long arg)
1911 {
1912         struct ll_inode_info *lli = ll_i2info(inode);
1913         struct obd_export *exp = ll_i2obdexp(inode);
1914         struct ll_recreate_obj ucreatp;
1915         struct obd_trans_info oti = { 0 };
1916         struct obdo *oa = NULL;
1917         int lsm_size;
1918         int rc = 0;
1919         struct lov_stripe_md *lsm, *lsm2;
1920         ENTRY;
1921
1922         if (!capable (CAP_SYS_ADMIN))
1923                 RETURN(-EPERM);
1924
1925         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1926                             sizeof(struct ll_recreate_obj));
1927         if (rc) {
1928                 RETURN(-EFAULT);
1929         }
1930         OBDO_ALLOC(oa);
1931         if (oa == NULL)
1932                 RETURN(-ENOMEM);
1933
1934         down(&lli->lli_size_sem);
1935         lsm = lli->lli_smd;
1936         if (lsm == NULL)
1937                 GOTO(out, rc = -ENOENT);
1938         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1939                    (lsm->lsm_stripe_count));
1940
1941         OBD_ALLOC(lsm2, lsm_size);
1942         if (lsm2 == NULL)
1943                 GOTO(out, rc = -ENOMEM);
1944
1945         oa->o_id = ucreatp.lrc_id;
1946         oa->o_nlink = ucreatp.lrc_ost_idx;
1947         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1948         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1949         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1950                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1951
1952         memcpy(lsm2, lsm, lsm_size);
1953         rc = obd_create(exp, oa, &lsm2, &oti);
1954
1955         OBD_FREE(lsm2, lsm_size);
1956         GOTO(out, rc);
1957 out:
1958         up(&lli->lli_size_sem);
1959         OBDO_FREE(oa);
1960         return rc;
1961 }
1962
1963 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1964                                     int flags, struct lov_user_md *lum,
1965                                     int lum_size)
1966 {
1967         struct ll_inode_info *lli = ll_i2info(inode);
1968         struct lov_stripe_md *lsm;
1969         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1970         int rc = 0;
1971         ENTRY;
1972
1973         down(&lli->lli_size_sem);
1974         lsm = lli->lli_smd;
1975         if (lsm) {
1976                 up(&lli->lli_size_sem);
1977                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1978                        inode->i_ino);
1979                 RETURN(-EEXIST);
1980         }
1981
1982         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1983         if (rc)
1984                 GOTO(out, rc);
1985         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1986                 GOTO(out_req_free, rc = -ENOENT);
1987         rc = oit.d.lustre.it_status;
1988         if (rc < 0)
1989                 GOTO(out_req_free, rc);
1990
1991         ll_release_openhandle(file->f_dentry, &oit);
1992
1993  out:
1994         up(&lli->lli_size_sem);
1995         ll_intent_release(&oit);
1996         RETURN(rc);
1997 out_req_free:
1998         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1999         goto out;
2000 }
2001
2002 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
2003                              struct lov_mds_md **lmmp, int *lmm_size, 
2004                              struct ptlrpc_request **request)
2005 {
2006         struct ll_sb_info *sbi = ll_i2sbi(inode);
2007         struct ll_fid  fid;
2008         struct mds_body  *body;
2009         struct lov_mds_md *lmm = NULL;
2010         struct ptlrpc_request *req = NULL;
2011         int rc, lmmsize;
2012
2013         ll_inode2fid(&fid, inode);
2014
2015         rc = ll_get_max_mdsize(sbi, &lmmsize);
2016         if (rc)
2017                 RETURN(rc);
2018
2019         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
2020                         filename, strlen(filename) + 1,
2021                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
2022                         lmmsize, &req);
2023         if (rc < 0) {
2024                 CDEBUG(D_INFO, "mdc_getattr_name failed "
2025                                 "on %s: rc %d\n", filename, rc);
2026                 GOTO(out, rc);
2027         }
2028
2029         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
2030                         sizeof(*body));
2031         LASSERT(body != NULL); /* checked by mdc_getattr_name */
2032         /* swabbed by mdc_getattr_name */
2033         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
2034
2035         lmmsize = body->eadatasize;
2036
2037         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
2038                         lmmsize == 0) {
2039                 GOTO(out, rc = -ENODATA);
2040         }
2041
2042         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
2043                         lmmsize);
2044         LASSERT(lmm != NULL);
2045         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
2046
2047         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC)) &&
2048              (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
2049                 GOTO(out, rc = -EPROTO);
2050         }
2051         /*
2052          * This is coming from the MDS, so is probably in
2053          * little endian.  We convert it to host endian before
2054          * passing it to userspace.
2055          */
2056         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
2057                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC)) {
2058                         lustre_swab_lov_user_md((struct lov_user_md *)lmm);
2059                         lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
2060                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
2061                         lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
2062                 }
2063         }
2064
2065         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
2066                 struct lov_stripe_md *lsm;
2067                 struct lov_user_md_join *lmj;
2068                 int lmj_size, i, aindex = 0;
2069
2070                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
2071                 if (rc < 0)
2072                         GOTO(out, rc = -ENOMEM);
2073                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
2074                 if (rc)
2075                         GOTO(out_free_memmd, rc);
2076
2077                 lmj_size = sizeof(struct lov_user_md_join) +
2078                         lsm->lsm_stripe_count *
2079                         sizeof(struct lov_user_ost_data_join);
2080                 OBD_ALLOC(lmj, lmj_size);
2081                 if (!lmj)
2082                         GOTO(out_free_memmd, rc = -ENOMEM);
2083
2084                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
2085                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2086                         struct lov_extent *lex =
2087                                 &lsm->lsm_array->lai_ext_array[aindex];
2088
2089                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
2090                                 aindex ++;
2091                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
2092                                         LPU64" len %d\n", aindex, i,
2093                                         lex->le_start, (int)lex->le_len);
2094                         lmj->lmm_objects[i].l_extent_start =
2095                                 lex->le_start;
2096
2097                         if ((int)lex->le_len == -1)
2098                                 lmj->lmm_objects[i].l_extent_end = -1;
2099                         else
2100                                 lmj->lmm_objects[i].l_extent_end =
2101                                         lex->le_start + lex->le_len;
2102                         lmj->lmm_objects[i].l_object_id =
2103                                 lsm->lsm_oinfo[i]->loi_id;
2104                         lmj->lmm_objects[i].l_object_gr =
2105                                 lsm->lsm_oinfo[i]->loi_gr;
2106                         lmj->lmm_objects[i].l_ost_gen =
2107                                 lsm->lsm_oinfo[i]->loi_ost_gen;
2108                         lmj->lmm_objects[i].l_ost_idx =
2109                                 lsm->lsm_oinfo[i]->loi_ost_idx;
2110                 }
2111                 lmm = (struct lov_mds_md *)lmj;
2112                 lmmsize = lmj_size;
2113 out_free_memmd:
2114                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
2115         }
2116 out:
2117         *lmmp = lmm;
2118         *lmm_size = lmmsize;
2119         *request = req;
2120         return rc;
2121 }
2122 static int ll_lov_setea(struct inode *inode, struct file *file,
2123                             unsigned long arg)
2124 {
2125         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
2126         struct lov_user_md  *lump;
2127         int lum_size = sizeof(struct lov_user_md) +
2128                        sizeof(struct lov_user_ost_data);
2129         int rc;
2130         ENTRY;
2131
2132         if (!capable (CAP_SYS_ADMIN))
2133                 RETURN(-EPERM);
2134
2135         OBD_ALLOC(lump, lum_size);
2136         if (lump == NULL) {
2137                 RETURN(-ENOMEM);
2138         }
2139         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2140         if (rc) {
2141                 OBD_FREE(lump, lum_size);
2142                 RETURN(-EFAULT);
2143         }
2144
2145         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2146
2147         OBD_FREE(lump, lum_size);
2148         RETURN(rc);
2149 }
2150
2151 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2152                             unsigned long arg)
2153 {
2154         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2155         int rc;
2156         int flags = FMODE_WRITE;
2157         ENTRY;
2158
2159         /* Bug 1152: copy properly when this is no longer true */
2160         LASSERT(sizeof(lum) == sizeof(*lump));
2161         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2162         rc = copy_from_user(&lum, lump, sizeof(lum));
2163         if (rc)
2164                 RETURN(-EFAULT);
2165
2166         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2167         if (rc == 0) {
2168                  put_user(0, &lump->lmm_stripe_count);
2169                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
2170                                     0, ll_i2info(inode)->lli_smd, lump);
2171         }
2172         RETURN(rc);
2173 }
2174
2175 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2176 {
2177         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2178
2179         if (!lsm)
2180                 RETURN(-ENODATA);
2181
2182         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
2183                             (void *)arg);
2184 }
2185
2186 static int ll_get_grouplock(struct inode *inode, struct file *file,
2187                             unsigned long arg)
2188 {
2189         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2190         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2191                                                     .end = OBD_OBJECT_EOF}};
2192         struct lustre_handle lockh = { 0 };
2193         struct ll_inode_info *lli = ll_i2info(inode);
2194         struct lov_stripe_md *lsm = lli->lli_smd;
2195         int flags = 0, rc;
2196         ENTRY;
2197
2198         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2199                 RETURN(-EINVAL);
2200         }
2201
2202         policy.l_extent.gid = arg;
2203         if (file->f_flags & O_NONBLOCK)
2204                 flags = LDLM_FL_BLOCK_NOWAIT;
2205
2206         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2207         if (rc)
2208                 RETURN(rc);
2209
2210         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2211         fd->fd_gid = arg;
2212         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2213
2214         RETURN(0);
2215 }
2216
2217 static int ll_put_grouplock(struct inode *inode, struct file *file,
2218                             unsigned long arg)
2219 {
2220         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2221         struct ll_inode_info *lli = ll_i2info(inode);
2222         struct lov_stripe_md *lsm = lli->lli_smd;
2223         int rc;
2224         ENTRY;
2225
2226         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2227                 /* Ugh, it's already unlocked. */
2228                 RETURN(-EINVAL);
2229         }
2230
2231         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2232                 RETURN(-EINVAL);
2233
2234         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2235
2236         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2237         if (rc)
2238                 RETURN(rc);
2239
2240         fd->fd_gid = 0;
2241         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2242
2243         RETURN(0);
2244 }
2245
2246 static int join_sanity_check(struct inode *head, struct inode *tail)
2247 {
2248         ENTRY;
2249         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2250                 CERROR("server do not support join \n");
2251                 RETURN(-EINVAL);
2252         }
2253         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2254                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2255                        head->i_ino, tail->i_ino);
2256                 RETURN(-EINVAL);
2257         }
2258         if (head->i_ino == tail->i_ino) {
2259                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2260                 RETURN(-EINVAL);
2261         }
2262         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2263                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2264                 RETURN(-EINVAL);
2265         }
2266         RETURN(0);
2267 }
2268
2269 static int join_file(struct inode *head_inode, struct file *head_filp,
2270                      struct file *tail_filp)
2271 {
2272         struct dentry *tail_dentry = tail_filp->f_dentry;
2273         struct lookup_intent oit = {.it_op = IT_OPEN,
2274                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2275         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_PW,
2276                 ll_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL };
2277
2278         struct lustre_handle lockh;
2279         struct mdc_op_data *op_data;
2280         int    rc;
2281         loff_t data;
2282         ENTRY;
2283
2284         tail_dentry = tail_filp->f_dentry;
2285
2286         OBD_ALLOC_PTR(op_data);
2287         if (op_data == NULL) {
2288                 RETURN(-ENOMEM);
2289         }
2290
2291         data = i_size_read(head_inode);
2292         ll_prepare_mdc_op_data(op_data, head_inode,
2293                                tail_dentry->d_parent->d_inode,
2294                                tail_dentry->d_name.name,
2295                                tail_dentry->d_name.len, 0, &data);
2296         rc = mdc_enqueue(ll_i2mdcexp(head_inode), &einfo, &oit,
2297                          op_data, &lockh, NULL, 0, 0);
2298
2299         if (rc < 0)
2300                 GOTO(out, rc);
2301
2302         rc = oit.d.lustre.it_status;
2303
2304         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2305                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2306                 ptlrpc_req_finished((struct ptlrpc_request *)
2307                                     oit.d.lustre.it_data);
2308                 GOTO(out, rc);
2309         }
2310
2311         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2312                                            * away */
2313                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2314                 oit.d.lustre.it_lock_mode = 0;
2315         }
2316         ll_release_openhandle(head_filp->f_dentry, &oit);
2317 out:
2318         if (op_data)
2319                 OBD_FREE_PTR(op_data);
2320         ll_intent_release(&oit);
2321         RETURN(rc);
2322 }
2323
2324 static int ll_file_join(struct inode *head, struct file *filp,
2325                         char *filename_tail)
2326 {
2327         struct inode *tail = NULL, *first = NULL, *second = NULL;
2328         struct dentry *tail_dentry;
2329         struct file *tail_filp, *first_filp, *second_filp;
2330         struct ll_lock_tree first_tree, second_tree;
2331         struct ll_lock_tree_node *first_node, *second_node;
2332         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2333         int rc = 0, cleanup_phase = 0;
2334         ENTRY;
2335
2336         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2337                head->i_ino, head->i_generation, head, filename_tail);
2338
2339         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2340         if (IS_ERR(tail_filp)) {
2341                 CERROR("Can not open tail file %s", filename_tail);
2342                 rc = PTR_ERR(tail_filp);
2343                 GOTO(cleanup, rc);
2344         }
2345         tail = igrab(tail_filp->f_dentry->d_inode);
2346
2347         tlli = ll_i2info(tail);
2348         tail_dentry = tail_filp->f_dentry;
2349         LASSERT(tail_dentry);
2350         cleanup_phase = 1;
2351
2352         /*reorder the inode for lock sequence*/
2353         first = head->i_ino > tail->i_ino ? head : tail;
2354         second = head->i_ino > tail->i_ino ? tail : head;
2355         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2356         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2357
2358         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2359                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2360         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2361         if (IS_ERR(first_node)){
2362                 rc = PTR_ERR(first_node);
2363                 GOTO(cleanup, rc);
2364         }
2365         first_tree.lt_fd = first_filp->private_data;
2366         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2367         if (rc != 0)
2368                 GOTO(cleanup, rc);
2369         cleanup_phase = 2;
2370
2371         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2372         if (IS_ERR(second_node)){
2373                 rc = PTR_ERR(second_node);
2374                 GOTO(cleanup, rc);
2375         }
2376         second_tree.lt_fd = second_filp->private_data;
2377         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2378         if (rc != 0)
2379                 GOTO(cleanup, rc);
2380         cleanup_phase = 3;
2381
2382         rc = join_sanity_check(head, tail);
2383         if (rc)
2384                 GOTO(cleanup, rc);
2385
2386         rc = join_file(head, filp, tail_filp);
2387         if (rc)
2388                 GOTO(cleanup, rc);
2389 cleanup:
2390         switch (cleanup_phase) {
2391         case 3:
2392                 ll_tree_unlock(&second_tree);
2393                 obd_cancel_unused(ll_i2obdexp(second),
2394                                   ll_i2info(second)->lli_smd, 0, NULL);
2395         case 2:
2396                 ll_tree_unlock(&first_tree);
2397                 obd_cancel_unused(ll_i2obdexp(first),
2398                                   ll_i2info(first)->lli_smd, 0, NULL);
2399         case 1:
2400                 filp_close(tail_filp, 0);
2401                 if (tail)
2402                         iput(tail);
2403                 if (head && rc == 0) {
2404                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2405                                        &hlli->lli_smd);
2406                         hlli->lli_smd = NULL;
2407                 }
2408         case 0:
2409                 break;
2410         default:
2411                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2412                 LBUG();
2413         }
2414         RETURN(rc);
2415 }
2416
2417 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2418 {
2419         struct inode *inode = dentry->d_inode;
2420         struct obd_client_handle *och;
2421         int rc;
2422         ENTRY;
2423
2424         LASSERT(inode);
2425
2426         /* Root ? Do nothing. */
2427         if (dentry->d_inode->i_sb->s_root == dentry)
2428                 RETURN(0);
2429
2430         /* No open handle to close? Move away */
2431         if (!it_disposition(it, DISP_OPEN_OPEN))
2432                 RETURN(0);
2433
2434         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2435
2436         OBD_ALLOC(och, sizeof(*och));
2437         if (!och)
2438                 GOTO(out, rc = -ENOMEM);
2439
2440         ll_och_fill(ll_i2info(inode), it, och);
2441
2442         rc = ll_close_inode_openhandle(inode, och);
2443
2444         OBD_FREE(och, sizeof(*och));
2445  out:
2446         /* this one is in place of ll_file_open */
2447         ptlrpc_req_finished(it->d.lustre.it_data);
2448         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2449         RETURN(rc);
2450 }
2451
2452 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2453                   unsigned long arg)
2454 {
2455         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2456         int flags;
2457         ENTRY;
2458
2459         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2460                inode->i_generation, inode, cmd);
2461         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2462
2463         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2464         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2465                 RETURN(-ENOTTY);
2466
2467         switch(cmd) {
2468         case LL_IOC_GETFLAGS:
2469                 /* Get the current value of the file flags */
2470                 return put_user(fd->fd_flags, (int *)arg);
2471         case LL_IOC_SETFLAGS:
2472         case LL_IOC_CLRFLAGS:
2473                 /* Set or clear specific file flags */
2474                 /* XXX This probably needs checks to ensure the flags are
2475                  *     not abused, and to handle any flag side effects.
2476                  */
2477                 if (get_user(flags, (int *) arg))
2478                         RETURN(-EFAULT);
2479
2480                 if (cmd == LL_IOC_SETFLAGS) {
2481                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2482                             !(file->f_flags & O_DIRECT)) {
2483                                 CERROR("%s: unable to disable locking on "
2484                                        "non-O_DIRECT file\n", current->comm);
2485                                 RETURN(-EINVAL);
2486                         }
2487
2488                         fd->fd_flags |= flags;
2489                 } else {
2490                         fd->fd_flags &= ~flags;
2491                 }
2492                 RETURN(0);
2493         case LL_IOC_LOV_SETSTRIPE:
2494                 RETURN(ll_lov_setstripe(inode, file, arg));
2495         case LL_IOC_LOV_SETEA:
2496                 RETURN(ll_lov_setea(inode, file, arg));
2497         case LL_IOC_LOV_GETSTRIPE:
2498                 RETURN(ll_lov_getstripe(inode, arg));
2499         case LL_IOC_RECREATE_OBJ:
2500                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2501         case EXT3_IOC_GETFLAGS:
2502         case EXT3_IOC_SETFLAGS:
2503                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2504         case EXT3_IOC_GETVERSION_OLD:
2505         case EXT3_IOC_GETVERSION:
2506                 RETURN(put_user(inode->i_generation, (int *)arg));
2507         case LL_IOC_JOIN: {
2508                 char *ftail;
2509                 int rc;
2510
2511                 ftail = getname((const char *)arg);
2512                 if (IS_ERR(ftail))
2513                         RETURN(PTR_ERR(ftail));
2514                 rc = ll_file_join(inode, file, ftail);
2515                 putname(ftail);
2516                 RETURN(rc);
2517         }
2518         case LL_IOC_GROUP_LOCK:
2519                 RETURN(ll_get_grouplock(inode, file, arg));
2520         case LL_IOC_GROUP_UNLOCK:
2521                 RETURN(ll_put_grouplock(inode, file, arg));
2522         case IOC_OBD_STATFS:
2523                 RETURN(ll_obd_statfs(inode, (void *)arg));
2524         case OBD_IOC_GETNAME_OLD:
2525         case OBD_IOC_GETNAME: {
2526                 struct obd_device *obd =
2527                         class_exp2obd(ll_i2sbi(inode)->ll_osc_exp);
2528                 if (!obd)
2529                         RETURN(-EFAULT);
2530                 if (copy_to_user((void *)arg, obd->obd_name,
2531                                 strlen(obd->obd_name) + 1))
2532                         RETURN (-EFAULT);
2533                 RETURN(0);
2534         }
2535
2536         /* We need to special case any other ioctls we want to handle,
2537          * to send them to the MDS/OST as appropriate and to properly
2538          * network encode the arg field.
2539         case EXT3_IOC_SETVERSION_OLD:
2540         case EXT3_IOC_SETVERSION:
2541         */
2542         default: {
2543                 int err;
2544
2545                 if (LLIOC_STOP == 
2546                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2547                         RETURN(err);
2548
2549                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2550                                      (void *)arg));
2551         }
2552         }
2553 }
2554
2555 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2556 {
2557         struct inode *inode = file->f_dentry->d_inode;
2558         struct ll_inode_info *lli = ll_i2info(inode);
2559         struct lov_stripe_md *lsm = lli->lli_smd;
2560         loff_t retval;
2561         ENTRY;
2562         retval = offset + ((origin == 2) ? i_size_read(inode) :
2563                            (origin == 1) ? file->f_pos : 0);
2564         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2565                inode->i_ino, inode->i_generation, inode, retval, retval,
2566                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2567         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2568
2569         if (origin == 2) { /* SEEK_END */
2570                 int nonblock = 0, rc;
2571
2572                 if (file->f_flags & O_NONBLOCK)
2573                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2574
2575                 if (lsm != NULL) {
2576                         rc = ll_glimpse_size(inode, nonblock);
2577                         if (rc != 0)
2578                                 RETURN(rc);
2579                 }
2580
2581                 ll_inode_size_lock(inode, 0);
2582                 offset += i_size_read(inode);
2583                 ll_inode_size_unlock(inode, 0);
2584         } else if (origin == 1) { /* SEEK_CUR */
2585                 offset += file->f_pos;
2586         }
2587
2588         retval = -EINVAL;
2589         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2590                 if (offset != file->f_pos) {
2591                         file->f_pos = offset;
2592 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2593                         file->f_reada = 0;
2594                         file->f_version = ++event;
2595 #else
2596                         file->f_version = 0;
2597 #endif
2598                 }
2599                 retval = offset;
2600         }
2601
2602         RETURN(retval);
2603 }
2604
2605 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2606 {
2607         struct inode *inode = dentry->d_inode;
2608         struct ll_inode_info *lli = ll_i2info(inode);
2609         struct lov_stripe_md *lsm = lli->lli_smd;
2610         struct ll_fid fid;
2611         struct ptlrpc_request *req;
2612         int rc, err;
2613         ENTRY;
2614         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2615                inode->i_generation, inode);
2616         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2617
2618         /* fsync's caller has already called _fdata{sync,write}, we want
2619          * that IO to finish before calling the osc and mdc sync methods */
2620         rc = filemap_fdatawait(inode->i_mapping);
2621
2622         /* catch async errors that were recorded back when async writeback
2623          * failed for pages in this mapping. */
2624         err = lli->lli_async_rc;
2625         lli->lli_async_rc = 0;
2626         if (rc == 0)
2627                 rc = err;
2628         if (lsm) {
2629                 err = lov_test_and_clear_async_rc(lsm);
2630                 if (rc == 0)
2631                         rc = err;
2632         }
2633
2634         ll_inode2fid(&fid, inode);
2635         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2636         if (!rc)
2637                 rc = err;
2638         if (!err)
2639                 ptlrpc_req_finished(req);
2640
2641         if (data && lsm) {
2642                 struct obdo *oa;
2643
2644                 OBDO_ALLOC(oa);
2645                 if (!oa)
2646                         RETURN(rc ? rc : -ENOMEM);
2647
2648                 oa->o_id = lsm->lsm_object_id;
2649                 oa->o_valid = OBD_MD_FLID;
2650                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2651                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2652
2653                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
2654                                0, OBD_OBJECT_EOF);
2655                 if (!rc)
2656                         rc = err;
2657                 OBDO_FREE(oa);
2658         }
2659
2660         RETURN(rc);
2661 }
2662
2663 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2664 {
2665         struct inode *inode = file->f_dentry->d_inode;
2666         struct ll_sb_info *sbi = ll_i2sbi(inode);
2667         struct ldlm_res_id res_id =
2668                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
2669         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2670                 ldlm_flock_completion_ast, NULL, file_lock };
2671         struct lustre_handle lockh = {0};
2672         ldlm_policy_data_t flock;
2673         int flags = 0;
2674         int rc;
2675         ENTRY;
2676
2677         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2678                inode->i_ino, file_lock);
2679         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2680
2681         if (file_lock->fl_flags & FL_FLOCK) {
2682                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2683                 /* set missing params for flock() calls */
2684                 file_lock->fl_end = OFFSET_MAX;
2685                 file_lock->fl_pid = current->tgid;
2686         }
2687         flock.l_flock.pid = file_lock->fl_pid;
2688         flock.l_flock.start = file_lock->fl_start;
2689         flock.l_flock.end = file_lock->fl_end;
2690
2691         switch (file_lock->fl_type) {
2692         case F_RDLCK:
2693                 einfo.ei_mode = LCK_PR;
2694                 break;
2695         case F_UNLCK:
2696                 /* An unlock request may or may not have any relation to
2697                  * existing locks so we may not be able to pass a lock handle
2698                  * via a normal ldlm_lock_cancel() request. The request may even
2699                  * unlock a byte range in the middle of an existing lock. In
2700                  * order to process an unlock request we need all of the same
2701                  * information that is given with a normal read or write record
2702                  * lock request. To avoid creating another ldlm unlock (cancel)
2703                  * message we'll treat a LCK_NL flock request as an unlock. */
2704                 einfo.ei_mode = LCK_NL;
2705                 break;
2706         case F_WRLCK:
2707                 einfo.ei_mode = LCK_PW;
2708                 break;
2709         default:
2710                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2711                 LBUG();
2712         }
2713
2714         switch (cmd) {
2715         case F_SETLKW:
2716 #ifdef F_SETLKW64
2717         case F_SETLKW64:
2718 #endif
2719                 flags = 0;
2720                 break;
2721         case F_SETLK:
2722 #ifdef F_SETLK64
2723         case F_SETLK64:
2724 #endif
2725                 flags = LDLM_FL_BLOCK_NOWAIT;
2726                 break;
2727         case F_GETLK:
2728 #ifdef F_GETLK64
2729         case F_GETLK64:
2730 #endif
2731                 flags = LDLM_FL_TEST_LOCK;
2732                 /* Save the old mode so that if the mode in the lock changes we
2733                  * can decrement the appropriate reader or writer refcount. */
2734                 file_lock->fl_type = einfo.ei_mode;
2735                 break;
2736         default:
2737                 CERROR("unknown fcntl lock command: %d\n", cmd);
2738                 LBUG();
2739         }
2740
2741         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2742                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2743                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2744
2745         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, &einfo, res_id,
2746                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2747         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2748                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2749 #ifdef HAVE_F_OP_FLOCK
2750         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2751             !(flags & LDLM_FL_TEST_LOCK))
2752                 posix_lock_file_wait(file, file_lock);
2753 #endif
2754
2755         RETURN(rc);
2756 }
2757
2758 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2759 {
2760         ENTRY;
2761
2762         RETURN(-ENOSYS);
2763 }
2764
2765 int ll_have_md_lock(struct inode *inode, __u64 bits)
2766 {
2767         struct lustre_handle lockh;
2768         struct ldlm_res_id res_id = { .name = {0} };
2769         struct obd_device *obddev;
2770         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2771         int flags;
2772         ENTRY;
2773
2774         if (!inode)
2775                RETURN(0);
2776
2777         obddev = ll_i2mdcexp(inode)->exp_obd;
2778         res_id.name[0] = inode->i_ino;
2779         res_id.name[1] = inode->i_generation;
2780
2781         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
2782
2783         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2784         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
2785                             &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2786                 RETURN(1);
2787         }
2788
2789         RETURN(0);
2790 }
2791
2792 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2793         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2794                               * and return success */
2795                 inode->i_nlink = 0;
2796                 /* This path cannot be hit for regular files unless in
2797                  * case of obscure races, so no need to to validate
2798                  * size. */
2799                 if (!S_ISREG(inode->i_mode) &&
2800                     !S_ISDIR(inode->i_mode))
2801                         return 0;
2802         }
2803
2804         if (rc) {
2805                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2806                 return -abs(rc);
2807
2808         }
2809
2810         return 0;
2811 }
2812
2813 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2814 {
2815         struct inode *inode = dentry->d_inode;
2816         struct ptlrpc_request *req = NULL;
2817         struct obd_export *exp;
2818         int rc;
2819         ENTRY;
2820
2821         if (!inode) {
2822                 CERROR("REPORT THIS LINE TO PETER\n");
2823                 RETURN(0);
2824         }
2825         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2826                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2827 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2828         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
2829 #endif
2830
2831         exp = ll_i2mdcexp(inode);
2832
2833         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2834                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2835                 struct mdc_op_data op_data;
2836
2837                 /* Call getattr by fid, so do not provide name at all. */
2838                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
2839                                        dentry->d_inode, NULL, 0, 0, NULL);
2840                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
2841                                      /* we are not interested in name
2842                                         based lookup */
2843                                      &oit, 0, &req,
2844                                      ll_mdc_blocking_ast, 0);
2845                 if (rc < 0) {
2846                         rc = ll_inode_revalidate_fini(inode, rc);
2847                         GOTO (out, rc);
2848                 }
2849                 
2850                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2851                 if (rc != 0) {
2852                         ll_intent_release(&oit);
2853                         GOTO(out, rc);
2854                 }
2855
2856                 /* Unlinked? Unhash dentry, so it is not picked up later by
2857                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2858                    here to preserve get_cwd functionality on 2.6.
2859                    Bug 10503 */
2860                 if (!dentry->d_inode->i_nlink) {
2861                         spin_lock(&dcache_lock);
2862                         ll_drop_dentry(dentry);
2863                         spin_unlock(&dcache_lock);
2864                 }
2865
2866                 ll_lookup_finish_locks(&oit, dentry);
2867         } else if (!ll_have_md_lock(dentry->d_inode,
2868                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
2869                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2870                 struct ll_fid fid;
2871                 obd_valid valid = OBD_MD_FLGETATTR;
2872                 int ealen = 0;
2873
2874                 if (S_ISREG(inode->i_mode)) {
2875                         rc = ll_get_max_mdsize(sbi, &ealen);
2876                         if (rc) 
2877                                 RETURN(rc); 
2878                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2879                 }
2880                 ll_inode2fid(&fid, inode);
2881                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
2882                 if (rc) {
2883                         rc = ll_inode_revalidate_fini(inode, rc);
2884                         RETURN(rc);
2885                 }
2886
2887                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
2888                                    NULL);
2889                 if (rc)
2890                         GOTO(out, rc);
2891         }
2892
2893         /* if object not yet allocated, don't validate size */
2894         if (ll_i2info(inode)->lli_smd == NULL) 
2895                 GOTO(out, rc = 0);
2896
2897         /* ll_glimpse_size will prefer locally cached writes if they extend
2898          * the file */
2899         rc = ll_glimpse_size(inode, 0);
2900
2901 out:
2902         ptlrpc_req_finished(req);
2903         RETURN(rc);
2904 }
2905
2906 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2907 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2908                   struct lookup_intent *it, struct kstat *stat)
2909 {
2910         struct inode *inode = de->d_inode;
2911         int res = 0;
2912
2913         res = ll_inode_revalidate_it(de, it);
2914         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2915
2916         if (res)
2917                 return res;
2918
2919         stat->dev = inode->i_sb->s_dev;
2920         stat->ino = inode->i_ino;
2921         stat->mode = inode->i_mode;
2922         stat->nlink = inode->i_nlink;
2923         stat->uid = inode->i_uid;
2924         stat->gid = inode->i_gid;
2925         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2926         stat->atime = inode->i_atime;
2927         stat->mtime = inode->i_mtime;
2928         stat->ctime = inode->i_ctime;
2929 #ifdef HAVE_INODE_BLKSIZE
2930         stat->blksize = inode->i_blksize;
2931 #else
2932         stat->blksize = 1<<inode->i_blkbits;
2933 #endif
2934
2935         ll_inode_size_lock(inode, 0);
2936         stat->size = i_size_read(inode);
2937         stat->blocks = inode->i_blocks;
2938         ll_inode_size_unlock(inode, 0);
2939
2940         return 0;
2941 }
2942 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2943 {
2944         struct lookup_intent it = { .it_op = IT_GETATTR };
2945
2946         return ll_getattr_it(mnt, de, &it, stat);
2947 }
2948 #endif
2949
2950 static
2951 int lustre_check_acl(struct inode *inode, int mask)
2952 {
2953 #ifdef CONFIG_FS_POSIX_ACL
2954         struct ll_inode_info *lli = ll_i2info(inode);
2955         struct posix_acl *acl;
2956         int rc;
2957         ENTRY;
2958
2959         spin_lock(&lli->lli_lock);
2960         acl = posix_acl_dup(lli->lli_posix_acl);
2961         spin_unlock(&lli->lli_lock);
2962
2963         if (!acl)
2964                 RETURN(-EAGAIN);
2965
2966         rc = posix_acl_permission(inode, acl, mask);
2967         posix_acl_release(acl);
2968
2969         RETURN(rc);
2970 #else
2971         return -EAGAIN;
2972 #endif
2973 }
2974
2975 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2976 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2977 {
2978         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2979                inode->i_ino, inode->i_generation, inode, mask);
2980
2981         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2982         return generic_permission(inode, mask, lustre_check_acl);
2983 }
2984 #else
2985 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2986 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2987 #else
2988 int ll_inode_permission(struct inode *inode, int mask)
2989 #endif
2990 {
2991         int mode = inode->i_mode;
2992         int rc;
2993
2994         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2995                inode->i_ino, inode->i_generation, inode, mask);
2996         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2997
2998         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2999             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
3000                 return -EROFS;
3001         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
3002                 return -EACCES;
3003         if (current->fsuid == inode->i_uid) {
3004                 mode >>= 6;
3005         } else if (1) {
3006                 if (((mode >> 3) & mask & S_IRWXO) != mask)
3007                         goto check_groups;
3008                 rc = lustre_check_acl(inode, mask);
3009                 if (rc == -EAGAIN)
3010                         goto check_groups;
3011                 if (rc == -EACCES)
3012                         goto check_capabilities;
3013                 return rc;
3014         } else {
3015 check_groups:
3016                 if (in_group_p(inode->i_gid))
3017                         mode >>= 3;
3018         }
3019         if ((mode & mask & S_IRWXO) == mask)
3020                 return 0;
3021
3022 check_capabilities:
3023         if (!(mask & MAY_EXEC) ||
3024             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
3025                 if (capable(CAP_DAC_OVERRIDE))
3026                         return 0;
3027
3028         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
3029             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
3030                 return 0;
3031
3032         return -EACCES;
3033 }
3034 #endif
3035
3036 /* -o localflock - only provides locally consistent flock locks */
3037 struct file_operations ll_file_operations = {
3038         .read           = ll_file_read,
3039 #ifdef HAVE_FILE_READV
3040         .readv          = ll_file_readv,
3041 #else
3042         .aio_read       = ll_file_aio_read,
3043 #endif
3044         .write          = ll_file_write,
3045 #ifdef HAVE_FILE_WRITEV
3046         .writev         = ll_file_writev,
3047 #else
3048         .aio_write      = ll_file_aio_write,
3049 #endif
3050         .ioctl          = ll_file_ioctl,
3051         .open           = ll_file_open,
3052         .release        = ll_file_release,
3053         .mmap           = ll_file_mmap,
3054         .llseek         = ll_file_seek,
3055 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
3056         .sendfile       = ll_file_sendfile,
3057 #endif
3058         .fsync          = ll_fsync,
3059 };
3060
3061 struct file_operations ll_file_operations_flock = {
3062         .read           = ll_file_read,
3063 #ifdef HAVE_FILE_READV
3064         .readv          = ll_file_readv,
3065 #else
3066         .aio_read       = ll_file_aio_read,
3067 #endif
3068         .write          = ll_file_write,
3069 #ifdef HAVE_FILE_WRITEV
3070         .writev         = ll_file_writev,
3071 #else   
3072         .aio_write      = ll_file_aio_write,
3073 #endif
3074         .ioctl          = ll_file_ioctl,
3075         .open           = ll_file_open,
3076         .release        = ll_file_release,
3077         .mmap           = ll_file_mmap,
3078         .llseek         = ll_file_seek,
3079 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
3080         .sendfile       = ll_file_sendfile,
3081 #endif
3082         .fsync          = ll_fsync,
3083 #ifdef HAVE_F_OP_FLOCK
3084         .flock          = ll_file_flock,
3085 #endif
3086         .lock           = ll_file_flock
3087 };
3088
3089 /* These are for -o noflock - to return ENOSYS on flock calls */
3090 struct file_operations ll_file_operations_noflock = {
3091         .read           = ll_file_read,
3092 #ifdef HAVE_FILE_READV
3093         .readv          = ll_file_readv,
3094 #else
3095         .aio_read       = ll_file_aio_read,
3096 #endif
3097         .write          = ll_file_write,
3098 #ifdef HAVE_FILE_WRITEV
3099         .writev         = ll_file_writev,
3100 #else   
3101         .aio_write      = ll_file_aio_write,
3102 #endif
3103         .ioctl          = ll_file_ioctl,
3104         .open           = ll_file_open,
3105         .release        = ll_file_release,
3106         .mmap           = ll_file_mmap,
3107         .llseek         = ll_file_seek,
3108 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
3109         .sendfile       = ll_file_sendfile,
3110 #endif
3111         .fsync          = ll_fsync,
3112 #ifdef HAVE_F_OP_FLOCK
3113         .flock          = ll_file_noflock,
3114 #endif
3115         .lock           = ll_file_noflock
3116 };
3117
3118 struct inode_operations ll_file_inode_operations = {
3119 #ifdef HAVE_VFS_INTENT_PATCHES
3120         .setattr_raw    = ll_setattr_raw,
3121 #endif
3122         .setattr        = ll_setattr,
3123         .truncate       = ll_truncate,
3124 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
3125         .getattr        = ll_getattr,
3126 #else
3127         .revalidate_it  = ll_inode_revalidate_it,
3128 #endif
3129         .permission     = ll_inode_permission,
3130         .setxattr       = ll_setxattr,
3131         .getxattr       = ll_getxattr,
3132         .listxattr      = ll_listxattr,
3133         .removexattr    = ll_removexattr,
3134 };
3135
3136 /* dynamic ioctl number support routins */
3137 static struct llioc_ctl_data {
3138         struct rw_semaphore ioc_sem;
3139         struct list_head    ioc_head;
3140 } llioc = { 
3141         __RWSEM_INITIALIZER(llioc.ioc_sem), 
3142         CFS_LIST_HEAD_INIT(llioc.ioc_head)
3143 };
3144
3145
3146 struct llioc_data {
3147         struct list_head        iocd_list;
3148         unsigned int            iocd_size;
3149         llioc_callback_t        iocd_cb;
3150         unsigned int            iocd_count;
3151         unsigned int            iocd_cmd[0];
3152 };
3153
3154 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3155 {
3156         unsigned int size;
3157         struct llioc_data *in_data = NULL;
3158         ENTRY;
3159
3160         if (cb == NULL || cmd == NULL ||
3161             count > LLIOC_MAX_CMD || count < 0)
3162                 RETURN(NULL);
3163
3164         size = sizeof(*in_data) + count * sizeof(unsigned int);
3165         OBD_ALLOC(in_data, size);
3166         if (in_data == NULL)
3167                 RETURN(NULL);
3168
3169         memset(in_data, 0, sizeof(*in_data));
3170         in_data->iocd_size = size;
3171         in_data->iocd_cb = cb;
3172         in_data->iocd_count = count;
3173         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3174
3175         down_write(&llioc.ioc_sem);
3176         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3177         up_write(&llioc.ioc_sem);
3178
3179         RETURN(in_data);
3180 }
3181
3182 void ll_iocontrol_unregister(void *magic)
3183 {
3184         struct llioc_data *tmp;
3185
3186         if (magic == NULL)
3187                 return;
3188
3189         down_write(&llioc.ioc_sem);
3190         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3191                 if (tmp == magic) {
3192                         unsigned int size = tmp->iocd_size;
3193
3194                         list_del(&tmp->iocd_list);
3195                         up_write(&llioc.ioc_sem);
3196
3197                         OBD_FREE(tmp, size);
3198                         return;
3199                 }
3200         }
3201         up_write(&llioc.ioc_sem);
3202
3203         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3204 }
3205
3206 EXPORT_SYMBOL(ll_iocontrol_register);
3207 EXPORT_SYMBOL(ll_iocontrol_unregister);
3208
3209 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
3210                         unsigned int cmd, unsigned long arg, int *rcp)
3211 {
3212         enum llioc_iter ret = LLIOC_CONT;
3213         struct llioc_data *data;
3214         int rc = -EINVAL, i;
3215
3216         down_read(&llioc.ioc_sem);
3217         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3218                 for (i = 0; i < data->iocd_count; i++) {
3219                         if (cmd != data->iocd_cmd[i]) 
3220                                 continue;
3221
3222                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3223                         break;
3224                 }
3225
3226                 if (ret == LLIOC_STOP)
3227                         break;
3228         }
3229         up_read(&llioc.ioc_sem);
3230
3231         if (rcp)
3232                 *rcp = rc;
3233         return ret;
3234 }