Whamcloud - gitweb
land b1_5 onto HEAD
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 /* also used by llite/special.c:ll_special_open() */
36 struct ll_file_data *ll_file_data_get(void)
37 {
38         struct ll_file_data *fd;
39
40         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
41         return fd;
42 }
43
44 static void ll_file_data_put(struct ll_file_data *fd)
45 {
46         if (fd != NULL)
47                 OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
48 }
49
50 static int ll_close_inode_openhandle(struct inode *inode,
51                                      struct obd_client_handle *och)
52 {
53         struct ptlrpc_request *req = NULL;
54         struct obd_device *obd;
55         struct obdo *oa;
56         int rc;
57         ENTRY;
58
59         obd = class_exp2obd(ll_i2mdcexp(inode));
60         if (obd == NULL) {
61                 CERROR("Invalid MDC connection handle "LPX64"\n",
62                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
63                 GOTO(out, rc = 0);
64         }
65
66         /*
67          * here we check if this is forced umount. If so this is called on
68          * canceling "open lock" and we do not call mdc_close() in this case, as
69          * it will not be successful, as import is already deactivated.
70          */
71         if (obd->obd_no_recov)
72                 GOTO(out, rc = 0);
73
74         oa = obdo_alloc();
75         if (!oa)
76                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
77
78         oa->o_id = inode->i_ino;
79         oa->o_valid = OBD_MD_FLID;
80         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
81                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
82                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
83                                    OBD_MD_FLCTIME);
84         if (0 /* ll_is_inode_dirty(inode) */) {
85                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
86                 oa->o_valid |= OBD_MD_FLFLAGS;
87         }
88
89         rc = mdc_close(ll_i2mdcexp(inode), oa, och, &req);
90         if (rc == EAGAIN) {
91                 /* We are the last writer, so the MDS has instructed us to get
92                  * the file size and any write cookies, then close again. */
93                 //ll_queue_done_writing(inode);
94                 rc = 0;
95         } else if (rc) {
96                 CERROR("inode %lu mdc close failed: rc = %d\n",
97                        inode->i_ino, rc);
98         }
99
100         obdo_free(oa);
101
102         if (rc == 0) {
103                 rc = ll_objects_destroy(req, inode);
104                 if (rc)
105                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
106                                inode->i_ino, rc);
107         }
108
109         ptlrpc_req_finished(req); /* This is close request */
110         EXIT;
111 out:
112         mdc_clear_open_replay_data(och);
113
114         return rc;
115 }
116
117 int ll_mdc_real_close(struct inode *inode, int flags)
118 {
119         struct ll_inode_info *lli = ll_i2info(inode);
120         int rc = 0;
121         struct obd_client_handle **och_p;
122         struct obd_client_handle *och;
123         __u64 *och_usecount;
124
125         ENTRY;
126
127         if (flags & FMODE_WRITE) {
128                 och_p = &lli->lli_mds_write_och;
129                 och_usecount = &lli->lli_open_fd_write_count;
130         } else if (flags & FMODE_EXEC) {
131                 och_p = &lli->lli_mds_exec_och;
132                 och_usecount = &lli->lli_open_fd_exec_count;
133          } else {
134                 LASSERT(flags & FMODE_READ);
135                 och_p = &lli->lli_mds_read_och;
136                 och_usecount = &lli->lli_open_fd_read_count;
137         }
138
139         down(&lli->lli_och_sem);
140         if (*och_usecount) { /* There are still users of this handle, so
141                                 skip freeing it. */
142                 up(&lli->lli_och_sem);
143                 RETURN(0);
144         }
145         och=*och_p;
146         *och_p = NULL;
147         up(&lli->lli_och_sem);
148
149         if (och) { /* There might be a race and somebody have freed this och
150                       already */
151                 rc = ll_close_inode_openhandle(inode, och);
152                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
153                 OBD_FREE(och, sizeof *och);
154         }
155
156         RETURN(rc);
157 }
158
159 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
160                         struct file *file)
161 {
162         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
163         struct ll_inode_info *lli = ll_i2info(inode);
164         int rc = 0;
165         ENTRY;
166
167         /* clear group lock, if present */
168         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
169                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
170                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
171                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
172                                       &fd->fd_cwlockh);
173         }
174
175         /* Let's see if we have good enough OPEN lock on the file and if
176            we can skip talking to MDS */
177         if (file->f_dentry->d_inode) { /* Can this ever be false? */
178                 int lockmode;
179                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
180                 struct lustre_handle lockh;
181                 struct inode *inode = file->f_dentry->d_inode;
182                 struct ldlm_res_id file_res_id = {.name={inode->i_ino,
183                                                          inode->i_generation}};
184                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
185
186                 down(&lli->lli_och_sem);
187                 if (fd->fd_omode & FMODE_WRITE) {
188                         lockmode = LCK_CW;
189                         LASSERT(lli->lli_open_fd_write_count);
190                         lli->lli_open_fd_write_count--;
191                 } else if (fd->fd_omode & FMODE_EXEC) {
192                         lockmode = LCK_PR;
193                         LASSERT(lli->lli_open_fd_exec_count);
194                         lli->lli_open_fd_exec_count--;
195                 } else {
196                         lockmode = LCK_CR;
197                         LASSERT(lli->lli_open_fd_read_count);
198                         lli->lli_open_fd_read_count--;
199                 }
200                 up(&lli->lli_och_sem);
201
202                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
203                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
204                                      &lockh)) {
205                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
206                                                 fd->fd_omode);
207                 }
208         } else {
209                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
210                        file, file->f_dentry, file->f_dentry->d_name.name);
211         }
212
213         LUSTRE_FPRIVATE(file) = NULL;
214         ll_file_data_put(fd);
215
216         RETURN(rc);
217 }
218
219 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
220
221 /* While this returns an error code, fput() the caller does not, so we need
222  * to make every effort to clean up all of our state here.  Also, applications
223  * rarely check close errors and even if an error is returned they will not
224  * re-try the close call.
225  */
226 int ll_file_release(struct inode *inode, struct file *file)
227 {
228         struct ll_file_data *fd;
229         struct ll_sb_info *sbi = ll_i2sbi(inode);
230         struct ll_inode_info *lli = ll_i2info(inode);
231         struct lov_stripe_md *lsm = lli->lli_smd;
232         int rc;
233
234         ENTRY;
235         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
236                inode->i_generation, inode);
237         ll_vfs_ops_tally(sbi, VFS_OPS_RELEASE);
238
239         /* don't do anything for / */
240         if (inode->i_sb->s_root == file->f_dentry)
241                 RETURN(0);
242
243         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
244
245         fd = LUSTRE_FPRIVATE(file);
246         LASSERT(fd != NULL);
247
248         if (lsm)
249                 lov_test_and_clear_async_rc(lsm);
250         lli->lli_async_rc = 0;
251
252         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
253         RETURN(rc);
254 }
255
256 static int ll_intent_file_open(struct file *file, void *lmm,
257                                int lmmsize, struct lookup_intent *itp)
258 {
259         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
260         struct mdc_op_data data;
261         struct dentry *parent = file->f_dentry->d_parent;
262         const char *name = file->f_dentry->d_name.name;
263         const int len = file->f_dentry->d_name.len;
264         struct inode *inode = file->f_dentry->d_inode;
265         struct ptlrpc_request *req;
266         int rc;
267
268         if (!parent)
269                 RETURN(-ENOENT);
270
271         ll_prepare_mdc_op_data(&data, parent->d_inode, inode, name, len, O_RDWR);
272
273         /* Usually we come here only for NFSD, and we want open lock.
274            But we can also get here with pre 2.6.15 patchless kernels, and in
275            that case that lock is also ok */
276         /* We can also get here if there was cached open handle in revalidate_it
277          * but it disappeared while we were getting from there to ll_file_open.
278          * But this means this file was closed and immediatelly opened which
279          * makes a good candidate for using OPEN lock */
280         /* If lmmsize & lmm are not 0, we are just setting stripe info
281          * parameters. No need for the open lock */
282         if (!lmm && !lmmsize)
283                 itp->it_flags |= MDS_OPEN_LOCK;
284
285         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
286                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
287         if (rc == -ESTALE) {
288                 ll_release_openhandle(file->f_dentry, itp);
289                 LASSERT(it_disposition(itp, DISP_OPEN_OPEN));
290                 GOTO(out_stale, rc);
291         }
292
293         if (rc != 0) {
294                CERROR("lock enqueue: err: %d\n", rc);
295                GOTO(out, rc);
296         }
297
298         if (itp->d.lustre.it_lock_mode)
299                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
300                                   inode);
301
302         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
303                            req, DLM_REPLY_REC_OFF, NULL);
304 out:
305         ptlrpc_req_finished(itp->d.lustre.it_data);
306
307 out_stale:
308         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
309         ll_intent_drop_lock(itp);
310
311         RETURN(rc);
312 }
313
314
315 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
316                         struct obd_client_handle *och)
317 {
318         struct ptlrpc_request *req = it->d.lustre.it_data;
319         struct mds_body *body;
320
321         LASSERT(och);
322
323         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
324         LASSERT(body != NULL);                  /* reply already checked out */
325         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in mdc_enqueue */
326
327         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
328         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
329         lli->lli_io_epoch = body->io_epoch;
330
331         mdc_set_open_replay_data(och, it->d.lustre.it_data);
332 }
333
334 int ll_local_open(struct file *file, struct lookup_intent *it,
335                   struct ll_file_data *fd, struct obd_client_handle *och)
336 {
337         ENTRY;
338
339         LASSERT(!LUSTRE_FPRIVATE(file));
340
341         LASSERT(fd != NULL);
342
343         if (och)
344                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
345         LUSTRE_FPRIVATE(file) = fd;
346         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
347         fd->fd_omode = it->it_flags;
348
349         RETURN(0);
350 }
351
352 /* Open a file, and (for the very first open) create objects on the OSTs at
353  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
354  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
355  * lli_open_sem to ensure no other process will create objects, send the
356  * stripe MD to the MDS, or try to destroy the objects if that fails.
357  *
358  * If we already have the stripe MD locally then we don't request it in
359  * mdc_open(), by passing a lmm_size = 0.
360  *
361  * It is up to the application to ensure no other processes open this file
362  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
363  * used.  We might be able to avoid races of that sort by getting lli_open_sem
364  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
365  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
366  */
367 int ll_file_open(struct inode *inode, struct file *file)
368 {
369         struct ll_inode_info *lli = ll_i2info(inode);
370         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
371                                           .it_flags = file->f_flags };
372         struct lov_stripe_md *lsm;
373         struct ptlrpc_request *req = NULL;
374         struct obd_client_handle **och_p;
375         __u64 *och_usecount;
376         struct ll_file_data *fd;
377         int rc = 0;
378         ENTRY;
379
380         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
381                inode->i_generation, inode, file->f_flags);
382         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_OPEN);
383
384         /* don't do anything for / */
385         if (inode->i_sb->s_root == file->f_dentry)
386                 RETURN(0);
387
388 #ifdef LUSTRE_KERNEL_VERSION
389         it = file->f_it;
390 #else
391         it = file->private_data; /* XXX: compat macro */
392         file->private_data = NULL; /* prevent ll_local_open assertion */
393 #endif
394
395         fd = ll_file_data_get();
396         if (fd == NULL)
397                 RETURN(-ENOMEM);
398
399         if (!it || !it->d.lustre.it_disposition) {
400                 /* Convert f_flags into access mode. We cannot use file->f_mode,
401                  * because everything but O_ACCMODE mask was stripped from
402                  * there */
403                 if ((oit.it_flags + 1) & O_ACCMODE)
404                         oit.it_flags++;
405                 if (file->f_flags & O_TRUNC)
406                         oit.it_flags |= FMODE_WRITE;
407
408                 /* kernel only call f_op->open in dentry_open.
409                  * filp_open calls dentry_open after call to open_namei that checks
410                  * for permissions. only nfsd_open call dentry_open directly without
411                  * checking permissions and because of that this code below is safe.
412                  */
413                 if (oit.it_flags & FMODE_WRITE)
414                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
415
416                 /* We do not want O_EXCL here, presumably we opened the file
417                  * already? XXX - NFS implications? */
418                 oit.it_flags &= ~O_EXCL;
419
420                 it = &oit;
421         }
422
423         /* Let's see if we have file open on MDS already. */
424         if (it->it_flags & FMODE_WRITE) {
425                 och_p = &lli->lli_mds_write_och;
426                 och_usecount = &lli->lli_open_fd_write_count;
427         } else if (it->it_flags & FMODE_EXEC) {
428                 och_p = &lli->lli_mds_exec_och;
429                 och_usecount = &lli->lli_open_fd_exec_count;
430          } else {
431                 och_p = &lli->lli_mds_read_och;
432                 och_usecount = &lli->lli_open_fd_read_count;
433         }
434
435         LASSERTF(it->it_flags != 0, "f_it %p dist %d \n", file->f_it,
436                  file->f_it->d.lustre.it_disposition);
437
438         down(&lli->lli_och_sem);
439         if (*och_p) { /* Open handle is present */
440                 if (it_disposition(it, DISP_OPEN_OPEN)) {
441                         /* Well, there's extra open request that we do not need,
442                            let's close it somehow. This will decref request. */
443                         ll_release_openhandle(file->f_dentry, it);
444                 }
445                 (*och_usecount)++;
446
447                 rc = ll_local_open(file, it, fd, NULL);
448
449                 LASSERTF(rc == 0, "rc = %d\n", rc);
450         } else {
451                 LASSERT(*och_usecount == 0);
452                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
453                 if (!*och_p) {
454                         ll_file_data_put(fd);
455                         GOTO(out_och_free, rc = -ENOMEM);
456                 }
457                 (*och_usecount)++;
458                 if (!it->d.lustre.it_disposition) {
459                         rc = ll_intent_file_open(file, NULL, 0, it);
460                         if (rc) {
461                                 ll_file_data_put(fd);
462                                 GOTO(out_och_free, rc);
463                         }
464
465                         /* Got some error? Release the request */
466                         if (it->d.lustre.it_status < 0) {
467                                 req = it->d.lustre.it_data;
468                                 ptlrpc_req_finished(req);
469                         }
470                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
471                                           file->f_dentry->d_inode);
472                 }
473                 req = it->d.lustre.it_data;
474
475                 /* mdc_intent_lock() didn't get a request ref if there was an
476                  * open error, so don't do cleanup on the request here
477                  * (bug 3430) */
478                 /* XXX (green): Should not we bail out on any error here, not
479                  * just open error? */
480                 rc = it_open_error(DISP_OPEN_OPEN, it);
481                 if (rc) {
482                         ll_file_data_put(fd);
483                         GOTO(out_och_free, rc);
484                 }
485
486                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
487                 rc = ll_local_open(file, it, fd, *och_p);
488                 LASSERTF(rc == 0, "rc = %d\n", rc);
489         }
490         up(&lli->lli_och_sem);
491
492         /* Must do this outside lli_och_sem lock to prevent deadlock where
493            different kind of OPEN lock for this same inode gets cancelled
494            by ldlm_cancel_lru */
495         if (!S_ISREG(inode->i_mode))
496                 GOTO(out, rc);
497
498         lsm = lli->lli_smd;
499         if (lsm == NULL) {
500                 if (file->f_flags & O_LOV_DELAY_CREATE ||
501                     !(file->f_mode & FMODE_WRITE)) {
502                         CDEBUG(D_INODE, "object creation was delayed\n");
503                         GOTO(out, rc);
504                 }
505         }
506         file->f_flags &= ~O_LOV_DELAY_CREATE;
507         GOTO(out, rc);
508  out:
509         ptlrpc_req_finished(req);
510         if (req)
511                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
512         if (rc == 0) {
513                 ll_open_complete(inode);
514         } else {
515 out_och_free:
516                 if (*och_p) {
517                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
518                         *och_p = NULL; /* OBD_FREE writes some magic there */
519                         (*och_usecount)--;
520                 }
521                 up(&lli->lli_och_sem);
522         }
523         return rc;
524 }
525
526 /* Fills the obdo with the attributes for the inode defined by lsm */
527 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
528                    struct obdo *oa)
529 {
530         struct ptlrpc_request_set *set;
531         struct obd_info oinfo = { { { 0 } } };
532         int rc;
533         ENTRY;
534
535         LASSERT(lsm != NULL);
536
537         memset(oa, 0, sizeof *oa);
538         oinfo.oi_md = lsm;
539         oinfo.oi_oa = oa;
540         oa->o_id = lsm->lsm_object_id;
541         oa->o_mode = S_IFREG;
542         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
543                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
544                 OBD_MD_FLCTIME;
545
546         set = ptlrpc_prep_set();
547         if (set == NULL) {
548                 rc = -ENOMEM;
549         } else {
550                 rc = obd_getattr_async(exp, &oinfo, set);
551                 if (rc == 0)
552                         rc = ptlrpc_set_wait(set);
553                 ptlrpc_set_destroy(set);
554         }
555         if (rc)
556                 RETURN(rc);
557
558         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
559                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
560         RETURN(0);
561 }
562
563 static inline void ll_remove_suid(struct inode *inode)
564 {
565         unsigned int mode;
566
567         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
568         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
569
570         /* was any of the uid bits set? */
571         mode &= inode->i_mode;
572         if (mode && !capable(CAP_FSETID)) {
573                 inode->i_mode &= ~mode;
574                 // XXX careful here - we cannot change the size
575         }
576 }
577
578 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
579 {
580         struct ll_inode_info *lli = ll_i2info(inode);
581         struct lov_stripe_md *lsm = lli->lli_smd;
582         struct obd_export *exp = ll_i2obdexp(inode);
583         struct {
584                 char name[16];
585                 struct ldlm_lock *lock;
586                 struct lov_stripe_md *lsm;
587         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
588         __u32 stripe, vallen = sizeof(stripe);
589         int rc;
590         ENTRY;
591
592         if (lsm->lsm_stripe_count == 1)
593                 GOTO(check, stripe = 0);
594
595         /* get our offset in the lov */
596         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
597         if (rc != 0) {
598                 CERROR("obd_get_info: rc = %d\n", rc);
599                 RETURN(rc);
600         }
601         LASSERT(stripe < lsm->lsm_stripe_count);
602
603 check:
604         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
605             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[1]){
606                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
607                            lsm->lsm_oinfo[stripe].loi_id,
608                            lsm->lsm_oinfo[stripe].loi_gr);
609                 RETURN(-ELDLM_NO_LOCK_DATA);
610         }
611
612         RETURN(stripe);
613 }
614
615 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
616  * we get a lock cancellation for each stripe, so we have to map the obd's
617  * region back onto the stripes in the file that it held.
618  *
619  * No one can dirty the extent until we've finished our work and they can
620  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
621  * but other kernel actors could have pages locked.
622  *
623  * Called with the DLM lock held. */
624 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
625                               struct ldlm_lock *lock, __u32 stripe)
626 {
627         ldlm_policy_data_t tmpex;
628         unsigned long start, end, count, skip, i, j;
629         struct page *page;
630         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
631         struct lustre_handle lockh;
632         ENTRY;
633
634         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
635         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
636                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
637                inode->i_size);
638
639         /* our locks are page granular thanks to osc_enqueue, we invalidate the
640          * whole page. */
641         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
642             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
643                 LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
644         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
645         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
646
647         count = ~0;
648         skip = 0;
649         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
650         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
651         if (lsm->lsm_stripe_count > 1) {
652                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
653                 skip = (lsm->lsm_stripe_count - 1) * count;
654                 start += start/count * skip + stripe * count;
655                 if (end != ~0)
656                         end += end/count * skip + stripe * count;
657         }
658         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
659                 end = ~0;
660
661         i = inode->i_size ? (inode->i_size - 1) >> CFS_PAGE_SHIFT : 0;
662         if (i < end)
663                 end = i;
664
665         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
666                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
667                count, skip, end, discard ? " (DISCARDING)" : "");
668
669         /* walk through the vmas on the inode and tear down mmaped pages that
670          * intersect with the lock.  this stops immediately if there are no
671          * mmap()ed regions of the file.  This is not efficient at all and
672          * should be short lived. We'll associate mmap()ed pages with the lock
673          * and will be able to find them directly */
674         for (i = start; i <= end; i += (j + skip)) {
675                 j = min(count - (i % count), end - i + 1);
676                 LASSERT(j > 0);
677                 LASSERT(inode->i_mapping);
678                 if (ll_teardown_mmaps(inode->i_mapping,
679                                       (__u64)i << CFS_PAGE_SHIFT,
680                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
681                         break;
682         }
683
684         /* this is the simplistic implementation of page eviction at
685          * cancelation.  It is careful to get races with other page
686          * lockers handled correctly.  fixes from bug 20 will make it
687          * more efficient by associating locks with pages and with
688          * batching writeback under the lock explicitly. */
689         for (i = start, j = start % count; i <= end;
690              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
691                 if (j == count) {
692                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
693                         i += skip;
694                         j = 0;
695                         if (i > end)
696                                 break;
697                 }
698                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
699                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
700                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
701                          start, i, end);
702
703                 if (!mapping_has_pages(inode->i_mapping)) {
704                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
705                         break;
706                 }
707
708                 cond_resched();
709
710                 page = find_get_page(inode->i_mapping, i);
711                 if (page == NULL)
712                         continue;
713                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
714                                i, tmpex.l_extent.start);
715                 lock_page(page);
716
717                 /* page->mapping to check with racing against teardown */
718                 if (!discard && clear_page_dirty_for_io(page)) {
719                         rc = ll_call_writepage(inode, page);
720                         if (rc != 0)
721                                 CERROR("writepage of page %p failed: %d\n",
722                                        page, rc);
723                         /* either waiting for io to complete or reacquiring
724                          * the lock that the failed writepage released */
725                         lock_page(page);
726                 }
727
728                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
729                 /* check to see if another DLM lock covers this page  b=2765 */
730                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
731                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
732                                       LDLM_FL_TEST_LOCK,
733                                       &lock->l_resource->lr_name, LDLM_EXTENT,
734                                       &tmpex, LCK_PR | LCK_PW, &lockh);
735                 if (rc2 == 0 && page->mapping != NULL) {
736                         struct ll_async_page *llap = llap_cast_private(page);
737                         // checking again to account for writeback's lock_page()
738                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
739                         if (llap)
740                                 ll_ra_accounting(llap, inode->i_mapping);
741                         ll_truncate_complete_page(page);
742                 }
743                 unlock_page(page);
744                 page_cache_release(page);
745         }
746         LASSERTF(tmpex.l_extent.start <=
747                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
748                   lock->l_policy_data.l_extent.end + 1),
749                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
750                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
751                  start, i, end);
752         EXIT;
753 }
754
755 static int ll_extent_lock_callback(struct ldlm_lock *lock,
756                                    struct ldlm_lock_desc *new, void *data,
757                                    int flag)
758 {
759         struct lustre_handle lockh = { 0 };
760         int rc;
761         ENTRY;
762
763         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
764                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
765                 LBUG();
766         }
767
768         switch (flag) {
769         case LDLM_CB_BLOCKING:
770                 ldlm_lock2handle(lock, &lockh);
771                 rc = ldlm_cli_cancel(&lockh);
772                 if (rc != ELDLM_OK)
773                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
774                 break;
775         case LDLM_CB_CANCELING: {
776                 struct inode *inode;
777                 struct ll_inode_info *lli;
778                 struct lov_stripe_md *lsm;
779                 int stripe;
780                 __u64 kms;
781
782                 /* This lock wasn't granted, don't try to evict pages */
783                 if (lock->l_req_mode != lock->l_granted_mode)
784                         RETURN(0);
785
786                 inode = ll_inode_from_lock(lock);
787                 if (inode == NULL)
788                         RETURN(0);
789                 lli = ll_i2info(inode);
790                 if (lli == NULL)
791                         goto iput;
792                 if (lli->lli_smd == NULL)
793                         goto iput;
794                 lsm = lli->lli_smd;
795
796                 stripe = ll_lock_to_stripe_offset(inode, lock);
797                 if (stripe < 0)
798                         goto iput;
799
800                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
801
802                 lov_stripe_lock(lsm);
803                 lock_res_and_lock(lock);
804                 kms = ldlm_extent_shift_kms(lock,
805                                             lsm->lsm_oinfo[stripe].loi_kms);
806
807                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
808                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
809                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
810                 lsm->lsm_oinfo[stripe].loi_kms = kms;
811                 unlock_res_and_lock(lock);
812                 lov_stripe_unlock(lsm);
813                 //ll_try_done_writing(inode);
814         iput:
815                 iput(inode);
816                 break;
817         }
818         default:
819                 LBUG();
820         }
821
822         RETURN(0);
823 }
824
825 #if 0
826 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
827 {
828         /* XXX ALLOCATE - 160 bytes */
829         struct inode *inode = ll_inode_from_lock(lock);
830         struct ll_inode_info *lli = ll_i2info(inode);
831         struct lustre_handle lockh = { 0 };
832         struct ost_lvb *lvb;
833         int stripe;
834         ENTRY;
835
836         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
837                      LDLM_FL_BLOCK_CONV)) {
838                 LBUG(); /* not expecting any blocked async locks yet */
839                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
840                            "lock, returning");
841                 ldlm_lock_dump(D_OTHER, lock, 0);
842                 ldlm_reprocess_all(lock->l_resource);
843                 RETURN(0);
844         }
845
846         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
847
848         stripe = ll_lock_to_stripe_offset(inode, lock);
849         if (stripe < 0)
850                 goto iput;
851
852         if (lock->l_lvb_len) {
853                 struct lov_stripe_md *lsm = lli->lli_smd;
854                 __u64 kms;
855                 lvb = lock->l_lvb_data;
856                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
857
858                 LOCK_INODE_MUTEX(inode);
859                 lock_res_and_lock(lock);
860                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
861                 kms = ldlm_extent_shift_kms(NULL, kms);
862                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
863                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
864                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
865                 lsm->lsm_oinfo[stripe].loi_kms = kms;
866                 unlock_res_and_lock(lock);
867                 UNLOCK_INODE_MUTEX(inode);
868         }
869
870 iput:
871         iput(inode);
872         wake_up(&lock->l_waitq);
873
874         ldlm_lock2handle(lock, &lockh);
875         ldlm_lock_decref(&lockh, LCK_PR);
876         RETURN(0);
877 }
878 #endif
879
880 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
881 {
882         struct ptlrpc_request *req = reqp;
883         struct inode *inode = ll_inode_from_lock(lock);
884         struct ll_inode_info *lli;
885         struct lov_stripe_md *lsm;
886         struct ost_lvb *lvb;
887         int rc, stripe;
888         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
889         ENTRY;
890
891         if (inode == NULL)
892                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
893         lli = ll_i2info(inode);
894         if (lli == NULL)
895                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
896         lsm = lli->lli_smd;
897         if (lsm == NULL)
898                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
899
900         /* First, find out which stripe index this lock corresponds to. */
901         stripe = ll_lock_to_stripe_offset(inode, lock);
902         if (stripe < 0)
903                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
904
905         rc = lustre_pack_reply(req, 2, size, NULL);
906         if (rc) {
907                 CERROR("lustre_pack_reply: %d\n", rc);
908                 GOTO(iput, rc);
909         }
910
911         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
912         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
913         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
914         lvb->lvb_atime = LTIME_S(inode->i_atime);
915         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
916
917         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
918                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
919                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
920                    lvb->lvb_atime, lvb->lvb_ctime);
921  iput:
922         iput(inode);
923
924  out:
925         /* These errors are normal races, so we don't want to fill the console
926          * with messages by calling ptlrpc_error() */
927         if (rc == -ELDLM_NO_LOCK_DATA)
928                 lustre_pack_reply(req, 1, NULL, NULL);
929
930         req->rq_status = rc;
931         return rc;
932 }
933
934 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
935                      lstat_t *st)
936 {
937         struct lustre_handle lockh = { 0 };
938         struct obd_enqueue_info einfo = { 0 };
939         struct obd_info oinfo = { { { 0 } } };
940         struct ost_lvb lvb;
941         int rc;
942         
943         ENTRY;
944         
945         einfo.ei_type = LDLM_EXTENT;
946         einfo.ei_mode = LCK_PR;
947         einfo.ei_flags = LDLM_FL_HAS_INTENT;
948         einfo.ei_cb_bl = ll_extent_lock_callback;
949         einfo.ei_cb_cp = ldlm_completion_ast;
950         einfo.ei_cb_gl = ll_glimpse_callback;
951         einfo.ei_cbdata = NULL;
952
953         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
954         oinfo.oi_lockh = &lockh;
955         oinfo.oi_md = lsm;
956
957         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
958         if (rc == -ENOENT)
959                 RETURN(rc);
960         if (rc != 0) {
961                 CERROR("obd_enqueue returned rc %d, "
962                        "returning -EIO\n", rc);
963                 RETURN(rc > 0 ? -EIO : rc);
964         }
965         
966         lov_stripe_lock(lsm);
967         memset(&lvb, 0, sizeof(lvb));
968         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
969         st->st_size = lvb.lvb_size;
970         st->st_blocks = lvb.lvb_blocks;
971         st->st_mtime = lvb.lvb_mtime;
972         st->st_atime = lvb.lvb_atime;
973         st->st_ctime = lvb.lvb_ctime;
974         lov_stripe_unlock(lsm);
975         
976         RETURN(rc);
977 }
978
979 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
980  * file (because it prefers KMS over RSS when larger) */
981 int ll_glimpse_size(struct inode *inode, int ast_flags)
982 {
983         struct ll_inode_info *lli = ll_i2info(inode);
984         struct ll_sb_info *sbi = ll_i2sbi(inode);
985         struct lustre_handle lockh = { 0 };
986         struct obd_enqueue_info einfo = { 0 };
987         struct obd_info oinfo = { { { 0 } } };
988         struct ost_lvb lvb;
989         int rc;
990         ENTRY;
991
992         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
993
994         if (!lli->lli_smd) {
995                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
996                 RETURN(0);
997         }
998
999         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1000          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1001          *       won't revoke any conflicting DLM locks held. Instead,
1002          *       ll_glimpse_callback() will be called on each client
1003          *       holding a DLM lock against this file, and resulting size
1004          *       will be returned for each stripe. DLM lock on [0, EOF] is
1005          *       acquired only if there were no conflicting locks. */
1006         einfo.ei_type = LDLM_EXTENT;
1007         einfo.ei_mode = LCK_PR;
1008         einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
1009         einfo.ei_cb_bl = ll_extent_lock_callback;
1010         einfo.ei_cb_cp = ldlm_completion_ast;
1011         einfo.ei_cb_gl = ll_glimpse_callback;
1012         einfo.ei_cbdata = inode;
1013
1014         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1015         oinfo.oi_lockh = &lockh;
1016         oinfo.oi_md = lli->lli_smd;
1017
1018         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
1019         if (rc == -ENOENT)
1020                 RETURN(rc);
1021         if (rc != 0) {
1022                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1023                 RETURN(rc > 0 ? -EIO : rc);
1024         }
1025
1026         ll_inode_size_lock(inode, 1);
1027         inode_init_lvb(inode, &lvb);
1028         obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1029         inode->i_size = lvb.lvb_size;
1030         inode->i_blocks = lvb.lvb_blocks;
1031         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1032         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1033         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1034         ll_inode_size_unlock(inode, 1);
1035
1036         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1037                inode->i_size, inode->i_blocks);
1038
1039         RETURN(rc);
1040 }
1041
1042 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1043                    struct lov_stripe_md *lsm, int mode,
1044                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1045                    int ast_flags)
1046 {
1047         struct ll_sb_info *sbi = ll_i2sbi(inode);
1048         struct ost_lvb lvb;
1049         struct obd_enqueue_info einfo = { 0 };
1050         struct obd_info oinfo = { { { 0 } } };
1051         int rc;
1052         ENTRY;
1053
1054         LASSERT(!lustre_handle_is_used(lockh));
1055         LASSERT(lsm != NULL);
1056
1057         /* don't drop the mmapped file to LRU */
1058         if (mapping_mapped(inode->i_mapping))
1059                 ast_flags |= LDLM_FL_NO_LRU;
1060
1061         /* XXX phil: can we do this?  won't it screw the file size up? */
1062         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1063             (sbi->ll_flags & LL_SBI_NOLCK))
1064                 RETURN(0);
1065
1066         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1067                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1068
1069         einfo.ei_type = LDLM_EXTENT;
1070         einfo.ei_mode = mode;
1071         einfo.ei_flags = ast_flags;
1072         einfo.ei_cb_bl = ll_extent_lock_callback;
1073         einfo.ei_cb_cp = ldlm_completion_ast;
1074         einfo.ei_cb_gl = ll_glimpse_callback;
1075         einfo.ei_cbdata = inode;
1076
1077         oinfo.oi_policy = *policy;
1078         oinfo.oi_lockh = lockh;
1079         oinfo.oi_md = lsm;
1080
1081         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo);
1082         *policy = oinfo.oi_policy;
1083         if (rc > 0)
1084                 rc = -EIO;
1085
1086         ll_inode_size_lock(inode, 1);
1087         inode_init_lvb(inode, &lvb);
1088         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1089
1090         if (policy->l_extent.start == 0 &&
1091             policy->l_extent.end == OBD_OBJECT_EOF) {
1092                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1093                  * the kms under both a DLM lock and the
1094                  * ll_inode_size_lock().  If we don't get the
1095                  * ll_inode_size_lock() here we can match the DLM lock and
1096                  * reset i_size from the kms before the truncating path has
1097                  * updated the kms.  generic_file_write can then trust the
1098                  * stale i_size when doing appending writes and effectively
1099                  * cancel the result of the truncate.  Getting the
1100                  * ll_inode_size_lock() after the enqueue maintains the DLM
1101                  * -> ll_inode_size_lock() acquiring order. */
1102                 inode->i_size = lvb.lvb_size;
1103                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1104                        inode->i_ino, inode->i_size);
1105         }
1106
1107         if (rc == 0) {
1108                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1109                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1110                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1111         }
1112         ll_inode_size_unlock(inode, 1);
1113
1114         RETURN(rc);
1115 }
1116
1117 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1118                      struct lov_stripe_md *lsm, int mode,
1119                      struct lustre_handle *lockh)
1120 {
1121         struct ll_sb_info *sbi = ll_i2sbi(inode);
1122         int rc;
1123         ENTRY;
1124
1125         /* XXX phil: can we do this?  won't it screw the file size up? */
1126         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1127             (sbi->ll_flags & LL_SBI_NOLCK))
1128                 RETURN(0);
1129
1130         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1131
1132         RETURN(rc);
1133 }
1134
1135 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1136                             loff_t *ppos)
1137 {
1138         struct inode *inode = file->f_dentry->d_inode;
1139         struct ll_inode_info *lli = ll_i2info(inode);
1140         struct lov_stripe_md *lsm = lli->lli_smd;
1141         struct ll_sb_info *sbi = ll_i2sbi(inode);
1142         struct ll_lock_tree tree;
1143         struct ll_lock_tree_node *node;
1144         struct ost_lvb lvb;
1145         struct ll_ra_read bead;
1146         int rc, ra = 0;
1147         loff_t end;
1148         ssize_t retval, chunk, sum = 0;
1149
1150         __u64 kms;
1151         ENTRY;
1152         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1153                inode->i_ino, inode->i_generation, inode, count, *ppos);
1154         ll_vfs_ops_tally(sbi, VFS_OPS_READ);
1155
1156         /* "If nbyte is 0, read() will return 0 and have no other results."
1157          *                      -- Single Unix Spec */
1158         if (count == 0)
1159                 RETURN(0);
1160
1161         lprocfs_counter_add(sbi->ll_stats, LPROC_LL_READ_BYTES, count);
1162
1163         if (!lsm) {
1164                 /* Read on file with no objects should return zero-filled
1165                  * buffers up to file size (we can get non-zero sizes with
1166                  * mknod + truncate, then opening file for read. This is a
1167                  * common pattern in NFS case, it seems). Bug 6243 */
1168                 int notzeroed;
1169                 /* Since there are no objects on OSTs, we have nothing to get
1170                  * lock on and so we are forced to access inode->i_size
1171                  * unguarded */
1172
1173                 /* Read beyond end of file */
1174                 if (*ppos >= inode->i_size)
1175                         RETURN(0);
1176
1177                 if (count > inode->i_size - *ppos)
1178                         count = inode->i_size - *ppos;
1179                 /* Make sure to correctly adjust the file pos pointer for
1180                  * EFAULT case */
1181                 notzeroed = clear_user(buf, count);
1182                 count -= notzeroed;
1183                 *ppos += count;
1184                 if (!count)
1185                         RETURN(-EFAULT);
1186                 RETURN(count);
1187         }
1188
1189 repeat:
1190         if (sbi->ll_max_rw_chunk != 0) {
1191                 /* first, let's know the end of the current stripe */
1192                 end = *ppos;
1193                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1194                                 (obd_off *)&end);
1195
1196                 /* correct, the end is beyond the request */
1197                 if (end > *ppos + count - 1)
1198                         end = *ppos + count - 1;
1199
1200                 /* and chunk shouldn't be too large even if striping is wide */
1201                 if (end - *ppos > sbi->ll_max_rw_chunk)
1202                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1203         } else {
1204                 end = *ppos + count - 1;
1205         }
1206        
1207         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1208         tree.lt_fd = LUSTRE_FPRIVATE(file);
1209         rc = ll_tree_lock(&tree, node, buf, count,
1210                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1211         if (rc != 0)
1212                 GOTO(out, retval = rc);
1213
1214         ll_inode_size_lock(inode, 1);
1215         /*
1216          * Consistency guarantees: following possibilities exist for the
1217          * relation between region being read and real file size at this
1218          * moment:
1219          *
1220          *  (A): the region is completely inside of the file;
1221          *
1222          *  (B-x): x bytes of region are inside of the file, the rest is
1223          *  outside;
1224          *
1225          *  (C): the region is completely outside of the file.
1226          *
1227          * This classification is stable under DLM lock acquired by
1228          * ll_tree_lock() above, because to change class, other client has to
1229          * take DLM lock conflicting with our lock. Also, any updates to
1230          * ->i_size by other threads on this client are serialized by
1231          * ll_inode_size_lock(). This guarantees that short reads are handled
1232          * correctly in the face of concurrent writes and truncates.
1233          */
1234         inode_init_lvb(inode, &lvb);
1235         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1236         kms = lvb.lvb_size;
1237         if (*ppos + count - 1 > kms) {
1238                 /* A glimpse is necessary to determine whether we return a
1239                  * short read (B) or some zeroes at the end of the buffer (C) */
1240                 ll_inode_size_unlock(inode, 1);
1241                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1242                 if (retval) {
1243                         ll_tree_unlock(&tree);
1244                         goto out;
1245                 }
1246         } else {
1247                 /* region is within kms and, hence, within real file size (A).
1248                  * We need to increase i_size to cover the read region so that
1249                  * generic_file_read() will do its job, but that doesn't mean
1250                  * the kms size is _correct_, it is only the _minimum_ size.
1251                  * If someone does a stat they will get the correct size which
1252                  * will always be >= the kms value here.  b=11081 */
1253                 if (inode->i_size < kms)
1254                         inode->i_size = kms;
1255                 ll_inode_size_unlock(inode, 1);
1256         }
1257
1258         chunk = end - *ppos + 1;
1259         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1260                inode->i_ino, chunk, *ppos, inode->i_size);
1261
1262         /* turn off the kernel's read-ahead */
1263 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1264         file->f_ramax = 0;
1265 #else
1266         file->f_ra.ra_pages = 0;
1267 #endif
1268         /* initialize read-ahead window once per syscall */
1269         if (ra == 0) {
1270                 ra = 1;
1271                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1272                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1273                 ll_ra_read_in(file, &bead);
1274         }
1275
1276         /* BUG: 5972 */
1277         file_accessed(file);
1278         retval = generic_file_read(file, buf, chunk, ppos);
1279         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 0);
1280
1281         ll_tree_unlock(&tree);
1282
1283         if (retval > 0) {
1284                 buf += retval;
1285                 count -= retval;
1286                 sum += retval;
1287                 if (retval == chunk && count > 0)
1288                         goto repeat;
1289         }
1290
1291  out:
1292         if (ra != 0)
1293                 ll_ra_read_ex(file, &bead);
1294         retval = (sum > 0) ? sum : retval;
1295         RETURN(retval);
1296 }
1297
1298 /*
1299  * Write to a file (through the page cache).
1300  */
1301 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1302                              loff_t *ppos)
1303 {
1304         struct inode *inode = file->f_dentry->d_inode;
1305         struct ll_sb_info *sbi = ll_i2sbi(inode);
1306         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1307         struct ll_lock_tree tree;
1308         struct ll_lock_tree_node *node;
1309         loff_t maxbytes = ll_file_maxbytes(inode);
1310         loff_t lock_start, lock_end, end;
1311         ssize_t retval, chunk, sum = 0;
1312         int rc;
1313         ENTRY;
1314
1315         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1316                inode->i_ino, inode->i_generation, inode, count, *ppos);
1317         ll_vfs_ops_tally(sbi, VFS_OPS_WRITE);
1318         
1319         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1320
1321         /* POSIX, but surprised the VFS doesn't check this already */
1322         if (count == 0)
1323                 RETURN(0);
1324
1325         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1326          * called on the file, don't fail the below assertion (bug 2388). */
1327         if (file->f_flags & O_LOV_DELAY_CREATE &&
1328             ll_i2info(inode)->lli_smd == NULL)
1329                 RETURN(-EBADF);
1330
1331         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1332
1333         down(&ll_i2info(inode)->lli_write_sem);
1334
1335 repeat:
1336         chunk = 0; /* just to fix gcc's warning */
1337         end = *ppos + count - 1;
1338
1339         if (file->f_flags & O_APPEND) {
1340                 lock_start = 0;
1341                 lock_end = OBD_OBJECT_EOF;
1342         } else if (sbi->ll_max_rw_chunk != 0) {
1343                 /* first, let's know the end of the current stripe */
1344                 end = *ppos;
1345                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, 
1346                                 (obd_off *)&end);
1347
1348                 /* correct, the end is beyond the request */
1349                 if (end > *ppos + count - 1)
1350                         end = *ppos + count - 1;
1351
1352                 /* and chunk shouldn't be too large even if striping is wide */
1353                 if (end - *ppos > sbi->ll_max_rw_chunk)
1354                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1355                 lock_start = *ppos;
1356                 lock_end = end;
1357         } else {
1358                 lock_start = *ppos;
1359                 lock_end = *ppos + count - 1;
1360         }
1361         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1362
1363         if (IS_ERR(node))
1364                 GOTO(out, retval = PTR_ERR(node));
1365
1366         tree.lt_fd = LUSTRE_FPRIVATE(file);
1367         rc = ll_tree_lock(&tree, node, buf, count,
1368                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1369         if (rc != 0)
1370                 GOTO(out, retval = rc);
1371
1372         /* This is ok, g_f_w will overwrite this under i_sem if it races
1373          * with a local truncate, it just makes our maxbyte checking easier.
1374          * The i_size value gets updated in ll_extent_lock() as a consequence
1375          * of the [0,EOF] extent lock we requested above. */
1376         if (file->f_flags & O_APPEND) {
1377                 *ppos = inode->i_size;
1378                 end = *ppos + count - 1;
1379         }
1380
1381         if (*ppos >= maxbytes) {
1382                 send_sig(SIGXFSZ, current, 0);
1383                 GOTO(out, retval = -EFBIG);
1384         }
1385         if (*ppos + count > maxbytes)
1386                 count = maxbytes - *ppos;
1387
1388         /* generic_file_write handles O_APPEND after getting i_mutex */
1389         chunk = end - *ppos + 1;
1390         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1391                inode->i_ino, chunk, *ppos);
1392         retval = generic_file_write(file, buf, chunk, ppos);
1393         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1394
1395 out:
1396         ll_tree_unlock(&tree);
1397
1398         if (retval > 0) {
1399                 buf += retval;
1400                 count -= retval;
1401                 sum += retval;
1402                 if (retval == chunk && count > 0)
1403                         goto repeat;
1404         }
1405
1406         up(&ll_i2info(inode)->lli_write_sem);
1407
1408         retval = (sum > 0) ? sum : retval;
1409         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1410                             retval > 0 ? retval : 0);
1411
1412         if (retval > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
1413                 rc = ll_sync_page_range(inode, inode->i_mapping, *ppos - retval,
1414                                         count);
1415                 if (rc < 0)
1416                         retval = rc;
1417         }
1418
1419         RETURN(retval);
1420 }
1421
1422 /*
1423  * Send file content (through pagecache) somewhere with helper
1424  */
1425 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1426 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1427                                 read_actor_t actor, void *target)
1428 {
1429         struct inode *inode = in_file->f_dentry->d_inode;
1430         struct ll_inode_info *lli = ll_i2info(inode);
1431         struct lov_stripe_md *lsm = lli->lli_smd;
1432         struct ll_lock_tree tree;
1433         struct ll_lock_tree_node *node;
1434         struct ost_lvb lvb;
1435         struct ll_ra_read bead;
1436         int rc;
1437         ssize_t retval;
1438         __u64 kms;
1439         ENTRY;
1440         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1441                inode->i_ino, inode->i_generation, inode, count, *ppos);
1442
1443         /* "If nbyte is 0, read() will return 0 and have no other results."
1444          *                      -- Single Unix Spec */
1445         if (count == 0)
1446                 RETURN(0);
1447
1448         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1449                             count);
1450
1451         /* turn off the kernel's read-ahead */
1452         in_file->f_ra.ra_pages = 0;
1453
1454         /* File with no objects, nothing to lock */
1455         if (!lsm)
1456                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1457
1458         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1459         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1460         rc = ll_tree_lock(&tree, node, NULL, count,
1461                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1462         if (rc != 0)
1463                 RETURN(rc);
1464
1465         ll_inode_size_lock(inode, 1);
1466         /*
1467          * Consistency guarantees: following possibilities exist for the
1468          * relation between region being read and real file size at this
1469          * moment:
1470          *
1471          *  (A): the region is completely inside of the file;
1472          *
1473          *  (B-x): x bytes of region are inside of the file, the rest is
1474          *  outside;
1475          *
1476          *  (C): the region is completely outside of the file.
1477          *
1478          * This classification is stable under DLM lock acquired by
1479          * ll_tree_lock() above, because to change class, other client has to
1480          * take DLM lock conflicting with our lock. Also, any updates to
1481          * ->i_size by other threads on this client are serialized by
1482          * ll_inode_size_lock(). This guarantees that short reads are handled
1483          * correctly in the face of concurrent writes and truncates.
1484          */
1485         inode_init_lvb(inode, &lvb);
1486         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1487         kms = lvb.lvb_size;
1488         if (*ppos + count - 1 > kms) {
1489                 /* A glimpse is necessary to determine whether we return a
1490                  * short read (B) or some zeroes at the end of the buffer (C) */
1491                 ll_inode_size_unlock(inode, 1);
1492                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1493                 if (retval)
1494                         goto out;
1495         } else {
1496                 /* region is within kms and, hence, within real file size (A) */
1497                 inode->i_size = kms;
1498                 ll_inode_size_unlock(inode, 1);
1499         }
1500
1501         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1502                inode->i_ino, count, *ppos, inode->i_size);
1503
1504         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1505         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1506         ll_ra_read_in(in_file, &bead);
1507         /* BUG: 5972 */
1508         file_accessed(in_file);
1509         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1510         ll_ra_read_ex(in_file, &bead);
1511
1512  out:
1513         ll_tree_unlock(&tree);
1514         RETURN(retval);
1515 }
1516 #endif
1517
1518 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1519                                unsigned long arg)
1520 {
1521         struct ll_inode_info *lli = ll_i2info(inode);
1522         struct obd_export *exp = ll_i2obdexp(inode);
1523         struct ll_recreate_obj ucreatp;
1524         struct obd_trans_info oti = { 0 };
1525         struct obdo *oa = NULL;
1526         int lsm_size;
1527         int rc = 0;
1528         struct lov_stripe_md *lsm, *lsm2;
1529         ENTRY;
1530
1531         if (!capable (CAP_SYS_ADMIN))
1532                 RETURN(-EPERM);
1533
1534         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1535                             sizeof(struct ll_recreate_obj));
1536         if (rc) {
1537                 RETURN(-EFAULT);
1538         }
1539         oa = obdo_alloc();
1540         if (oa == NULL)
1541                 RETURN(-ENOMEM);
1542
1543         down(&lli->lli_open_sem);
1544         lsm = lli->lli_smd;
1545         if (lsm == NULL)
1546                 GOTO(out, rc = -ENOENT);
1547         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1548                    (lsm->lsm_stripe_count));
1549
1550         OBD_ALLOC(lsm2, lsm_size);
1551         if (lsm2 == NULL)
1552                 GOTO(out, rc = -ENOMEM);
1553
1554         oa->o_id = ucreatp.lrc_id;
1555         oa->o_nlink = ucreatp.lrc_ost_idx;
1556         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1557         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1558         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1559                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1560
1561         oti.oti_objid = NULL;
1562         memcpy(lsm2, lsm, lsm_size);
1563         rc = obd_create(exp, oa, &lsm2, &oti);
1564
1565         OBD_FREE(lsm2, lsm_size);
1566         GOTO(out, rc);
1567 out:
1568         up(&lli->lli_open_sem);
1569         obdo_free(oa);
1570         return rc;
1571 }
1572
1573 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1574                                     int flags, struct lov_user_md *lum,
1575                                     int lum_size)
1576 {
1577         struct ll_inode_info *lli = ll_i2info(inode);
1578         struct lov_stripe_md *lsm;
1579         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1580         int rc = 0;
1581         ENTRY;
1582
1583         down(&lli->lli_open_sem);
1584         lsm = lli->lli_smd;
1585         if (lsm) {
1586                 up(&lli->lli_open_sem);
1587                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1588                        inode->i_ino);
1589                 RETURN(-EEXIST);
1590         }
1591
1592         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1593         if (rc)
1594                 GOTO(out, rc);
1595         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1596                 GOTO(out_req_free, rc = -ENOENT);
1597         rc = oit.d.lustre.it_status;
1598         if (rc < 0)
1599                 GOTO(out_req_free, rc);
1600
1601         ll_release_openhandle(file->f_dentry, &oit);
1602
1603  out:
1604         up(&lli->lli_open_sem);
1605         ll_intent_release(&oit);
1606         RETURN(rc);
1607 out_req_free:
1608         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1609         goto out;
1610 }
1611
1612 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1613                              struct lov_mds_md **lmmp, int *lmm_size, 
1614                              struct ptlrpc_request **request)
1615 {
1616         struct ll_sb_info *sbi = ll_i2sbi(inode);
1617         struct ll_fid  fid;
1618         struct mds_body  *body;
1619         struct lov_mds_md *lmm = NULL;
1620         struct ptlrpc_request *req = NULL;
1621         int rc, lmmsize;
1622
1623         ll_inode2fid(&fid, inode);
1624
1625         rc = ll_get_max_mdsize(sbi, &lmmsize);
1626         if (rc)
1627                 RETURN(rc);
1628
1629         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
1630                         filename, strlen(filename) + 1,
1631                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
1632                         lmmsize, &req);
1633         if (rc < 0) {
1634                 CDEBUG(D_INFO, "mdc_getattr_name failed "
1635                                 "on %s: rc %d\n", filename, rc);
1636                 GOTO(out, rc);
1637         }
1638
1639         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1640                         sizeof(*body));
1641         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1642         /* swabbed by mdc_getattr_name */
1643         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1644
1645         lmmsize = body->eadatasize;
1646
1647         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1648                         lmmsize == 0) {
1649                 GOTO(out, rc = -ENODATA);
1650         }
1651
1652         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
1653                         lmmsize);
1654         LASSERT(lmm != NULL);
1655         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1656
1657         /*
1658          * This is coming from the MDS, so is probably in
1659          * little endian.  We convert it to host endian before
1660          * passing it to userspace.
1661          */
1662         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1663                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1664                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1665         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1666                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1667         }
1668
1669         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1670                 struct lov_stripe_md *lsm;
1671                 struct lov_user_md_join *lmj;
1672                 int lmj_size, i, aindex = 0;
1673
1674                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
1675                 if (rc < 0)
1676                         GOTO(out, rc = -ENOMEM);
1677                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
1678                 if (rc)
1679                         GOTO(out_free_memmd, rc);
1680
1681                 lmj_size = sizeof(struct lov_user_md_join) +
1682                         lsm->lsm_stripe_count *
1683                         sizeof(struct lov_user_ost_data_join);
1684                 OBD_ALLOC(lmj, lmj_size);
1685                 if (!lmj)
1686                         GOTO(out_free_memmd, rc = -ENOMEM);
1687
1688                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1689                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1690                         struct lov_extent *lex =
1691                                 &lsm->lsm_array->lai_ext_array[aindex];
1692
1693                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1694                                 aindex ++;
1695                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1696                                         LPU64" len %d\n", aindex, i,
1697                                         lex->le_start, (int)lex->le_len);
1698                         lmj->lmm_objects[i].l_extent_start =
1699                                 lex->le_start;
1700
1701                         if ((int)lex->le_len == -1)
1702                                 lmj->lmm_objects[i].l_extent_end = -1;
1703                         else
1704                                 lmj->lmm_objects[i].l_extent_end =
1705                                         lex->le_start + lex->le_len;
1706                         lmj->lmm_objects[i].l_object_id =
1707                                 lsm->lsm_oinfo[i].loi_id;
1708                         lmj->lmm_objects[i].l_object_gr =
1709                                 lsm->lsm_oinfo[i].loi_gr;
1710                         lmj->lmm_objects[i].l_ost_gen =
1711                                 lsm->lsm_oinfo[i].loi_ost_gen;
1712                         lmj->lmm_objects[i].l_ost_idx =
1713                                 lsm->lsm_oinfo[i].loi_ost_idx;
1714                 }
1715                 lmm = (struct lov_mds_md *)lmj;
1716                 lmmsize = lmj_size;
1717 out_free_memmd:
1718                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
1719         }
1720 out:
1721         *lmmp = lmm;
1722         *lmm_size = lmmsize;
1723         *request = req;
1724         return rc;
1725 }
1726 static int ll_lov_setea(struct inode *inode, struct file *file,
1727                             unsigned long arg)
1728 {
1729         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1730         struct lov_user_md  *lump;
1731         int lum_size = sizeof(struct lov_user_md) +
1732                        sizeof(struct lov_user_ost_data);
1733         int rc;
1734         ENTRY;
1735
1736         if (!capable (CAP_SYS_ADMIN))
1737                 RETURN(-EPERM);
1738
1739         OBD_ALLOC(lump, lum_size);
1740         if (lump == NULL) {
1741                 RETURN(-ENOMEM);
1742         }
1743         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1744         if (rc) {
1745                 OBD_FREE(lump, lum_size);
1746                 RETURN(-EFAULT);
1747         }
1748
1749         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1750
1751         OBD_FREE(lump, lum_size);
1752         RETURN(rc);
1753 }
1754
1755 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1756                             unsigned long arg)
1757 {
1758         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1759         int rc;
1760         int flags = FMODE_WRITE;
1761         ENTRY;
1762
1763         /* Bug 1152: copy properly when this is no longer true */
1764         LASSERT(sizeof(lum) == sizeof(*lump));
1765         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1766         rc = copy_from_user(&lum, lump, sizeof(lum));
1767         if (rc)
1768                 RETURN(-EFAULT);
1769
1770         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1771         if (rc == 0) {
1772                  put_user(0, &lump->lmm_stripe_count);
1773                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
1774                                     0, ll_i2info(inode)->lli_smd, lump);
1775         }
1776         RETURN(rc);
1777 }
1778
1779 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1780 {
1781         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1782
1783         if (!lsm)
1784                 RETURN(-ENODATA);
1785
1786         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
1787                             (void *)arg);
1788 }
1789
1790 static int ll_get_grouplock(struct inode *inode, struct file *file,
1791                             unsigned long arg)
1792 {
1793         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1794         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1795                                                     .end = OBD_OBJECT_EOF}};
1796         struct lustre_handle lockh = { 0 };
1797         struct ll_inode_info *lli = ll_i2info(inode);
1798         struct lov_stripe_md *lsm = lli->lli_smd;
1799         int flags = 0, rc;
1800         ENTRY;
1801
1802         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1803                 RETURN(-EINVAL);
1804         }
1805
1806         policy.l_extent.gid = arg;
1807         if (file->f_flags & O_NONBLOCK)
1808                 flags = LDLM_FL_BLOCK_NOWAIT;
1809
1810         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1811         if (rc)
1812                 RETURN(rc);
1813
1814         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1815         fd->fd_gid = arg;
1816         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1817
1818         RETURN(0);
1819 }
1820
1821 static int ll_put_grouplock(struct inode *inode, struct file *file,
1822                             unsigned long arg)
1823 {
1824         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1825         struct ll_inode_info *lli = ll_i2info(inode);
1826         struct lov_stripe_md *lsm = lli->lli_smd;
1827         int rc;
1828         ENTRY;
1829
1830         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1831                 /* Ugh, it's already unlocked. */
1832                 RETURN(-EINVAL);
1833         }
1834
1835         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1836                 RETURN(-EINVAL);
1837
1838         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1839
1840         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1841         if (rc)
1842                 RETURN(rc);
1843
1844         fd->fd_gid = 0;
1845         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1846
1847         RETURN(0);
1848 }
1849
1850 static int join_sanity_check(struct inode *head, struct inode *tail)
1851 {
1852         ENTRY;
1853         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1854                 CERROR("server do not support join \n");
1855                 RETURN(-EINVAL);
1856         }
1857         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1858                 CERROR("tail ino %lu and ino head %lu must be regular\n",
1859                        head->i_ino, tail->i_ino);
1860                 RETURN(-EINVAL);
1861         }
1862         if (head->i_ino == tail->i_ino) {
1863                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
1864                 RETURN(-EINVAL);
1865         }
1866         if (head->i_size % JOIN_FILE_ALIGN) {
1867                 CERROR("hsize %llu must be times of 64K\n", head->i_size);
1868                 RETURN(-EINVAL);
1869         }
1870         RETURN(0);
1871 }
1872
1873 static int join_file(struct inode *head_inode, struct file *head_filp,
1874                      struct file *tail_filp)
1875 {
1876         struct inode *tail_inode, *tail_parent;
1877         struct dentry *tail_dentry = tail_filp->f_dentry;
1878         struct lookup_intent oit = {.it_op = IT_OPEN,
1879                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1880         struct lustre_handle lockh;
1881         struct mdc_op_data *op_data;
1882         __u32  hsize = head_inode->i_size >> 32;
1883         __u32  tsize = head_inode->i_size;
1884         int    rc;
1885         ENTRY;
1886
1887         tail_dentry = tail_filp->f_dentry;
1888         tail_inode = tail_dentry->d_inode;
1889         tail_parent = tail_dentry->d_parent->d_inode;
1890
1891         OBD_ALLOC_PTR(op_data);
1892         if (op_data == NULL) {
1893                 RETURN(-ENOMEM);
1894         }
1895
1896         ll_prepare_mdc_op_data(op_data, head_inode, tail_parent,
1897                                tail_dentry->d_name.name,
1898                                tail_dentry->d_name.len, 0);
1899         rc = mdc_enqueue(ll_i2mdcexp(head_inode), LDLM_IBITS, &oit, LCK_PW,
1900                          op_data, &lockh, &tsize, 0, ldlm_completion_ast,
1901                          ll_mdc_blocking_ast, &hsize, 0);
1902
1903         if (rc < 0)
1904                 GOTO(out, rc);
1905
1906         rc = oit.d.lustre.it_status;
1907
1908         if (rc < 0) {
1909                 ptlrpc_req_finished((struct ptlrpc_request *)
1910                                                           oit.d.lustre.it_data);
1911                 GOTO(out, rc);
1912         }
1913
1914         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
1915                                            * away */
1916                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
1917                 oit.d.lustre.it_lock_mode = 0;
1918         }
1919         ll_release_openhandle(head_filp->f_dentry, &oit);
1920 out:
1921         if (op_data)
1922                 OBD_FREE_PTR(op_data);
1923         ll_intent_release(&oit);
1924         RETURN(rc);
1925 }
1926
1927 static int ll_file_join(struct inode *head, struct file *filp,
1928                         char *filename_tail)
1929 {
1930         struct inode *tail = NULL, *first = NULL, *second = NULL;
1931         struct dentry *tail_dentry;
1932         struct file *tail_filp, *first_filp, *second_filp;
1933         struct ll_lock_tree first_tree, second_tree;
1934         struct ll_lock_tree_node *first_node, *second_node;
1935         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
1936         int rc = 0, cleanup_phase = 0;
1937         ENTRY;
1938
1939         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
1940                head->i_ino, head->i_generation, head, filename_tail);
1941
1942         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
1943         if (IS_ERR(tail_filp)) {
1944                 CERROR("Can not open tail file %s", filename_tail);
1945                 rc = PTR_ERR(tail_filp);
1946                 GOTO(cleanup, rc);
1947         }
1948         tail = igrab(tail_filp->f_dentry->d_inode);
1949
1950         tlli = ll_i2info(tail);
1951         tail_dentry = tail_filp->f_dentry;
1952         LASSERT(tail_dentry);
1953         cleanup_phase = 1;
1954
1955         /*reorder the inode for lock sequence*/
1956         first = head->i_ino > tail->i_ino ? head : tail;
1957         second = head->i_ino > tail->i_ino ? tail : head;
1958         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
1959         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
1960
1961         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
1962                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
1963         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
1964         if (IS_ERR(first_node)){
1965                 rc = PTR_ERR(first_node);
1966                 GOTO(cleanup, rc);
1967         }
1968         first_tree.lt_fd = first_filp->private_data;
1969         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
1970         if (rc != 0)
1971                 GOTO(cleanup, rc);
1972         cleanup_phase = 2;
1973
1974         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
1975         if (IS_ERR(second_node)){
1976                 rc = PTR_ERR(second_node);
1977                 GOTO(cleanup, rc);
1978         }
1979         second_tree.lt_fd = second_filp->private_data;
1980         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
1981         if (rc != 0)
1982                 GOTO(cleanup, rc);
1983         cleanup_phase = 3;
1984
1985         rc = join_sanity_check(head, tail);
1986         if (rc)
1987                 GOTO(cleanup, rc);
1988
1989         rc = join_file(head, filp, tail_filp);
1990         if (rc)
1991                 GOTO(cleanup, rc);
1992 cleanup:
1993         switch (cleanup_phase) {
1994         case 3:
1995                 ll_tree_unlock(&second_tree);
1996                 obd_cancel_unused(ll_i2obdexp(second),
1997                                   ll_i2info(second)->lli_smd, 0, NULL);
1998         case 2:
1999                 ll_tree_unlock(&first_tree);
2000                 obd_cancel_unused(ll_i2obdexp(first),
2001                                   ll_i2info(first)->lli_smd, 0, NULL);
2002         case 1:
2003                 filp_close(tail_filp, 0);
2004                 if (tail)
2005                         iput(tail);
2006                 if (head && rc == 0) {
2007                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2008                                        &hlli->lli_smd);
2009                         hlli->lli_smd = NULL;
2010                 }
2011         case 0:
2012                 break;
2013         default:
2014                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2015                 LBUG();
2016         }
2017         RETURN(rc);
2018 }
2019
2020 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2021 {
2022         struct inode *inode = dentry->d_inode;
2023         struct obd_client_handle *och;
2024         int rc;
2025         ENTRY;
2026
2027         LASSERT(inode);
2028
2029         /* Root ? Do nothing. */
2030         if (dentry->d_inode->i_sb->s_root == dentry)
2031                 RETURN(0);
2032
2033         /* No open handle to close? Move away */
2034         if (!it_disposition(it, DISP_OPEN_OPEN))
2035                 RETURN(0);
2036
2037         OBD_ALLOC(och, sizeof(*och));
2038         if (!och)
2039                 GOTO(out, rc = -ENOMEM);
2040
2041         ll_och_fill(ll_i2info(inode), it, och);
2042
2043         rc = ll_close_inode_openhandle(inode, och);
2044
2045         OBD_FREE(och, sizeof(*och));
2046  out:
2047         /* this one is in place of ll_file_open */
2048         ptlrpc_req_finished(it->d.lustre.it_data);
2049         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2050         RETURN(rc);
2051 }
2052
2053 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2054                   unsigned long arg)
2055 {
2056         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2057         int flags;
2058         ENTRY;
2059
2060         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2061                inode->i_generation, inode, cmd);
2062         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_IOCTL);
2063
2064         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2065         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2066                 RETURN(-ENOTTY);
2067
2068         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
2069         switch(cmd) {
2070         case LL_IOC_GETFLAGS:
2071                 /* Get the current value of the file flags */
2072                 return put_user(fd->fd_flags, (int *)arg);
2073         case LL_IOC_SETFLAGS:
2074         case LL_IOC_CLRFLAGS:
2075                 /* Set or clear specific file flags */
2076                 /* XXX This probably needs checks to ensure the flags are
2077                  *     not abused, and to handle any flag side effects.
2078                  */
2079                 if (get_user(flags, (int *) arg))
2080                         RETURN(-EFAULT);
2081
2082                 if (cmd == LL_IOC_SETFLAGS) {
2083                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2084                             !(file->f_flags & O_DIRECT)) {
2085                                 CERROR("%s: unable to disable locking on "
2086                                        "non-O_DIRECT file\n", current->comm);
2087                                 RETURN(-EINVAL);
2088                         }
2089
2090                         fd->fd_flags |= flags;
2091                 } else {
2092                         fd->fd_flags &= ~flags;
2093                 }
2094                 RETURN(0);
2095         case LL_IOC_LOV_SETSTRIPE:
2096                 RETURN(ll_lov_setstripe(inode, file, arg));
2097         case LL_IOC_LOV_SETEA:
2098                 RETURN(ll_lov_setea(inode, file, arg));
2099         case LL_IOC_LOV_GETSTRIPE:
2100                 RETURN(ll_lov_getstripe(inode, arg));
2101         case LL_IOC_RECREATE_OBJ:
2102                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2103         case EXT3_IOC_GETFLAGS:
2104         case EXT3_IOC_SETFLAGS:
2105                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2106         case EXT3_IOC_GETVERSION_OLD:
2107         case EXT3_IOC_GETVERSION:
2108                 RETURN(put_user(inode->i_generation, (int *)arg));
2109         case LL_IOC_JOIN: {
2110                 char *ftail;
2111                 int rc;
2112
2113                 ftail = getname((const char *)arg);
2114                 if (IS_ERR(ftail))
2115                         RETURN(PTR_ERR(ftail));
2116                 rc = ll_file_join(inode, file, ftail);
2117                 putname(ftail);
2118                 RETURN(rc);
2119         }
2120         case LL_IOC_GROUP_LOCK:
2121                 RETURN(ll_get_grouplock(inode, file, arg));
2122         case LL_IOC_GROUP_UNLOCK:
2123                 RETURN(ll_put_grouplock(inode, file, arg));
2124         case IOC_OBD_STATFS:
2125                 RETURN(ll_obd_statfs(inode, (void *)arg));
2126
2127         /* We need to special case any other ioctls we want to handle,
2128          * to send them to the MDS/OST as appropriate and to properly
2129          * network encode the arg field.
2130         case EXT3_IOC_SETVERSION_OLD:
2131         case EXT3_IOC_SETVERSION:
2132         */
2133         default:
2134                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2135                                      (void *)arg));
2136         }
2137 }
2138
2139 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2140 {
2141         struct inode *inode = file->f_dentry->d_inode;
2142         struct ll_inode_info *lli = ll_i2info(inode);
2143         struct lov_stripe_md *lsm = lli->lli_smd;
2144         loff_t retval;
2145         ENTRY;
2146         retval = offset + ((origin == 2) ? inode->i_size :
2147                            (origin == 1) ? file->f_pos : 0);
2148         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2149                inode->i_ino, inode->i_generation, inode, retval, retval,
2150                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2151         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_SEEK);
2152         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
2153         
2154         if (origin == 2) { /* SEEK_END */
2155                 int nonblock = 0, rc;
2156
2157                 if (file->f_flags & O_NONBLOCK)
2158                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2159
2160                 if (lsm != NULL) {
2161                         rc = ll_glimpse_size(inode, nonblock);
2162                         if (rc != 0)
2163                                 RETURN(rc);
2164                 }
2165
2166                 ll_inode_size_lock(inode, 0);
2167                 offset += inode->i_size;
2168                 ll_inode_size_unlock(inode, 0);
2169         } else if (origin == 1) { /* SEEK_CUR */
2170                 offset += file->f_pos;
2171         }
2172
2173         retval = -EINVAL;
2174         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2175                 if (offset != file->f_pos) {
2176                         file->f_pos = offset;
2177 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2178                         file->f_reada = 0;
2179                         file->f_version = ++event;
2180 #endif
2181                 }
2182                 retval = offset;
2183         }
2184
2185         RETURN(retval);
2186 }
2187
2188 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2189 {
2190         struct inode *inode = dentry->d_inode;
2191         struct ll_inode_info *lli = ll_i2info(inode);
2192         struct lov_stripe_md *lsm = lli->lli_smd;
2193         struct ll_fid fid;
2194         struct ptlrpc_request *req;
2195         int rc, err;
2196         ENTRY;
2197         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2198                inode->i_generation, inode);
2199         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FSYNC);
2200         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
2201
2202         /* fsync's caller has already called _fdata{sync,write}, we want
2203          * that IO to finish before calling the osc and mdc sync methods */
2204         rc = filemap_fdatawait(inode->i_mapping);
2205
2206         /* catch async errors that were recorded back when async writeback
2207          * failed for pages in this mapping. */
2208         err = lli->lli_async_rc;
2209         lli->lli_async_rc = 0;
2210         if (rc == 0)
2211                 rc = err;
2212         if (lsm) {
2213                 err = lov_test_and_clear_async_rc(lsm);
2214                 if (rc == 0)
2215                         rc = err;
2216         }
2217
2218         ll_inode2fid(&fid, inode);
2219         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2220         if (!rc)
2221                 rc = err;
2222         if (!err)
2223                 ptlrpc_req_finished(req);
2224
2225         if (data && lsm) {
2226                 struct obdo *oa = obdo_alloc();
2227
2228                 if (!oa)
2229                         RETURN(rc ? rc : -ENOMEM);
2230
2231                 oa->o_id = lsm->lsm_object_id;
2232                 oa->o_valid = OBD_MD_FLID;
2233                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2234                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2235
2236                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
2237                                0, OBD_OBJECT_EOF);
2238                 if (!rc)
2239                         rc = err;
2240                 obdo_free(oa);
2241         }
2242
2243         RETURN(rc);
2244 }
2245
2246 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2247 {
2248         struct inode *inode = file->f_dentry->d_inode;
2249         struct ll_sb_info *sbi = ll_i2sbi(inode);
2250         struct ldlm_res_id res_id =
2251                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
2252         struct lustre_handle lockh = {0};
2253         ldlm_policy_data_t flock;
2254         ldlm_mode_t mode = 0;
2255         int flags = 0;
2256         int rc;
2257         ENTRY;
2258
2259         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2260                inode->i_ino, file_lock);
2261         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FLOCK);
2262
2263         if (file_lock->fl_flags & FL_FLOCK) {
2264                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2265                 /* set missing params for flock() calls */
2266                 file_lock->fl_end = OFFSET_MAX;
2267                 file_lock->fl_pid = current->tgid;
2268         }
2269         flock.l_flock.pid = file_lock->fl_pid;
2270         flock.l_flock.start = file_lock->fl_start;
2271         flock.l_flock.end = file_lock->fl_end;
2272
2273         switch (file_lock->fl_type) {
2274         case F_RDLCK:
2275                 mode = LCK_PR;
2276                 break;
2277         case F_UNLCK:
2278                 /* An unlock request may or may not have any relation to
2279                  * existing locks so we may not be able to pass a lock handle
2280                  * via a normal ldlm_lock_cancel() request. The request may even
2281                  * unlock a byte range in the middle of an existing lock. In
2282                  * order to process an unlock request we need all of the same
2283                  * information that is given with a normal read or write record
2284                  * lock request. To avoid creating another ldlm unlock (cancel)
2285                  * message we'll treat a LCK_NL flock request as an unlock. */
2286                 mode = LCK_NL;
2287                 break;
2288         case F_WRLCK:
2289                 mode = LCK_PW;
2290                 break;
2291         default:
2292                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2293                 LBUG();
2294         }
2295
2296         switch (cmd) {
2297         case F_SETLKW:
2298 #ifdef F_SETLKW64
2299         case F_SETLKW64:
2300 #endif
2301                 flags = 0;
2302                 break;
2303         case F_SETLK:
2304 #ifdef F_SETLK64
2305         case F_SETLK64:
2306 #endif
2307                 flags = LDLM_FL_BLOCK_NOWAIT;
2308                 break;
2309         case F_GETLK:
2310 #ifdef F_GETLK64
2311         case F_GETLK64:
2312 #endif
2313                 flags = LDLM_FL_TEST_LOCK;
2314                 /* Save the old mode so that if the mode in the lock changes we
2315                  * can decrement the appropriate reader or writer refcount. */
2316                 file_lock->fl_type = mode;
2317                 break;
2318         default:
2319                 CERROR("unknown fcntl lock command: %d\n", cmd);
2320                 LBUG();
2321         }
2322
2323         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2324                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2325                flags, mode, flock.l_flock.start, flock.l_flock.end);
2326
2327         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, res_id,
2328                               LDLM_FLOCK, &flock, mode, &flags, NULL,
2329                               ldlm_flock_completion_ast, NULL, file_lock,
2330                               NULL, 0, NULL, &lockh, 0);
2331         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2332                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2333 #ifdef HAVE_F_OP_FLOCK
2334         if ((file_lock->fl_flags & FL_POSIX) &&(rc == 0))
2335                 posix_lock_file_wait(file, file_lock);
2336 #endif
2337
2338         RETURN(rc);
2339 }
2340
2341 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2342 {
2343         ENTRY;
2344
2345         RETURN(-ENOSYS);
2346 }
2347
2348 int ll_have_md_lock(struct inode *inode, __u64 bits)
2349 {
2350         struct lustre_handle lockh;
2351         struct ldlm_res_id res_id = { .name = {0} };
2352         struct obd_device *obddev;
2353         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2354         int flags;
2355         ENTRY;
2356
2357         if (!inode)
2358                RETURN(0);
2359
2360         obddev = ll_i2mdcexp(inode)->exp_obd;
2361         res_id.name[0] = inode->i_ino;
2362         res_id.name[1] = inode->i_generation;
2363
2364         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
2365
2366         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2367         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
2368                             &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2369                 RETURN(1);
2370         }
2371
2372         RETURN(0);
2373 }
2374
2375 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2376         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2377                               * and return success */
2378                 inode->i_nlink = 0;
2379                 /* This path cannot be hit for regular files unless in
2380                  * case of obscure races, so no need to to validate
2381                  * size. */
2382                 if (!S_ISREG(inode->i_mode) &&
2383                     !S_ISDIR(inode->i_mode))
2384                         return 0;
2385         }
2386
2387         if (rc) {
2388                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2389                 return -abs(rc);
2390
2391         }
2392
2393         return 0;
2394 }
2395
2396 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2397 {
2398         struct inode *inode = dentry->d_inode;
2399         struct ptlrpc_request *req = NULL;
2400         struct obd_export *exp;
2401         int rc;
2402         ENTRY;
2403
2404         if (!inode) {
2405                 CERROR("REPORT THIS LINE TO PETER\n");
2406                 RETURN(0);
2407         }
2408         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2409                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2410 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2411         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
2412 #endif
2413
2414         exp = ll_i2mdcexp(inode);
2415
2416         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2417                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2418                 struct mdc_op_data op_data;
2419
2420                 /* Call getattr by fid, so do not provide name at all. */
2421                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
2422                                        dentry->d_inode, NULL, 0, 0);
2423                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
2424                                      /* we are not interested in name
2425                                         based lookup */
2426                                      &oit, 0, &req,
2427                                      ll_mdc_blocking_ast, 0);
2428                 if (rc < 0) {
2429                         rc = ll_inode_revalidate_fini(inode, rc);
2430                         GOTO (out, rc);
2431                 }
2432                 
2433                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2434                 if (rc != 0) {
2435                         ll_intent_release(&oit);
2436                         GOTO(out, rc);
2437                 }
2438
2439                 /* Unlinked? Unhash dentry, so it is not picked up later by
2440                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2441                    here to preserve get_cwd functionality on 2.6.
2442                    Bug 10503 */
2443                 if (!dentry->d_inode->i_nlink) {
2444                         spin_lock(&dcache_lock);
2445                         ll_drop_dentry(dentry);
2446                         spin_unlock(&dcache_lock);
2447                 }
2448
2449                 ll_lookup_finish_locks(&oit, dentry);
2450         } else if (!ll_have_md_lock(dentry->d_inode,
2451                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
2452                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2453                 struct ll_fid fid;
2454                 obd_valid valid = OBD_MD_FLGETATTR;
2455                 int ealen = 0;
2456
2457                 if (S_ISREG(inode->i_mode)) {
2458                         rc = ll_get_max_mdsize(sbi, &ealen);
2459                         if (rc) 
2460                                 RETURN(rc); 
2461                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2462                 }
2463                 ll_inode2fid(&fid, inode);
2464                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
2465                 if (rc) {
2466                         rc = ll_inode_revalidate_fini(inode, rc);
2467                         RETURN(rc);
2468                 }
2469
2470                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
2471                                    NULL);
2472                 if (rc)
2473                         GOTO(out, rc);
2474         }
2475
2476         /* if object not yet allocated, don't validate size */
2477         if (ll_i2info(inode)->lli_smd == NULL) 
2478                 GOTO(out, rc = 0);
2479
2480         /* ll_glimpse_size will prefer locally cached writes if they extend
2481          * the file */
2482         rc = ll_glimpse_size(inode, 0);
2483
2484 out:
2485         ptlrpc_req_finished(req);
2486         RETURN(rc);
2487 }
2488
2489 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2490 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2491                   struct lookup_intent *it, struct kstat *stat)
2492 {
2493         struct inode *inode = de->d_inode;
2494         int res = 0;
2495
2496         res = ll_inode_revalidate_it(de, it);
2497         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
2498
2499         if (res)
2500                 return res;
2501
2502         stat->dev = inode->i_sb->s_dev;
2503         stat->ino = inode->i_ino;
2504         stat->mode = inode->i_mode;
2505         stat->nlink = inode->i_nlink;
2506         stat->uid = inode->i_uid;
2507         stat->gid = inode->i_gid;
2508         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2509         stat->atime = inode->i_atime;
2510         stat->mtime = inode->i_mtime;
2511         stat->ctime = inode->i_ctime;
2512 #ifdef HAVE_INODE_BLKSIZE
2513         stat->blksize = inode->i_blksize;
2514 #else
2515         stat->blksize = 1<<inode->i_blkbits;
2516 #endif
2517
2518         ll_inode_size_lock(inode, 0);
2519         stat->size = inode->i_size;
2520         stat->blocks = inode->i_blocks;
2521         ll_inode_size_unlock(inode, 0);
2522
2523         return 0;
2524 }
2525 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2526 {
2527         struct lookup_intent it = { .it_op = IT_GETATTR };
2528
2529         ll_vfs_ops_tally(ll_i2sbi(de->d_inode), VFS_OPS_GETATTR);
2530         return ll_getattr_it(mnt, de, &it, stat);
2531 }
2532 #endif
2533
2534 static
2535 int lustre_check_acl(struct inode *inode, int mask)
2536 {
2537 #ifdef CONFIG_FS_POSIX_ACL
2538         struct ll_inode_info *lli = ll_i2info(inode);
2539         struct posix_acl *acl;
2540         int rc;
2541         ENTRY;
2542
2543         spin_lock(&lli->lli_lock);
2544         acl = posix_acl_dup(lli->lli_posix_acl);
2545         spin_unlock(&lli->lli_lock);
2546
2547         if (!acl)
2548                 RETURN(-EAGAIN);
2549
2550         rc = posix_acl_permission(inode, acl, mask);
2551         posix_acl_release(acl);
2552
2553         RETURN(rc);
2554 #else
2555         return -EAGAIN;
2556 #endif
2557 }
2558
2559 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2560 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2561 {
2562         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2563                inode->i_ino, inode->i_generation, inode, mask);
2564
2565         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
2566         return generic_permission(inode, mask, lustre_check_acl);
2567 }
2568 #else
2569 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2570 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2571 #else
2572 int ll_inode_permission(struct inode *inode, int mask)
2573 #endif
2574 {
2575         int mode = inode->i_mode;
2576         int rc;
2577
2578         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2579                inode->i_ino, inode->i_generation, inode, mask);
2580         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
2581
2582         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2583             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2584                 return -EROFS;
2585         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2586                 return -EACCES;
2587         if (current->fsuid == inode->i_uid) {
2588                 mode >>= 6;
2589         } else if (1) {
2590                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2591                         goto check_groups;
2592                 rc = lustre_check_acl(inode, mask);
2593                 if (rc == -EAGAIN)
2594                         goto check_groups;
2595                 if (rc == -EACCES)
2596                         goto check_capabilities;
2597                 return rc;
2598         } else {
2599 check_groups:
2600                 if (in_group_p(inode->i_gid))
2601                         mode >>= 3;
2602         }
2603         if ((mode & mask & S_IRWXO) == mask)
2604                 return 0;
2605
2606 check_capabilities:
2607         if (!(mask & MAY_EXEC) ||
2608             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2609                 if (capable(CAP_DAC_OVERRIDE))
2610                         return 0;
2611
2612         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2613             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2614                 return 0;
2615
2616         return -EACCES;
2617 }
2618 #endif
2619
2620 struct file_operations ll_file_operations = {
2621         .read           = ll_file_read,
2622         .write          = ll_file_write,
2623         .ioctl          = ll_file_ioctl,
2624         .open           = ll_file_open,
2625         .release        = ll_file_release,
2626         .mmap           = ll_file_mmap,
2627         .llseek         = ll_file_seek,
2628 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2629         .sendfile       = ll_file_sendfile,
2630 #endif
2631         .fsync          = ll_fsync,
2632 #ifdef HAVE_F_OP_FLOCK
2633         .flock          = ll_file_noflock,
2634 #endif
2635         .lock           = ll_file_noflock
2636 };
2637
2638 struct file_operations ll_file_operations_flock = {
2639         .read           = ll_file_read,
2640         .write          = ll_file_write,
2641         .ioctl          = ll_file_ioctl,
2642         .open           = ll_file_open,
2643         .release        = ll_file_release,
2644         .mmap           = ll_file_mmap,
2645         .llseek         = ll_file_seek,
2646 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2647         .sendfile       = ll_file_sendfile,
2648 #endif
2649         .fsync          = ll_fsync,
2650 #ifdef HAVE_F_OP_FLOCK
2651         .flock          = ll_file_flock,
2652 #endif
2653         .lock           = ll_file_flock
2654 };
2655
2656
2657 struct inode_operations ll_file_inode_operations = {
2658 #ifdef LUSTRE_KERNEL_VERSION
2659         .setattr_raw    = ll_setattr_raw,
2660 #endif
2661         .setattr        = ll_setattr,
2662         .truncate       = ll_truncate,
2663 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2664         .getattr        = ll_getattr,
2665 #else
2666         .revalidate_it  = ll_inode_revalidate_it,
2667 #endif
2668         .permission     = ll_inode_permission,
2669         .setxattr       = ll_setxattr,
2670         .getxattr       = ll_getxattr,
2671         .listxattr      = ll_listxattr,
2672         .removexattr    = ll_removexattr,
2673 };
2674