Whamcloud - gitweb
Branch:HEAD
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 /* also used by llite/special.c:ll_special_open() */
36 struct ll_file_data *ll_file_data_get(void)
37 {
38         struct ll_file_data *fd;
39
40         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
41         return fd;
42 }
43
44 static void ll_file_data_put(struct ll_file_data *fd)
45 {
46         if (fd != NULL)
47                 OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
48 }
49
50 static int ll_close_inode_openhandle(struct inode *inode,
51                                      struct obd_client_handle *och)
52 {
53         struct ptlrpc_request *req = NULL;
54         struct obd_device *obd;
55         struct obdo *oa;
56         int rc;
57         ENTRY;
58
59         obd = class_exp2obd(ll_i2mdcexp(inode));
60         if (obd == NULL) {
61                 CERROR("Invalid MDC connection handle "LPX64"\n",
62                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
63                 GOTO(out, rc = 0);
64         }
65
66         /*
67          * here we check if this is forced umount. If so this is called on
68          * canceling "open lock" and we do not call mdc_close() in this case, as
69          * it will not be successful, as import is already deactivated.
70          */
71         if (obd->obd_no_recov)
72                 GOTO(out, rc = 0);
73
74         oa = obdo_alloc();
75         if (!oa)
76                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
77
78         oa->o_id = inode->i_ino;
79         oa->o_valid = OBD_MD_FLID;
80         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
81                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
82                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
83                                    OBD_MD_FLCTIME);
84         if (0 /* ll_is_inode_dirty(inode) */) {
85                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
86                 oa->o_valid |= OBD_MD_FLFLAGS;
87         }
88
89         rc = mdc_close(ll_i2mdcexp(inode), oa, och, &req);
90         if (rc == EAGAIN) {
91                 /* We are the last writer, so the MDS has instructed us to get
92                  * the file size and any write cookies, then close again. */
93                 //ll_queue_done_writing(inode);
94                 rc = 0;
95         } else if (rc) {
96                 CERROR("inode %lu mdc close failed: rc = %d\n",
97                        inode->i_ino, rc);
98         }
99
100         obdo_free(oa);
101
102         if (rc == 0) {
103                 rc = ll_objects_destroy(req, inode);
104                 if (rc)
105                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
106                                inode->i_ino, rc);
107         }
108
109         ptlrpc_req_finished(req); /* This is close request */
110         EXIT;
111 out:
112         mdc_clear_open_replay_data(och);
113
114         return rc;
115 }
116
117 int ll_mdc_real_close(struct inode *inode, int flags)
118 {
119         struct ll_inode_info *lli = ll_i2info(inode);
120         int rc = 0;
121         struct obd_client_handle **och_p;
122         struct obd_client_handle *och;
123         __u64 *och_usecount;
124
125         ENTRY;
126
127         if (flags & FMODE_WRITE) {
128                 och_p = &lli->lli_mds_write_och;
129                 och_usecount = &lli->lli_open_fd_write_count;
130         } else if (flags & FMODE_EXEC) {
131                 och_p = &lli->lli_mds_exec_och;
132                 och_usecount = &lli->lli_open_fd_exec_count;
133          } else {
134                 LASSERT(flags & FMODE_READ);
135                 och_p = &lli->lli_mds_read_och;
136                 och_usecount = &lli->lli_open_fd_read_count;
137         }
138
139         down(&lli->lli_och_sem);
140         if (*och_usecount) { /* There are still users of this handle, so
141                                 skip freeing it. */
142                 up(&lli->lli_och_sem);
143                 RETURN(0);
144         }
145         och=*och_p;
146         *och_p = NULL;
147         up(&lli->lli_och_sem);
148
149         if (och) { /* There might be a race and somebody have freed this och
150                       already */
151                 rc = ll_close_inode_openhandle(inode, och);
152                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
153                 OBD_FREE(och, sizeof *och);
154         }
155
156         RETURN(rc);
157 }
158
159 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
160                         struct file *file)
161 {
162         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
163         struct ll_inode_info *lli = ll_i2info(inode);
164         int rc = 0;
165         ENTRY;
166
167         /* clear group lock, if present */
168         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
169                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
170                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
171                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
172                                       &fd->fd_cwlockh);
173         }
174
175         /* Let's see if we have good enough OPEN lock on the file and if
176            we can skip talking to MDS */
177         if (file->f_dentry->d_inode) { /* Can this ever be false? */
178                 int lockmode;
179                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
180                 struct lustre_handle lockh;
181                 struct inode *inode = file->f_dentry->d_inode;
182                 struct ldlm_res_id file_res_id = {.name={inode->i_ino,
183                                                          inode->i_generation}};
184                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
185
186                 down(&lli->lli_och_sem);
187                 if (fd->fd_omode & FMODE_WRITE) {
188                         lockmode = LCK_CW;
189                         LASSERT(lli->lli_open_fd_write_count);
190                         lli->lli_open_fd_write_count--;
191                 } else if (fd->fd_omode & FMODE_EXEC) {
192                         lockmode = LCK_PR;
193                         LASSERT(lli->lli_open_fd_exec_count);
194                         lli->lli_open_fd_exec_count--;
195                 } else {
196                         lockmode = LCK_CR;
197                         LASSERT(lli->lli_open_fd_read_count);
198                         lli->lli_open_fd_read_count--;
199                 }
200                 up(&lli->lli_och_sem);
201
202                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
203                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
204                                      &lockh)) {
205                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
206                                                 fd->fd_omode);
207                 }
208         } else {
209                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
210                        file, file->f_dentry, file->f_dentry->d_name.name);
211         }
212
213         LUSTRE_FPRIVATE(file) = NULL;
214         ll_file_data_put(fd);
215
216         RETURN(rc);
217 }
218
219 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
220
221 /* While this returns an error code, fput() the caller does not, so we need
222  * to make every effort to clean up all of our state here.  Also, applications
223  * rarely check close errors and even if an error is returned they will not
224  * re-try the close call.
225  */
226 int ll_file_release(struct inode *inode, struct file *file)
227 {
228         struct ll_file_data *fd;
229         struct ll_sb_info *sbi = ll_i2sbi(inode);
230         struct ll_inode_info *lli = ll_i2info(inode);
231         struct lov_stripe_md *lsm = lli->lli_smd;
232         int rc;
233
234         ENTRY;
235         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
236                inode->i_generation, inode);
237         ll_vfs_ops_tally(sbi, VFS_OPS_RELEASE);
238
239         /* don't do anything for / */
240         if (inode->i_sb->s_root == file->f_dentry)
241                 RETURN(0);
242
243         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
244
245         fd = LUSTRE_FPRIVATE(file);
246         LASSERT(fd != NULL);
247
248         if (lsm)
249                 lov_test_and_clear_async_rc(lsm);
250         lli->lli_async_rc = 0;
251
252         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
253         RETURN(rc);
254 }
255
256 static int ll_intent_file_open(struct file *file, void *lmm,
257                                int lmmsize, struct lookup_intent *itp)
258 {
259         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
260         struct mdc_op_data data;
261         struct dentry *parent = file->f_dentry->d_parent;
262         const char *name = file->f_dentry->d_name.name;
263         const int len = file->f_dentry->d_name.len;
264         struct inode *inode = file->f_dentry->d_inode;
265         struct ptlrpc_request *req;
266         int rc;
267
268         if (!parent)
269                 RETURN(-ENOENT);
270
271         ll_prepare_mdc_op_data(&data, parent->d_inode, inode, name, len, O_RDWR);
272
273         /* Usually we come here only for NFSD, and we want open lock.
274            But we can also get here with pre 2.6.15 patchless kernels, and in
275            that case that lock is also ok */
276         /* We can also get here if there was cached open handle in revalidate_it
277          * but it disappeared while we were getting from there to ll_file_open.
278          * But this means this file was closed and immediatelly opened which
279          * makes a good candidate for using OPEN lock */
280         /* If lmmsize & lmm are not 0, we are just setting stripe info
281          * parameters. No need for the open lock */
282         if (!lmm && !lmmsize)
283                 itp->it_flags |= MDS_OPEN_LOCK;
284
285         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
286                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
287         if (rc == -ESTALE) {
288                 /* reason for keep own exit path - don`t flood log
289                 * with messages with -ESTALE errors.
290                 */
291                 if (!it_disposition(itp, DISP_OPEN_OPEN))
292                         GOTO(out, rc);
293                 ll_release_openhandle(file->f_dentry, itp);
294                 GOTO(out_stale, rc);
295         }
296
297         if (rc != 0) {
298                CERROR("lock enqueue: err: %d\n", rc);
299                GOTO(out, rc);
300         }
301
302         if (itp->d.lustre.it_lock_mode)
303                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
304                                   inode);
305
306         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
307                            req, DLM_REPLY_REC_OFF, NULL);
308 out:
309         ptlrpc_req_finished(itp->d.lustre.it_data);
310
311 out_stale:
312         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
313         ll_intent_drop_lock(itp);
314
315         RETURN(rc);
316 }
317
318
319 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
320                         struct obd_client_handle *och)
321 {
322         struct ptlrpc_request *req = it->d.lustre.it_data;
323         struct mds_body *body;
324
325         LASSERT(och);
326
327         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
328         LASSERT(body != NULL);                  /* reply already checked out */
329         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in mdc_enqueue */
330
331         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
332         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
333         lli->lli_io_epoch = body->io_epoch;
334
335         mdc_set_open_replay_data(och, it->d.lustre.it_data);
336 }
337
338 int ll_local_open(struct file *file, struct lookup_intent *it,
339                   struct ll_file_data *fd, struct obd_client_handle *och)
340 {
341         ENTRY;
342
343         LASSERT(!LUSTRE_FPRIVATE(file));
344
345         LASSERT(fd != NULL);
346
347         if (och)
348                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
349         LUSTRE_FPRIVATE(file) = fd;
350         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
351         fd->fd_omode = it->it_flags;
352
353         RETURN(0);
354 }
355
356 /* Open a file, and (for the very first open) create objects on the OSTs at
357  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
358  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
359  * lli_open_sem to ensure no other process will create objects, send the
360  * stripe MD to the MDS, or try to destroy the objects if that fails.
361  *
362  * If we already have the stripe MD locally then we don't request it in
363  * mdc_open(), by passing a lmm_size = 0.
364  *
365  * It is up to the application to ensure no other processes open this file
366  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
367  * used.  We might be able to avoid races of that sort by getting lli_open_sem
368  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
369  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
370  */
371 int ll_file_open(struct inode *inode, struct file *file)
372 {
373         struct ll_inode_info *lli = ll_i2info(inode);
374         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
375                                           .it_flags = file->f_flags };
376         struct lov_stripe_md *lsm;
377         struct ptlrpc_request *req = NULL;
378         struct obd_client_handle **och_p;
379         __u64 *och_usecount;
380         struct ll_file_data *fd;
381         int rc = 0;
382         ENTRY;
383
384         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
385                inode->i_generation, inode, file->f_flags);
386         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_OPEN);
387
388         /* don't do anything for / */
389         if (inode->i_sb->s_root == file->f_dentry)
390                 RETURN(0);
391
392 #ifdef LUSTRE_KERNEL_VERSION
393         it = file->f_it;
394 #else
395         it = file->private_data; /* XXX: compat macro */
396         file->private_data = NULL; /* prevent ll_local_open assertion */
397 #endif
398
399         fd = ll_file_data_get();
400         if (fd == NULL)
401                 RETURN(-ENOMEM);
402
403         if (!it || !it->d.lustre.it_disposition) {
404                 /* Convert f_flags into access mode. We cannot use file->f_mode,
405                  * because everything but O_ACCMODE mask was stripped from it */
406                 if ((oit.it_flags + 1) & O_ACCMODE)
407                         oit.it_flags++;
408                 if (file->f_flags & O_TRUNC)
409                         oit.it_flags |= FMODE_WRITE;
410
411                 /* kernel only call f_op->open in dentry_open.  filp_open calls
412                  * dentry_open after call to open_namei that checks permissions.
413                  * Only nfsd_open call dentry_open directly without checking
414                  * permissions and because of that this code below is safe. */
415                 if (oit.it_flags & FMODE_WRITE)
416                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
417
418                 /* We do not want O_EXCL here, presumably we opened the file
419                  * already? XXX - NFS implications? */
420                 oit.it_flags &= ~O_EXCL;
421
422                 it = &oit;
423         }
424
425         /* Let's see if we have file open on MDS already. */
426         if (it->it_flags & FMODE_WRITE) {
427                 och_p = &lli->lli_mds_write_och;
428                 och_usecount = &lli->lli_open_fd_write_count;
429         } else if (it->it_flags & FMODE_EXEC) {
430                 och_p = &lli->lli_mds_exec_och;
431                 och_usecount = &lli->lli_open_fd_exec_count;
432          } else {
433                 och_p = &lli->lli_mds_read_och;
434                 och_usecount = &lli->lli_open_fd_read_count;
435         }
436
437         LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
438                  it->d.lustre.it_disposition);
439
440         down(&lli->lli_och_sem);
441         if (*och_p) { /* Open handle is present */
442                 if (it_disposition(it, DISP_OPEN_OPEN)) {
443                         /* Well, there's extra open request that we do not need,
444                            let's close it somehow. This will decref request. */
445                         ll_release_openhandle(file->f_dentry, it);
446                 }
447                 (*och_usecount)++;
448
449                 rc = ll_local_open(file, it, fd, NULL);
450
451                 LASSERTF(rc == 0, "rc = %d\n", rc);
452         } else {
453                 LASSERT(*och_usecount == 0);
454                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
455                 if (!*och_p) {
456                         ll_file_data_put(fd);
457                         GOTO(out_och_free, rc = -ENOMEM);
458                 }
459                 (*och_usecount)++;
460                 if (!it->d.lustre.it_disposition) {
461                         rc = ll_intent_file_open(file, NULL, 0, it);
462                         if (rc) {
463                                 ll_file_data_put(fd);
464                                 GOTO(out_och_free, rc);
465                         }
466
467                         /* Got some error? Release the request */
468                         if (it->d.lustre.it_status < 0) {
469                                 req = it->d.lustre.it_data;
470                                 ptlrpc_req_finished(req);
471                         }
472                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
473                                           file->f_dentry->d_inode);
474                 }
475                 req = it->d.lustre.it_data;
476
477                 /* mdc_intent_lock() didn't get a request ref if there was an
478                  * open error, so don't do cleanup on the request here
479                  * (bug 3430) */
480                 /* XXX (green): Should not we bail out on any error here, not
481                  * just open error? */
482                 rc = it_open_error(DISP_OPEN_OPEN, it);
483                 if (rc) {
484                         ll_file_data_put(fd);
485                         GOTO(out_och_free, rc);
486                 }
487
488                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
489                 rc = ll_local_open(file, it, fd, *och_p);
490                 LASSERTF(rc == 0, "rc = %d\n", rc);
491         }
492         up(&lli->lli_och_sem);
493
494         /* Must do this outside lli_och_sem lock to prevent deadlock where
495            different kind of OPEN lock for this same inode gets cancelled
496            by ldlm_cancel_lru */
497         if (!S_ISREG(inode->i_mode))
498                 GOTO(out, rc);
499
500         lsm = lli->lli_smd;
501         if (lsm == NULL) {
502                 if (file->f_flags & O_LOV_DELAY_CREATE ||
503                     !(file->f_mode & FMODE_WRITE)) {
504                         CDEBUG(D_INODE, "object creation was delayed\n");
505                         GOTO(out, rc);
506                 }
507         }
508         file->f_flags &= ~O_LOV_DELAY_CREATE;
509         GOTO(out, rc);
510  out:
511         ptlrpc_req_finished(req);
512         if (req)
513                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
514         if (rc == 0) {
515                 ll_open_complete(inode);
516         } else {
517 out_och_free:
518                 if (*och_p) {
519                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
520                         *och_p = NULL; /* OBD_FREE writes some magic there */
521                         (*och_usecount)--;
522                 }
523                 up(&lli->lli_och_sem);
524         }
525         return rc;
526 }
527
528 /* Fills the obdo with the attributes for the inode defined by lsm */
529 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
530                    struct obdo *oa)
531 {
532         struct ptlrpc_request_set *set;
533         struct obd_info oinfo = { { { 0 } } };
534         int rc;
535         ENTRY;
536
537         LASSERT(lsm != NULL);
538
539         memset(oa, 0, sizeof *oa);
540         oinfo.oi_md = lsm;
541         oinfo.oi_oa = oa;
542         oa->o_id = lsm->lsm_object_id;
543         oa->o_mode = S_IFREG;
544         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
545                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
546                 OBD_MD_FLCTIME;
547
548         set = ptlrpc_prep_set();
549         if (set == NULL) {
550                 rc = -ENOMEM;
551         } else {
552                 rc = obd_getattr_async(exp, &oinfo, set);
553                 if (rc == 0)
554                         rc = ptlrpc_set_wait(set);
555                 ptlrpc_set_destroy(set);
556         }
557         if (rc)
558                 RETURN(rc);
559
560         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
561                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
562         RETURN(0);
563 }
564
565 static inline void ll_remove_suid(struct inode *inode)
566 {
567         unsigned int mode;
568
569         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
570         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
571
572         /* was any of the uid bits set? */
573         mode &= inode->i_mode;
574         if (mode && !capable(CAP_FSETID)) {
575                 inode->i_mode &= ~mode;
576                 // XXX careful here - we cannot change the size
577         }
578 }
579
580 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
581 {
582         struct ll_inode_info *lli = ll_i2info(inode);
583         struct lov_stripe_md *lsm = lli->lli_smd;
584         struct obd_export *exp = ll_i2obdexp(inode);
585         struct {
586                 char name[16];
587                 struct ldlm_lock *lock;
588                 struct lov_stripe_md *lsm;
589         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
590         __u32 stripe, vallen = sizeof(stripe);
591         int rc;
592         ENTRY;
593
594         if (lsm->lsm_stripe_count == 1)
595                 GOTO(check, stripe = 0);
596
597         /* get our offset in the lov */
598         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
599         if (rc != 0) {
600                 CERROR("obd_get_info: rc = %d\n", rc);
601                 RETURN(rc);
602         }
603         LASSERT(stripe < lsm->lsm_stripe_count);
604
605 check:
606         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
607             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[1]){
608                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
609                            lsm->lsm_oinfo[stripe]->loi_id,
610                            lsm->lsm_oinfo[stripe]->loi_gr);
611                 RETURN(-ELDLM_NO_LOCK_DATA);
612         }
613
614         RETURN(stripe);
615 }
616
617 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
618  * we get a lock cancellation for each stripe, so we have to map the obd's
619  * region back onto the stripes in the file that it held.
620  *
621  * No one can dirty the extent until we've finished our work and they can
622  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
623  * but other kernel actors could have pages locked.
624  *
625  * Called with the DLM lock held. */
626 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
627                               struct ldlm_lock *lock, __u32 stripe)
628 {
629         ldlm_policy_data_t tmpex;
630         unsigned long start, end, count, skip, i, j;
631         struct page *page;
632         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
633         struct lustre_handle lockh;
634         ENTRY;
635
636         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
637         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
638                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
639                inode->i_size);
640
641         /* our locks are page granular thanks to osc_enqueue, we invalidate the
642          * whole page. */
643         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
644             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
645                 LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
646         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
647         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
648
649         count = ~0;
650         skip = 0;
651         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
652         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
653         if (lsm->lsm_stripe_count > 1) {
654                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
655                 skip = (lsm->lsm_stripe_count - 1) * count;
656                 start += start/count * skip + stripe * count;
657                 if (end != ~0)
658                         end += end/count * skip + stripe * count;
659         }
660         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
661                 end = ~0;
662
663         i = inode->i_size ? (inode->i_size - 1) >> CFS_PAGE_SHIFT : 0;
664         if (i < end)
665                 end = i;
666
667         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
668                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
669                count, skip, end, discard ? " (DISCARDING)" : "");
670
671         /* walk through the vmas on the inode and tear down mmaped pages that
672          * intersect with the lock.  this stops immediately if there are no
673          * mmap()ed regions of the file.  This is not efficient at all and
674          * should be short lived. We'll associate mmap()ed pages with the lock
675          * and will be able to find them directly */
676         for (i = start; i <= end; i += (j + skip)) {
677                 j = min(count - (i % count), end - i + 1);
678                 LASSERT(j > 0);
679                 LASSERT(inode->i_mapping);
680                 if (ll_teardown_mmaps(inode->i_mapping,
681                                       (__u64)i << CFS_PAGE_SHIFT,
682                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
683                         break;
684         }
685
686         /* this is the simplistic implementation of page eviction at
687          * cancelation.  It is careful to get races with other page
688          * lockers handled correctly.  fixes from bug 20 will make it
689          * more efficient by associating locks with pages and with
690          * batching writeback under the lock explicitly. */
691         for (i = start, j = start % count; i <= end;
692              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
693                 if (j == count) {
694                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
695                         i += skip;
696                         j = 0;
697                         if (i > end)
698                                 break;
699                 }
700                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
701                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
702                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
703                          start, i, end);
704
705                 if (!mapping_has_pages(inode->i_mapping)) {
706                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
707                         break;
708                 }
709
710                 cond_resched();
711
712                 page = find_get_page(inode->i_mapping, i);
713                 if (page == NULL)
714                         continue;
715                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
716                                i, tmpex.l_extent.start);
717                 lock_page(page);
718
719                 /* page->mapping to check with racing against teardown */
720                 if (!discard && clear_page_dirty_for_io(page)) {
721                         rc = ll_call_writepage(inode, page);
722                         if (rc != 0)
723                                 CERROR("writepage of page %p failed: %d\n",
724                                        page, rc);
725                         /* either waiting for io to complete or reacquiring
726                          * the lock that the failed writepage released */
727                         lock_page(page);
728                 }
729
730                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
731                 /* check to see if another DLM lock covers this page  b=2765 */
732                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
733                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
734                                       LDLM_FL_TEST_LOCK,
735                                       &lock->l_resource->lr_name, LDLM_EXTENT,
736                                       &tmpex, LCK_PR | LCK_PW, &lockh);
737                 if (rc2 == 0 && page->mapping != NULL) {
738                         struct ll_async_page *llap = llap_cast_private(page);
739                         // checking again to account for writeback's lock_page()
740                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
741                         if (llap)
742                                 ll_ra_accounting(llap, inode->i_mapping);
743                         ll_truncate_complete_page(page);
744                 }
745                 unlock_page(page);
746                 page_cache_release(page);
747         }
748         LASSERTF(tmpex.l_extent.start <=
749                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
750                   lock->l_policy_data.l_extent.end + 1),
751                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
752                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
753                  start, i, end);
754         EXIT;
755 }
756
757 static int ll_extent_lock_callback(struct ldlm_lock *lock,
758                                    struct ldlm_lock_desc *new, void *data,
759                                    int flag)
760 {
761         struct lustre_handle lockh = { 0 };
762         int rc;
763         ENTRY;
764
765         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
766                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
767                 LBUG();
768         }
769
770         switch (flag) {
771         case LDLM_CB_BLOCKING:
772                 ldlm_lock2handle(lock, &lockh);
773                 rc = ldlm_cli_cancel(&lockh);
774                 if (rc != ELDLM_OK)
775                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
776                 break;
777         case LDLM_CB_CANCELING: {
778                 struct inode *inode;
779                 struct ll_inode_info *lli;
780                 struct lov_stripe_md *lsm;
781                 int stripe;
782                 __u64 kms;
783
784                 /* This lock wasn't granted, don't try to evict pages */
785                 if (lock->l_req_mode != lock->l_granted_mode)
786                         RETURN(0);
787
788                 inode = ll_inode_from_lock(lock);
789                 if (inode == NULL)
790                         RETURN(0);
791                 lli = ll_i2info(inode);
792                 if (lli == NULL)
793                         goto iput;
794                 if (lli->lli_smd == NULL)
795                         goto iput;
796                 lsm = lli->lli_smd;
797
798                 stripe = ll_lock_to_stripe_offset(inode, lock);
799                 if (stripe < 0)
800                         goto iput;
801
802                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
803
804                 lov_stripe_lock(lsm);
805                 lock_res_and_lock(lock);
806                 kms = ldlm_extent_shift_kms(lock,
807                                             lsm->lsm_oinfo[stripe]->loi_kms);
808
809                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
810                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
811                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
812                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
813                 unlock_res_and_lock(lock);
814                 lov_stripe_unlock(lsm);
815                 //ll_try_done_writing(inode);
816         iput:
817                 iput(inode);
818                 break;
819         }
820         default:
821                 LBUG();
822         }
823
824         RETURN(0);
825 }
826
827 #if 0
828 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
829 {
830         /* XXX ALLOCATE - 160 bytes */
831         struct inode *inode = ll_inode_from_lock(lock);
832         struct ll_inode_info *lli = ll_i2info(inode);
833         struct lustre_handle lockh = { 0 };
834         struct ost_lvb *lvb;
835         int stripe;
836         ENTRY;
837
838         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
839                      LDLM_FL_BLOCK_CONV)) {
840                 LBUG(); /* not expecting any blocked async locks yet */
841                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
842                            "lock, returning");
843                 ldlm_lock_dump(D_OTHER, lock, 0);
844                 ldlm_reprocess_all(lock->l_resource);
845                 RETURN(0);
846         }
847
848         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
849
850         stripe = ll_lock_to_stripe_offset(inode, lock);
851         if (stripe < 0)
852                 goto iput;
853
854         if (lock->l_lvb_len) {
855                 struct lov_stripe_md *lsm = lli->lli_smd;
856                 __u64 kms;
857                 lvb = lock->l_lvb_data;
858                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
859
860                 LOCK_INODE_MUTEX(inode);
861                 lock_res_and_lock(lock);
862                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
863                 kms = ldlm_extent_shift_kms(NULL, kms);
864                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
865                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
866                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
867                 lsm->lsm_oinfo[stripe].loi_kms = kms;
868                 unlock_res_and_lock(lock);
869                 UNLOCK_INODE_MUTEX(inode);
870         }
871
872 iput:
873         iput(inode);
874         wake_up(&lock->l_waitq);
875
876         ldlm_lock2handle(lock, &lockh);
877         ldlm_lock_decref(&lockh, LCK_PR);
878         RETURN(0);
879 }
880 #endif
881
882 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
883 {
884         struct ptlrpc_request *req = reqp;
885         struct inode *inode = ll_inode_from_lock(lock);
886         struct ll_inode_info *lli;
887         struct lov_stripe_md *lsm;
888         struct ost_lvb *lvb;
889         int rc, stripe;
890         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
891         ENTRY;
892
893         if (inode == NULL)
894                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
895         lli = ll_i2info(inode);
896         if (lli == NULL)
897                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
898         lsm = lli->lli_smd;
899         if (lsm == NULL)
900                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
901
902         /* First, find out which stripe index this lock corresponds to. */
903         stripe = ll_lock_to_stripe_offset(inode, lock);
904         if (stripe < 0)
905                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
906
907         rc = lustre_pack_reply(req, 2, size, NULL);
908         if (rc) {
909                 CERROR("lustre_pack_reply: %d\n", rc);
910                 GOTO(iput, rc);
911         }
912
913         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
914         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
915         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
916         lvb->lvb_atime = LTIME_S(inode->i_atime);
917         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
918
919         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
920                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
921                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
922                    lvb->lvb_atime, lvb->lvb_ctime);
923  iput:
924         iput(inode);
925
926  out:
927         /* These errors are normal races, so we don't want to fill the console
928          * with messages by calling ptlrpc_error() */
929         if (rc == -ELDLM_NO_LOCK_DATA)
930                 lustre_pack_reply(req, 1, NULL, NULL);
931
932         req->rq_status = rc;
933         return rc;
934 }
935
936 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
937                      lstat_t *st)
938 {
939         struct lustre_handle lockh = { 0 };
940         struct obd_enqueue_info einfo = { 0 };
941         struct obd_info oinfo = { { { 0 } } };
942         struct ost_lvb lvb;
943         int rc;
944         
945         ENTRY;
946         
947         einfo.ei_type = LDLM_EXTENT;
948         einfo.ei_mode = LCK_PR;
949         einfo.ei_flags = LDLM_FL_HAS_INTENT;
950         einfo.ei_cb_bl = ll_extent_lock_callback;
951         einfo.ei_cb_cp = ldlm_completion_ast;
952         einfo.ei_cb_gl = ll_glimpse_callback;
953         einfo.ei_cbdata = NULL;
954
955         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
956         oinfo.oi_lockh = &lockh;
957         oinfo.oi_md = lsm;
958
959         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
960         if (rc == -ENOENT)
961                 RETURN(rc);
962         if (rc != 0) {
963                 CERROR("obd_enqueue returned rc %d, "
964                        "returning -EIO\n", rc);
965                 RETURN(rc > 0 ? -EIO : rc);
966         }
967         
968         lov_stripe_lock(lsm);
969         memset(&lvb, 0, sizeof(lvb));
970         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
971         st->st_size = lvb.lvb_size;
972         st->st_blocks = lvb.lvb_blocks;
973         st->st_mtime = lvb.lvb_mtime;
974         st->st_atime = lvb.lvb_atime;
975         st->st_ctime = lvb.lvb_ctime;
976         lov_stripe_unlock(lsm);
977         
978         RETURN(rc);
979 }
980
981 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
982  * file (because it prefers KMS over RSS when larger) */
983 int ll_glimpse_size(struct inode *inode, int ast_flags)
984 {
985         struct ll_inode_info *lli = ll_i2info(inode);
986         struct ll_sb_info *sbi = ll_i2sbi(inode);
987         struct lustre_handle lockh = { 0 };
988         struct obd_enqueue_info einfo = { 0 };
989         struct obd_info oinfo = { { { 0 } } };
990         struct ost_lvb lvb;
991         int rc;
992         ENTRY;
993
994         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
995
996         if (!lli->lli_smd) {
997                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
998                 RETURN(0);
999         }
1000
1001         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1002          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1003          *       won't revoke any conflicting DLM locks held. Instead,
1004          *       ll_glimpse_callback() will be called on each client
1005          *       holding a DLM lock against this file, and resulting size
1006          *       will be returned for each stripe. DLM lock on [0, EOF] is
1007          *       acquired only if there were no conflicting locks. */
1008         einfo.ei_type = LDLM_EXTENT;
1009         einfo.ei_mode = LCK_PR;
1010         einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
1011         einfo.ei_cb_bl = ll_extent_lock_callback;
1012         einfo.ei_cb_cp = ldlm_completion_ast;
1013         einfo.ei_cb_gl = ll_glimpse_callback;
1014         einfo.ei_cbdata = inode;
1015
1016         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1017         oinfo.oi_lockh = &lockh;
1018         oinfo.oi_md = lli->lli_smd;
1019
1020         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
1021         if (rc == -ENOENT)
1022                 RETURN(rc);
1023         if (rc != 0) {
1024                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1025                 RETURN(rc > 0 ? -EIO : rc);
1026         }
1027
1028         ll_inode_size_lock(inode, 1);
1029         inode_init_lvb(inode, &lvb);
1030         obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1031         inode->i_size = lvb.lvb_size;
1032         inode->i_blocks = lvb.lvb_blocks;
1033         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1034         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1035         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1036         ll_inode_size_unlock(inode, 1);
1037
1038         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1039                inode->i_size, inode->i_blocks);
1040
1041         RETURN(rc);
1042 }
1043
1044 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1045                    struct lov_stripe_md *lsm, int mode,
1046                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1047                    int ast_flags)
1048 {
1049         struct ll_sb_info *sbi = ll_i2sbi(inode);
1050         struct ost_lvb lvb;
1051         struct obd_enqueue_info einfo = { 0 };
1052         struct obd_info oinfo = { { { 0 } } };
1053         int rc;
1054         ENTRY;
1055
1056         LASSERT(!lustre_handle_is_used(lockh));
1057         LASSERT(lsm != NULL);
1058
1059         /* don't drop the mmapped file to LRU */
1060         if (mapping_mapped(inode->i_mapping))
1061                 ast_flags |= LDLM_FL_NO_LRU;
1062
1063         /* XXX phil: can we do this?  won't it screw the file size up? */
1064         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1065             (sbi->ll_flags & LL_SBI_NOLCK))
1066                 RETURN(0);
1067
1068         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1069                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1070
1071         einfo.ei_type = LDLM_EXTENT;
1072         einfo.ei_mode = mode;
1073         einfo.ei_flags = ast_flags;
1074         einfo.ei_cb_bl = ll_extent_lock_callback;
1075         einfo.ei_cb_cp = ldlm_completion_ast;
1076         einfo.ei_cb_gl = ll_glimpse_callback;
1077         einfo.ei_cbdata = inode;
1078
1079         oinfo.oi_policy = *policy;
1080         oinfo.oi_lockh = lockh;
1081         oinfo.oi_md = lsm;
1082
1083         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo);
1084         *policy = oinfo.oi_policy;
1085         if (rc > 0)
1086                 rc = -EIO;
1087
1088         ll_inode_size_lock(inode, 1);
1089         inode_init_lvb(inode, &lvb);
1090         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1091
1092         if (policy->l_extent.start == 0 &&
1093             policy->l_extent.end == OBD_OBJECT_EOF) {
1094                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1095                  * the kms under both a DLM lock and the
1096                  * ll_inode_size_lock().  If we don't get the
1097                  * ll_inode_size_lock() here we can match the DLM lock and
1098                  * reset i_size from the kms before the truncating path has
1099                  * updated the kms.  generic_file_write can then trust the
1100                  * stale i_size when doing appending writes and effectively
1101                  * cancel the result of the truncate.  Getting the
1102                  * ll_inode_size_lock() after the enqueue maintains the DLM
1103                  * -> ll_inode_size_lock() acquiring order. */
1104                 inode->i_size = lvb.lvb_size;
1105                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1106                        inode->i_ino, inode->i_size);
1107         }
1108
1109         if (rc == 0) {
1110                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1111                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1112                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1113         }
1114         ll_inode_size_unlock(inode, 1);
1115
1116         RETURN(rc);
1117 }
1118
1119 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1120                      struct lov_stripe_md *lsm, int mode,
1121                      struct lustre_handle *lockh)
1122 {
1123         struct ll_sb_info *sbi = ll_i2sbi(inode);
1124         int rc;
1125         ENTRY;
1126
1127         /* XXX phil: can we do this?  won't it screw the file size up? */
1128         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1129             (sbi->ll_flags & LL_SBI_NOLCK))
1130                 RETURN(0);
1131
1132         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1133
1134         RETURN(rc);
1135 }
1136
1137 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1138                             loff_t *ppos)
1139 {
1140         struct inode *inode = file->f_dentry->d_inode;
1141         struct ll_inode_info *lli = ll_i2info(inode);
1142         struct lov_stripe_md *lsm = lli->lli_smd;
1143         struct ll_sb_info *sbi = ll_i2sbi(inode);
1144         struct ll_lock_tree tree;
1145         struct ll_lock_tree_node *node;
1146         struct ost_lvb lvb;
1147         struct ll_ra_read bead;
1148         int rc, ra = 0;
1149         loff_t end;
1150         ssize_t retval, chunk, sum = 0;
1151
1152         __u64 kms;
1153         ENTRY;
1154         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1155                inode->i_ino, inode->i_generation, inode, count, *ppos);
1156         ll_vfs_ops_tally(sbi, VFS_OPS_READ);
1157
1158         /* "If nbyte is 0, read() will return 0 and have no other results."
1159          *                      -- Single Unix Spec */
1160         if (count == 0)
1161                 RETURN(0);
1162
1163         lprocfs_counter_add(sbi->ll_stats, LPROC_LL_READ_BYTES, count);
1164
1165         if (!lsm) {
1166                 /* Read on file with no objects should return zero-filled
1167                  * buffers up to file size (we can get non-zero sizes with
1168                  * mknod + truncate, then opening file for read. This is a
1169                  * common pattern in NFS case, it seems). Bug 6243 */
1170                 int notzeroed;
1171                 /* Since there are no objects on OSTs, we have nothing to get
1172                  * lock on and so we are forced to access inode->i_size
1173                  * unguarded */
1174
1175                 /* Read beyond end of file */
1176                 if (*ppos >= inode->i_size)
1177                         RETURN(0);
1178
1179                 if (count > inode->i_size - *ppos)
1180                         count = inode->i_size - *ppos;
1181                 /* Make sure to correctly adjust the file pos pointer for
1182                  * EFAULT case */
1183                 notzeroed = clear_user(buf, count);
1184                 count -= notzeroed;
1185                 *ppos += count;
1186                 if (!count)
1187                         RETURN(-EFAULT);
1188                 RETURN(count);
1189         }
1190
1191 repeat:
1192         if (sbi->ll_max_rw_chunk != 0) {
1193                 /* first, let's know the end of the current stripe */
1194                 end = *ppos;
1195                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1196                                 (obd_off *)&end);
1197
1198                 /* correct, the end is beyond the request */
1199                 if (end > *ppos + count - 1)
1200                         end = *ppos + count - 1;
1201
1202                 /* and chunk shouldn't be too large even if striping is wide */
1203                 if (end - *ppos > sbi->ll_max_rw_chunk)
1204                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1205         } else {
1206                 end = *ppos + count - 1;
1207         }
1208        
1209         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1210         tree.lt_fd = LUSTRE_FPRIVATE(file);
1211         rc = ll_tree_lock(&tree, node, buf, count,
1212                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1213         if (rc != 0)
1214                 GOTO(out, retval = rc);
1215
1216         ll_inode_size_lock(inode, 1);
1217         /*
1218          * Consistency guarantees: following possibilities exist for the
1219          * relation between region being read and real file size at this
1220          * moment:
1221          *
1222          *  (A): the region is completely inside of the file;
1223          *
1224          *  (B-x): x bytes of region are inside of the file, the rest is
1225          *  outside;
1226          *
1227          *  (C): the region is completely outside of the file.
1228          *
1229          * This classification is stable under DLM lock acquired by
1230          * ll_tree_lock() above, because to change class, other client has to
1231          * take DLM lock conflicting with our lock. Also, any updates to
1232          * ->i_size by other threads on this client are serialized by
1233          * ll_inode_size_lock(). This guarantees that short reads are handled
1234          * correctly in the face of concurrent writes and truncates.
1235          */
1236         inode_init_lvb(inode, &lvb);
1237         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1238         kms = lvb.lvb_size;
1239         if (*ppos + count - 1 > kms) {
1240                 /* A glimpse is necessary to determine whether we return a
1241                  * short read (B) or some zeroes at the end of the buffer (C) */
1242                 ll_inode_size_unlock(inode, 1);
1243                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1244                 if (retval) {
1245                         ll_tree_unlock(&tree);
1246                         goto out;
1247                 }
1248         } else {
1249                 /* region is within kms and, hence, within real file size (A).
1250                  * We need to increase i_size to cover the read region so that
1251                  * generic_file_read() will do its job, but that doesn't mean
1252                  * the kms size is _correct_, it is only the _minimum_ size.
1253                  * If someone does a stat they will get the correct size which
1254                  * will always be >= the kms value here.  b=11081 */
1255                 if (inode->i_size < kms)
1256                         inode->i_size = kms;
1257                 ll_inode_size_unlock(inode, 1);
1258         }
1259
1260         chunk = end - *ppos + 1;
1261         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1262                inode->i_ino, chunk, *ppos, inode->i_size);
1263
1264         /* turn off the kernel's read-ahead */
1265 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1266         file->f_ramax = 0;
1267 #else
1268         file->f_ra.ra_pages = 0;
1269 #endif
1270         /* initialize read-ahead window once per syscall */
1271         if (ra == 0) {
1272                 ra = 1;
1273                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1274                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1275                 ll_ra_read_in(file, &bead);
1276         }
1277
1278         /* BUG: 5972 */
1279         file_accessed(file);
1280         retval = generic_file_read(file, buf, chunk, ppos);
1281         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 0);
1282
1283         ll_tree_unlock(&tree);
1284
1285         if (retval > 0) {
1286                 buf += retval;
1287                 count -= retval;
1288                 sum += retval;
1289                 if (retval == chunk && count > 0)
1290                         goto repeat;
1291         }
1292
1293  out:
1294         if (ra != 0)
1295                 ll_ra_read_ex(file, &bead);
1296         retval = (sum > 0) ? sum : retval;
1297         RETURN(retval);
1298 }
1299
1300 /*
1301  * Write to a file (through the page cache).
1302  */
1303 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1304                              loff_t *ppos)
1305 {
1306         struct inode *inode = file->f_dentry->d_inode;
1307         struct ll_sb_info *sbi = ll_i2sbi(inode);
1308         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1309         struct ll_lock_tree tree;
1310         struct ll_lock_tree_node *node;
1311         loff_t maxbytes = ll_file_maxbytes(inode);
1312         loff_t lock_start, lock_end, end;
1313         ssize_t retval, chunk, sum = 0;
1314         int rc;
1315         ENTRY;
1316
1317         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1318                inode->i_ino, inode->i_generation, inode, count, *ppos);
1319         ll_vfs_ops_tally(sbi, VFS_OPS_WRITE);
1320         
1321         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1322
1323         /* POSIX, but surprised the VFS doesn't check this already */
1324         if (count == 0)
1325                 RETURN(0);
1326
1327         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1328          * called on the file, don't fail the below assertion (bug 2388). */
1329         if (file->f_flags & O_LOV_DELAY_CREATE &&
1330             ll_i2info(inode)->lli_smd == NULL)
1331                 RETURN(-EBADF);
1332
1333         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1334
1335         down(&ll_i2info(inode)->lli_write_sem);
1336
1337 repeat:
1338         chunk = 0; /* just to fix gcc's warning */
1339         end = *ppos + count - 1;
1340
1341         if (file->f_flags & O_APPEND) {
1342                 lock_start = 0;
1343                 lock_end = OBD_OBJECT_EOF;
1344         } else if (sbi->ll_max_rw_chunk != 0) {
1345                 /* first, let's know the end of the current stripe */
1346                 end = *ppos;
1347                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, 
1348                                 (obd_off *)&end);
1349
1350                 /* correct, the end is beyond the request */
1351                 if (end > *ppos + count - 1)
1352                         end = *ppos + count - 1;
1353
1354                 /* and chunk shouldn't be too large even if striping is wide */
1355                 if (end - *ppos > sbi->ll_max_rw_chunk)
1356                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1357                 lock_start = *ppos;
1358                 lock_end = end;
1359         } else {
1360                 lock_start = *ppos;
1361                 lock_end = *ppos + count - 1;
1362         }
1363         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1364
1365         if (IS_ERR(node))
1366                 GOTO(out, retval = PTR_ERR(node));
1367
1368         tree.lt_fd = LUSTRE_FPRIVATE(file);
1369         rc = ll_tree_lock(&tree, node, buf, count,
1370                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1371         if (rc != 0)
1372                 GOTO(out, retval = rc);
1373
1374         /* This is ok, g_f_w will overwrite this under i_sem if it races
1375          * with a local truncate, it just makes our maxbyte checking easier.
1376          * The i_size value gets updated in ll_extent_lock() as a consequence
1377          * of the [0,EOF] extent lock we requested above. */
1378         if (file->f_flags & O_APPEND) {
1379                 *ppos = inode->i_size;
1380                 end = *ppos + count - 1;
1381         }
1382
1383         if (*ppos >= maxbytes) {
1384                 send_sig(SIGXFSZ, current, 0);
1385                 GOTO(out, retval = -EFBIG);
1386         }
1387         if (*ppos + count > maxbytes)
1388                 count = maxbytes - *ppos;
1389
1390         /* generic_file_write handles O_APPEND after getting i_mutex */
1391         chunk = end - *ppos + 1;
1392         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1393                inode->i_ino, chunk, *ppos);
1394         retval = generic_file_write(file, buf, chunk, ppos);
1395         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1396
1397 out:
1398         ll_tree_unlock(&tree);
1399
1400         if (retval > 0) {
1401                 buf += retval;
1402                 count -= retval;
1403                 sum += retval;
1404                 if (retval == chunk && count > 0)
1405                         goto repeat;
1406         }
1407
1408         up(&ll_i2info(inode)->lli_write_sem);
1409
1410         retval = (sum > 0) ? sum : retval;
1411         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1412                             retval > 0 ? retval : 0);
1413         RETURN(retval);
1414 }
1415
1416 /*
1417  * Send file content (through pagecache) somewhere with helper
1418  */
1419 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1420 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1421                                 read_actor_t actor, void *target)
1422 {
1423         struct inode *inode = in_file->f_dentry->d_inode;
1424         struct ll_inode_info *lli = ll_i2info(inode);
1425         struct lov_stripe_md *lsm = lli->lli_smd;
1426         struct ll_lock_tree tree;
1427         struct ll_lock_tree_node *node;
1428         struct ost_lvb lvb;
1429         struct ll_ra_read bead;
1430         int rc;
1431         ssize_t retval;
1432         __u64 kms;
1433         ENTRY;
1434         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1435                inode->i_ino, inode->i_generation, inode, count, *ppos);
1436
1437         /* "If nbyte is 0, read() will return 0 and have no other results."
1438          *                      -- Single Unix Spec */
1439         if (count == 0)
1440                 RETURN(0);
1441
1442         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1443                             count);
1444
1445         /* turn off the kernel's read-ahead */
1446         in_file->f_ra.ra_pages = 0;
1447
1448         /* File with no objects, nothing to lock */
1449         if (!lsm)
1450                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1451
1452         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1453         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1454         rc = ll_tree_lock(&tree, node, NULL, count,
1455                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1456         if (rc != 0)
1457                 RETURN(rc);
1458
1459         ll_inode_size_lock(inode, 1);
1460         /*
1461          * Consistency guarantees: following possibilities exist for the
1462          * relation between region being read and real file size at this
1463          * moment:
1464          *
1465          *  (A): the region is completely inside of the file;
1466          *
1467          *  (B-x): x bytes of region are inside of the file, the rest is
1468          *  outside;
1469          *
1470          *  (C): the region is completely outside of the file.
1471          *
1472          * This classification is stable under DLM lock acquired by
1473          * ll_tree_lock() above, because to change class, other client has to
1474          * take DLM lock conflicting with our lock. Also, any updates to
1475          * ->i_size by other threads on this client are serialized by
1476          * ll_inode_size_lock(). This guarantees that short reads are handled
1477          * correctly in the face of concurrent writes and truncates.
1478          */
1479         inode_init_lvb(inode, &lvb);
1480         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1481         kms = lvb.lvb_size;
1482         if (*ppos + count - 1 > kms) {
1483                 /* A glimpse is necessary to determine whether we return a
1484                  * short read (B) or some zeroes at the end of the buffer (C) */
1485                 ll_inode_size_unlock(inode, 1);
1486                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1487                 if (retval)
1488                         goto out;
1489         } else {
1490                 /* region is within kms and, hence, within real file size (A) */
1491                 inode->i_size = kms;
1492                 ll_inode_size_unlock(inode, 1);
1493         }
1494
1495         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1496                inode->i_ino, count, *ppos, inode->i_size);
1497
1498         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1499         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1500         ll_ra_read_in(in_file, &bead);
1501         /* BUG: 5972 */
1502         file_accessed(in_file);
1503         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1504         ll_ra_read_ex(in_file, &bead);
1505
1506  out:
1507         ll_tree_unlock(&tree);
1508         RETURN(retval);
1509 }
1510 #endif
1511
1512 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1513                                unsigned long arg)
1514 {
1515         struct ll_inode_info *lli = ll_i2info(inode);
1516         struct obd_export *exp = ll_i2obdexp(inode);
1517         struct ll_recreate_obj ucreatp;
1518         struct obd_trans_info oti = { 0 };
1519         struct obdo *oa = NULL;
1520         int lsm_size;
1521         int rc = 0;
1522         struct lov_stripe_md *lsm, *lsm2;
1523         ENTRY;
1524
1525         if (!capable (CAP_SYS_ADMIN))
1526                 RETURN(-EPERM);
1527
1528         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1529                             sizeof(struct ll_recreate_obj));
1530         if (rc) {
1531                 RETURN(-EFAULT);
1532         }
1533         oa = obdo_alloc();
1534         if (oa == NULL)
1535                 RETURN(-ENOMEM);
1536
1537         down(&lli->lli_open_sem);
1538         lsm = lli->lli_smd;
1539         if (lsm == NULL)
1540                 GOTO(out, rc = -ENOENT);
1541         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1542                    (lsm->lsm_stripe_count));
1543
1544         OBD_ALLOC(lsm2, lsm_size);
1545         if (lsm2 == NULL)
1546                 GOTO(out, rc = -ENOMEM);
1547
1548         oa->o_id = ucreatp.lrc_id;
1549         oa->o_nlink = ucreatp.lrc_ost_idx;
1550         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1551         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1552         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1553                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1554
1555         oti.oti_objid = NULL;
1556         memcpy(lsm2, lsm, lsm_size);
1557         rc = obd_create(exp, oa, &lsm2, &oti);
1558
1559         OBD_FREE(lsm2, lsm_size);
1560         GOTO(out, rc);
1561 out:
1562         up(&lli->lli_open_sem);
1563         obdo_free(oa);
1564         return rc;
1565 }
1566
1567 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1568                                     int flags, struct lov_user_md *lum,
1569                                     int lum_size)
1570 {
1571         struct ll_inode_info *lli = ll_i2info(inode);
1572         struct lov_stripe_md *lsm;
1573         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1574         int rc = 0;
1575         ENTRY;
1576
1577         down(&lli->lli_open_sem);
1578         lsm = lli->lli_smd;
1579         if (lsm) {
1580                 up(&lli->lli_open_sem);
1581                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1582                        inode->i_ino);
1583                 RETURN(-EEXIST);
1584         }
1585
1586         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1587         if (rc)
1588                 GOTO(out, rc);
1589         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1590                 GOTO(out_req_free, rc = -ENOENT);
1591         rc = oit.d.lustre.it_status;
1592         if (rc < 0)
1593                 GOTO(out_req_free, rc);
1594
1595         ll_release_openhandle(file->f_dentry, &oit);
1596
1597  out:
1598         up(&lli->lli_open_sem);
1599         ll_intent_release(&oit);
1600         RETURN(rc);
1601 out_req_free:
1602         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1603         goto out;
1604 }
1605
1606 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1607                              struct lov_mds_md **lmmp, int *lmm_size, 
1608                              struct ptlrpc_request **request)
1609 {
1610         struct ll_sb_info *sbi = ll_i2sbi(inode);
1611         struct ll_fid  fid;
1612         struct mds_body  *body;
1613         struct lov_mds_md *lmm = NULL;
1614         struct ptlrpc_request *req = NULL;
1615         int rc, lmmsize;
1616
1617         ll_inode2fid(&fid, inode);
1618
1619         rc = ll_get_max_mdsize(sbi, &lmmsize);
1620         if (rc)
1621                 RETURN(rc);
1622
1623         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
1624                         filename, strlen(filename) + 1,
1625                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
1626                         lmmsize, &req);
1627         if (rc < 0) {
1628                 CDEBUG(D_INFO, "mdc_getattr_name failed "
1629                                 "on %s: rc %d\n", filename, rc);
1630                 GOTO(out, rc);
1631         }
1632
1633         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1634                         sizeof(*body));
1635         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1636         /* swabbed by mdc_getattr_name */
1637         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1638
1639         lmmsize = body->eadatasize;
1640
1641         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1642                         lmmsize == 0) {
1643                 GOTO(out, rc = -ENODATA);
1644         }
1645
1646         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
1647                         lmmsize);
1648         LASSERT(lmm != NULL);
1649         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1650
1651         /*
1652          * This is coming from the MDS, so is probably in
1653          * little endian.  We convert it to host endian before
1654          * passing it to userspace.
1655          */
1656         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1657                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1658                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1659         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1660                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1661         }
1662
1663         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1664                 struct lov_stripe_md *lsm;
1665                 struct lov_user_md_join *lmj;
1666                 int lmj_size, i, aindex = 0;
1667
1668                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
1669                 if (rc < 0)
1670                         GOTO(out, rc = -ENOMEM);
1671                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
1672                 if (rc)
1673                         GOTO(out_free_memmd, rc);
1674
1675                 lmj_size = sizeof(struct lov_user_md_join) +
1676                         lsm->lsm_stripe_count *
1677                         sizeof(struct lov_user_ost_data_join);
1678                 OBD_ALLOC(lmj, lmj_size);
1679                 if (!lmj)
1680                         GOTO(out_free_memmd, rc = -ENOMEM);
1681
1682                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1683                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1684                         struct lov_extent *lex =
1685                                 &lsm->lsm_array->lai_ext_array[aindex];
1686
1687                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1688                                 aindex ++;
1689                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1690                                         LPU64" len %d\n", aindex, i,
1691                                         lex->le_start, (int)lex->le_len);
1692                         lmj->lmm_objects[i].l_extent_start =
1693                                 lex->le_start;
1694
1695                         if ((int)lex->le_len == -1)
1696                                 lmj->lmm_objects[i].l_extent_end = -1;
1697                         else
1698                                 lmj->lmm_objects[i].l_extent_end =
1699                                         lex->le_start + lex->le_len;
1700                         lmj->lmm_objects[i].l_object_id =
1701                                 lsm->lsm_oinfo[i]->loi_id;
1702                         lmj->lmm_objects[i].l_object_gr =
1703                                 lsm->lsm_oinfo[i]->loi_gr;
1704                         lmj->lmm_objects[i].l_ost_gen =
1705                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1706                         lmj->lmm_objects[i].l_ost_idx =
1707                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1708                 }
1709                 lmm = (struct lov_mds_md *)lmj;
1710                 lmmsize = lmj_size;
1711 out_free_memmd:
1712                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
1713         }
1714 out:
1715         *lmmp = lmm;
1716         *lmm_size = lmmsize;
1717         *request = req;
1718         return rc;
1719 }
1720 static int ll_lov_setea(struct inode *inode, struct file *file,
1721                             unsigned long arg)
1722 {
1723         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1724         struct lov_user_md  *lump;
1725         int lum_size = sizeof(struct lov_user_md) +
1726                        sizeof(struct lov_user_ost_data);
1727         int rc;
1728         ENTRY;
1729
1730         if (!capable (CAP_SYS_ADMIN))
1731                 RETURN(-EPERM);
1732
1733         OBD_ALLOC(lump, lum_size);
1734         if (lump == NULL) {
1735                 RETURN(-ENOMEM);
1736         }
1737         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1738         if (rc) {
1739                 OBD_FREE(lump, lum_size);
1740                 RETURN(-EFAULT);
1741         }
1742
1743         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1744
1745         OBD_FREE(lump, lum_size);
1746         RETURN(rc);
1747 }
1748
1749 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1750                             unsigned long arg)
1751 {
1752         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1753         int rc;
1754         int flags = FMODE_WRITE;
1755         ENTRY;
1756
1757         /* Bug 1152: copy properly when this is no longer true */
1758         LASSERT(sizeof(lum) == sizeof(*lump));
1759         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1760         rc = copy_from_user(&lum, lump, sizeof(lum));
1761         if (rc)
1762                 RETURN(-EFAULT);
1763
1764         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1765         if (rc == 0) {
1766                  put_user(0, &lump->lmm_stripe_count);
1767                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
1768                                     0, ll_i2info(inode)->lli_smd, lump);
1769         }
1770         RETURN(rc);
1771 }
1772
1773 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1774 {
1775         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1776
1777         if (!lsm)
1778                 RETURN(-ENODATA);
1779
1780         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
1781                             (void *)arg);
1782 }
1783
1784 static int ll_get_grouplock(struct inode *inode, struct file *file,
1785                             unsigned long arg)
1786 {
1787         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1788         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1789                                                     .end = OBD_OBJECT_EOF}};
1790         struct lustre_handle lockh = { 0 };
1791         struct ll_inode_info *lli = ll_i2info(inode);
1792         struct lov_stripe_md *lsm = lli->lli_smd;
1793         int flags = 0, rc;
1794         ENTRY;
1795
1796         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1797                 RETURN(-EINVAL);
1798         }
1799
1800         policy.l_extent.gid = arg;
1801         if (file->f_flags & O_NONBLOCK)
1802                 flags = LDLM_FL_BLOCK_NOWAIT;
1803
1804         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1805         if (rc)
1806                 RETURN(rc);
1807
1808         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1809         fd->fd_gid = arg;
1810         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1811
1812         RETURN(0);
1813 }
1814
1815 static int ll_put_grouplock(struct inode *inode, struct file *file,
1816                             unsigned long arg)
1817 {
1818         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1819         struct ll_inode_info *lli = ll_i2info(inode);
1820         struct lov_stripe_md *lsm = lli->lli_smd;
1821         int rc;
1822         ENTRY;
1823
1824         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1825                 /* Ugh, it's already unlocked. */
1826                 RETURN(-EINVAL);
1827         }
1828
1829         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1830                 RETURN(-EINVAL);
1831
1832         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1833
1834         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1835         if (rc)
1836                 RETURN(rc);
1837
1838         fd->fd_gid = 0;
1839         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1840
1841         RETURN(0);
1842 }
1843
1844 static int join_sanity_check(struct inode *head, struct inode *tail)
1845 {
1846         ENTRY;
1847         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1848                 CERROR("server do not support join \n");
1849                 RETURN(-EINVAL);
1850         }
1851         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1852                 CERROR("tail ino %lu and ino head %lu must be regular\n",
1853                        head->i_ino, tail->i_ino);
1854                 RETURN(-EINVAL);
1855         }
1856         if (head->i_ino == tail->i_ino) {
1857                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
1858                 RETURN(-EINVAL);
1859         }
1860         if (head->i_size % JOIN_FILE_ALIGN) {
1861                 CERROR("hsize %llu must be times of 64K\n", head->i_size);
1862                 RETURN(-EINVAL);
1863         }
1864         RETURN(0);
1865 }
1866
1867 static int join_file(struct inode *head_inode, struct file *head_filp,
1868                      struct file *tail_filp)
1869 {
1870         struct inode *tail_inode, *tail_parent;
1871         struct dentry *tail_dentry = tail_filp->f_dentry;
1872         struct lookup_intent oit = {.it_op = IT_OPEN,
1873                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1874         struct lustre_handle lockh;
1875         struct mdc_op_data *op_data;
1876         __u32  hsize = head_inode->i_size >> 32;
1877         __u32  tsize = head_inode->i_size;
1878         int    rc;
1879         ENTRY;
1880
1881         tail_dentry = tail_filp->f_dentry;
1882         tail_inode = tail_dentry->d_inode;
1883         tail_parent = tail_dentry->d_parent->d_inode;
1884
1885         OBD_ALLOC_PTR(op_data);
1886         if (op_data == NULL) {
1887                 RETURN(-ENOMEM);
1888         }
1889
1890         ll_prepare_mdc_op_data(op_data, head_inode, tail_parent,
1891                                tail_dentry->d_name.name,
1892                                tail_dentry->d_name.len, 0);
1893         rc = mdc_enqueue(ll_i2mdcexp(head_inode), LDLM_IBITS, &oit, LCK_PW,
1894                          op_data, &lockh, &tsize, 0, ldlm_completion_ast,
1895                          ll_mdc_blocking_ast, &hsize, 0);
1896
1897         if (rc < 0)
1898                 GOTO(out, rc);
1899
1900         rc = oit.d.lustre.it_status;
1901
1902         if (rc < 0) {
1903                 ptlrpc_req_finished((struct ptlrpc_request *)
1904                                                           oit.d.lustre.it_data);
1905                 GOTO(out, rc);
1906         }
1907
1908         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
1909                                            * away */
1910                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
1911                 oit.d.lustre.it_lock_mode = 0;
1912         }
1913         ll_release_openhandle(head_filp->f_dentry, &oit);
1914 out:
1915         if (op_data)
1916                 OBD_FREE_PTR(op_data);
1917         ll_intent_release(&oit);
1918         RETURN(rc);
1919 }
1920
1921 static int ll_file_join(struct inode *head, struct file *filp,
1922                         char *filename_tail)
1923 {
1924         struct inode *tail = NULL, *first = NULL, *second = NULL;
1925         struct dentry *tail_dentry;
1926         struct file *tail_filp, *first_filp, *second_filp;
1927         struct ll_lock_tree first_tree, second_tree;
1928         struct ll_lock_tree_node *first_node, *second_node;
1929         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
1930         int rc = 0, cleanup_phase = 0;
1931         ENTRY;
1932
1933         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
1934                head->i_ino, head->i_generation, head, filename_tail);
1935
1936         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
1937         if (IS_ERR(tail_filp)) {
1938                 CERROR("Can not open tail file %s", filename_tail);
1939                 rc = PTR_ERR(tail_filp);
1940                 GOTO(cleanup, rc);
1941         }
1942         tail = igrab(tail_filp->f_dentry->d_inode);
1943
1944         tlli = ll_i2info(tail);
1945         tail_dentry = tail_filp->f_dentry;
1946         LASSERT(tail_dentry);
1947         cleanup_phase = 1;
1948
1949         /*reorder the inode for lock sequence*/
1950         first = head->i_ino > tail->i_ino ? head : tail;
1951         second = head->i_ino > tail->i_ino ? tail : head;
1952         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
1953         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
1954
1955         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
1956                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
1957         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
1958         if (IS_ERR(first_node)){
1959                 rc = PTR_ERR(first_node);
1960                 GOTO(cleanup, rc);
1961         }
1962         first_tree.lt_fd = first_filp->private_data;
1963         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
1964         if (rc != 0)
1965                 GOTO(cleanup, rc);
1966         cleanup_phase = 2;
1967
1968         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
1969         if (IS_ERR(second_node)){
1970                 rc = PTR_ERR(second_node);
1971                 GOTO(cleanup, rc);
1972         }
1973         second_tree.lt_fd = second_filp->private_data;
1974         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
1975         if (rc != 0)
1976                 GOTO(cleanup, rc);
1977         cleanup_phase = 3;
1978
1979         rc = join_sanity_check(head, tail);
1980         if (rc)
1981                 GOTO(cleanup, rc);
1982
1983         rc = join_file(head, filp, tail_filp);
1984         if (rc)
1985                 GOTO(cleanup, rc);
1986 cleanup:
1987         switch (cleanup_phase) {
1988         case 3:
1989                 ll_tree_unlock(&second_tree);
1990                 obd_cancel_unused(ll_i2obdexp(second),
1991                                   ll_i2info(second)->lli_smd, 0, NULL);
1992         case 2:
1993                 ll_tree_unlock(&first_tree);
1994                 obd_cancel_unused(ll_i2obdexp(first),
1995                                   ll_i2info(first)->lli_smd, 0, NULL);
1996         case 1:
1997                 filp_close(tail_filp, 0);
1998                 if (tail)
1999                         iput(tail);
2000                 if (head && rc == 0) {
2001                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2002                                        &hlli->lli_smd);
2003                         hlli->lli_smd = NULL;
2004                 }
2005         case 0:
2006                 break;
2007         default:
2008                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2009                 LBUG();
2010         }
2011         RETURN(rc);
2012 }
2013
2014 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2015 {
2016         struct inode *inode = dentry->d_inode;
2017         struct obd_client_handle *och;
2018         int rc;
2019         ENTRY;
2020
2021         LASSERT(inode);
2022
2023         /* Root ? Do nothing. */
2024         if (dentry->d_inode->i_sb->s_root == dentry)
2025                 RETURN(0);
2026
2027         /* No open handle to close? Move away */
2028         if (!it_disposition(it, DISP_OPEN_OPEN))
2029                 RETURN(0);
2030
2031         OBD_ALLOC(och, sizeof(*och));
2032         if (!och)
2033                 GOTO(out, rc = -ENOMEM);
2034
2035         ll_och_fill(ll_i2info(inode), it, och);
2036
2037         rc = ll_close_inode_openhandle(inode, och);
2038
2039         OBD_FREE(och, sizeof(*och));
2040  out:
2041         /* this one is in place of ll_file_open */
2042         ptlrpc_req_finished(it->d.lustre.it_data);
2043         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2044         RETURN(rc);
2045 }
2046
2047 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2048                   unsigned long arg)
2049 {
2050         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2051         int flags;
2052         ENTRY;
2053
2054         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2055                inode->i_generation, inode, cmd);
2056         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_IOCTL);
2057
2058         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2059         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2060                 RETURN(-ENOTTY);
2061
2062         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
2063         switch(cmd) {
2064         case LL_IOC_GETFLAGS:
2065                 /* Get the current value of the file flags */
2066                 return put_user(fd->fd_flags, (int *)arg);
2067         case LL_IOC_SETFLAGS:
2068         case LL_IOC_CLRFLAGS:
2069                 /* Set or clear specific file flags */
2070                 /* XXX This probably needs checks to ensure the flags are
2071                  *     not abused, and to handle any flag side effects.
2072                  */
2073                 if (get_user(flags, (int *) arg))
2074                         RETURN(-EFAULT);
2075
2076                 if (cmd == LL_IOC_SETFLAGS) {
2077                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2078                             !(file->f_flags & O_DIRECT)) {
2079                                 CERROR("%s: unable to disable locking on "
2080                                        "non-O_DIRECT file\n", current->comm);
2081                                 RETURN(-EINVAL);
2082                         }
2083
2084                         fd->fd_flags |= flags;
2085                 } else {
2086                         fd->fd_flags &= ~flags;
2087                 }
2088                 RETURN(0);
2089         case LL_IOC_LOV_SETSTRIPE:
2090                 RETURN(ll_lov_setstripe(inode, file, arg));
2091         case LL_IOC_LOV_SETEA:
2092                 RETURN(ll_lov_setea(inode, file, arg));
2093         case LL_IOC_LOV_GETSTRIPE:
2094                 RETURN(ll_lov_getstripe(inode, arg));
2095         case LL_IOC_RECREATE_OBJ:
2096                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2097         case EXT3_IOC_GETFLAGS:
2098         case EXT3_IOC_SETFLAGS:
2099                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2100         case EXT3_IOC_GETVERSION_OLD:
2101         case EXT3_IOC_GETVERSION:
2102                 RETURN(put_user(inode->i_generation, (int *)arg));
2103         case LL_IOC_JOIN: {
2104                 char *ftail;
2105                 int rc;
2106
2107                 ftail = getname((const char *)arg);
2108                 if (IS_ERR(ftail))
2109                         RETURN(PTR_ERR(ftail));
2110                 rc = ll_file_join(inode, file, ftail);
2111                 putname(ftail);
2112                 RETURN(rc);
2113         }
2114         case LL_IOC_GROUP_LOCK:
2115                 RETURN(ll_get_grouplock(inode, file, arg));
2116         case LL_IOC_GROUP_UNLOCK:
2117                 RETURN(ll_put_grouplock(inode, file, arg));
2118         case IOC_OBD_STATFS:
2119                 RETURN(ll_obd_statfs(inode, (void *)arg));
2120
2121         /* We need to special case any other ioctls we want to handle,
2122          * to send them to the MDS/OST as appropriate and to properly
2123          * network encode the arg field.
2124         case EXT3_IOC_SETVERSION_OLD:
2125         case EXT3_IOC_SETVERSION:
2126         */
2127         default:
2128                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2129                                      (void *)arg));
2130         }
2131 }
2132
2133 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2134 {
2135         struct inode *inode = file->f_dentry->d_inode;
2136         struct ll_inode_info *lli = ll_i2info(inode);
2137         struct lov_stripe_md *lsm = lli->lli_smd;
2138         loff_t retval;
2139         ENTRY;
2140         retval = offset + ((origin == 2) ? inode->i_size :
2141                            (origin == 1) ? file->f_pos : 0);
2142         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2143                inode->i_ino, inode->i_generation, inode, retval, retval,
2144                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2145         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_SEEK);
2146         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
2147         
2148         if (origin == 2) { /* SEEK_END */
2149                 int nonblock = 0, rc;
2150
2151                 if (file->f_flags & O_NONBLOCK)
2152                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2153
2154                 if (lsm != NULL) {
2155                         rc = ll_glimpse_size(inode, nonblock);
2156                         if (rc != 0)
2157                                 RETURN(rc);
2158                 }
2159
2160                 ll_inode_size_lock(inode, 0);
2161                 offset += inode->i_size;
2162                 ll_inode_size_unlock(inode, 0);
2163         } else if (origin == 1) { /* SEEK_CUR */
2164                 offset += file->f_pos;
2165         }
2166
2167         retval = -EINVAL;
2168         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2169                 if (offset != file->f_pos) {
2170                         file->f_pos = offset;
2171 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2172                         file->f_reada = 0;
2173                         file->f_version = ++event;
2174 #endif
2175                 }
2176                 retval = offset;
2177         }
2178
2179         RETURN(retval);
2180 }
2181
2182 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2183 {
2184         struct inode *inode = dentry->d_inode;
2185         struct ll_inode_info *lli = ll_i2info(inode);
2186         struct lov_stripe_md *lsm = lli->lli_smd;
2187         struct ll_fid fid;
2188         struct ptlrpc_request *req;
2189         int rc, err;
2190         ENTRY;
2191         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2192                inode->i_generation, inode);
2193         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FSYNC);
2194         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
2195
2196         /* fsync's caller has already called _fdata{sync,write}, we want
2197          * that IO to finish before calling the osc and mdc sync methods */
2198         rc = filemap_fdatawait(inode->i_mapping);
2199
2200         /* catch async errors that were recorded back when async writeback
2201          * failed for pages in this mapping. */
2202         err = lli->lli_async_rc;
2203         lli->lli_async_rc = 0;
2204         if (rc == 0)
2205                 rc = err;
2206         if (lsm) {
2207                 err = lov_test_and_clear_async_rc(lsm);
2208                 if (rc == 0)
2209                         rc = err;
2210         }
2211
2212         ll_inode2fid(&fid, inode);
2213         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2214         if (!rc)
2215                 rc = err;
2216         if (!err)
2217                 ptlrpc_req_finished(req);
2218
2219         if (data && lsm) {
2220                 struct obdo *oa = obdo_alloc();
2221
2222                 if (!oa)
2223                         RETURN(rc ? rc : -ENOMEM);
2224
2225                 oa->o_id = lsm->lsm_object_id;
2226                 oa->o_valid = OBD_MD_FLID;
2227                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2228                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2229
2230                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
2231                                0, OBD_OBJECT_EOF);
2232                 if (!rc)
2233                         rc = err;
2234                 obdo_free(oa);
2235         }
2236
2237         RETURN(rc);
2238 }
2239
2240 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2241 {
2242         struct inode *inode = file->f_dentry->d_inode;
2243         struct ll_sb_info *sbi = ll_i2sbi(inode);
2244         struct ldlm_res_id res_id =
2245                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
2246         struct lustre_handle lockh = {0};
2247         ldlm_policy_data_t flock;
2248         ldlm_mode_t mode = 0;
2249         int flags = 0;
2250         int rc;
2251         ENTRY;
2252
2253         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2254                inode->i_ino, file_lock);
2255         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FLOCK);
2256
2257         if (file_lock->fl_flags & FL_FLOCK) {
2258                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2259                 /* set missing params for flock() calls */
2260                 file_lock->fl_end = OFFSET_MAX;
2261                 file_lock->fl_pid = current->tgid;
2262         }
2263         flock.l_flock.pid = file_lock->fl_pid;
2264         flock.l_flock.start = file_lock->fl_start;
2265         flock.l_flock.end = file_lock->fl_end;
2266
2267         switch (file_lock->fl_type) {
2268         case F_RDLCK:
2269                 mode = LCK_PR;
2270                 break;
2271         case F_UNLCK:
2272                 /* An unlock request may or may not have any relation to
2273                  * existing locks so we may not be able to pass a lock handle
2274                  * via a normal ldlm_lock_cancel() request. The request may even
2275                  * unlock a byte range in the middle of an existing lock. In
2276                  * order to process an unlock request we need all of the same
2277                  * information that is given with a normal read or write record
2278                  * lock request. To avoid creating another ldlm unlock (cancel)
2279                  * message we'll treat a LCK_NL flock request as an unlock. */
2280                 mode = LCK_NL;
2281                 break;
2282         case F_WRLCK:
2283                 mode = LCK_PW;
2284                 break;
2285         default:
2286                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2287                 LBUG();
2288         }
2289
2290         switch (cmd) {
2291         case F_SETLKW:
2292 #ifdef F_SETLKW64
2293         case F_SETLKW64:
2294 #endif
2295                 flags = 0;
2296                 break;
2297         case F_SETLK:
2298 #ifdef F_SETLK64
2299         case F_SETLK64:
2300 #endif
2301                 flags = LDLM_FL_BLOCK_NOWAIT;
2302                 break;
2303         case F_GETLK:
2304 #ifdef F_GETLK64
2305         case F_GETLK64:
2306 #endif
2307                 flags = LDLM_FL_TEST_LOCK;
2308                 /* Save the old mode so that if the mode in the lock changes we
2309                  * can decrement the appropriate reader or writer refcount. */
2310                 file_lock->fl_type = mode;
2311                 break;
2312         default:
2313                 CERROR("unknown fcntl lock command: %d\n", cmd);
2314                 LBUG();
2315         }
2316
2317         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2318                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2319                flags, mode, flock.l_flock.start, flock.l_flock.end);
2320
2321         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, res_id,
2322                               LDLM_FLOCK, &flock, mode, &flags, NULL,
2323                               ldlm_flock_completion_ast, NULL, file_lock,
2324                               NULL, 0, NULL, &lockh, 0);
2325         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2326                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2327 #ifdef HAVE_F_OP_FLOCK
2328         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2329             !(flags & LDLM_FL_TEST_LOCK))
2330                 posix_lock_file_wait(file, file_lock);
2331 #endif
2332
2333         RETURN(rc);
2334 }
2335
2336 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2337 {
2338         ENTRY;
2339
2340         RETURN(-ENOSYS);
2341 }
2342
2343 int ll_have_md_lock(struct inode *inode, __u64 bits)
2344 {
2345         struct lustre_handle lockh;
2346         struct ldlm_res_id res_id = { .name = {0} };
2347         struct obd_device *obddev;
2348         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2349         int flags;
2350         ENTRY;
2351
2352         if (!inode)
2353                RETURN(0);
2354
2355         obddev = ll_i2mdcexp(inode)->exp_obd;
2356         res_id.name[0] = inode->i_ino;
2357         res_id.name[1] = inode->i_generation;
2358
2359         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
2360
2361         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2362         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
2363                             &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2364                 RETURN(1);
2365         }
2366
2367         RETURN(0);
2368 }
2369
2370 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2371         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2372                               * and return success */
2373                 inode->i_nlink = 0;
2374                 /* This path cannot be hit for regular files unless in
2375                  * case of obscure races, so no need to to validate
2376                  * size. */
2377                 if (!S_ISREG(inode->i_mode) &&
2378                     !S_ISDIR(inode->i_mode))
2379                         return 0;
2380         }
2381
2382         if (rc) {
2383                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2384                 return -abs(rc);
2385
2386         }
2387
2388         return 0;
2389 }
2390
2391 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2392 {
2393         struct inode *inode = dentry->d_inode;
2394         struct ptlrpc_request *req = NULL;
2395         struct obd_export *exp;
2396         int rc;
2397         ENTRY;
2398
2399         if (!inode) {
2400                 CERROR("REPORT THIS LINE TO PETER\n");
2401                 RETURN(0);
2402         }
2403         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2404                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2405 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2406         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
2407 #endif
2408
2409         exp = ll_i2mdcexp(inode);
2410
2411         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2412                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2413                 struct mdc_op_data op_data;
2414
2415                 /* Call getattr by fid, so do not provide name at all. */
2416                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
2417                                        dentry->d_inode, NULL, 0, 0);
2418                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
2419                                      /* we are not interested in name
2420                                         based lookup */
2421                                      &oit, 0, &req,
2422                                      ll_mdc_blocking_ast, 0);
2423                 if (rc < 0) {
2424                         rc = ll_inode_revalidate_fini(inode, rc);
2425                         GOTO (out, rc);
2426                 }
2427                 
2428                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2429                 if (rc != 0) {
2430                         ll_intent_release(&oit);
2431                         GOTO(out, rc);
2432                 }
2433
2434                 /* Unlinked? Unhash dentry, so it is not picked up later by
2435                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2436                    here to preserve get_cwd functionality on 2.6.
2437                    Bug 10503 */
2438                 if (!dentry->d_inode->i_nlink) {
2439                         spin_lock(&dcache_lock);
2440                         ll_drop_dentry(dentry);
2441                         spin_unlock(&dcache_lock);
2442                 }
2443
2444                 ll_lookup_finish_locks(&oit, dentry);
2445         } else if (!ll_have_md_lock(dentry->d_inode,
2446                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
2447                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2448                 struct ll_fid fid;
2449                 obd_valid valid = OBD_MD_FLGETATTR;
2450                 int ealen = 0;
2451
2452                 if (S_ISREG(inode->i_mode)) {
2453                         rc = ll_get_max_mdsize(sbi, &ealen);
2454                         if (rc) 
2455                                 RETURN(rc); 
2456                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2457                 }
2458                 ll_inode2fid(&fid, inode);
2459                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
2460                 if (rc) {
2461                         rc = ll_inode_revalidate_fini(inode, rc);
2462                         RETURN(rc);
2463                 }
2464
2465                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
2466                                    NULL);
2467                 if (rc)
2468                         GOTO(out, rc);
2469         }
2470
2471         /* if object not yet allocated, don't validate size */
2472         if (ll_i2info(inode)->lli_smd == NULL) 
2473                 GOTO(out, rc = 0);
2474
2475         /* ll_glimpse_size will prefer locally cached writes if they extend
2476          * the file */
2477         rc = ll_glimpse_size(inode, 0);
2478
2479 out:
2480         ptlrpc_req_finished(req);
2481         RETURN(rc);
2482 }
2483
2484 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2485 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2486                   struct lookup_intent *it, struct kstat *stat)
2487 {
2488         struct inode *inode = de->d_inode;
2489         int res = 0;
2490
2491         res = ll_inode_revalidate_it(de, it);
2492         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
2493
2494         if (res)
2495                 return res;
2496
2497         stat->dev = inode->i_sb->s_dev;
2498         stat->ino = inode->i_ino;
2499         stat->mode = inode->i_mode;
2500         stat->nlink = inode->i_nlink;
2501         stat->uid = inode->i_uid;
2502         stat->gid = inode->i_gid;
2503         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2504         stat->atime = inode->i_atime;
2505         stat->mtime = inode->i_mtime;
2506         stat->ctime = inode->i_ctime;
2507 #ifdef HAVE_INODE_BLKSIZE
2508         stat->blksize = inode->i_blksize;
2509 #else
2510         stat->blksize = 1<<inode->i_blkbits;
2511 #endif
2512
2513         ll_inode_size_lock(inode, 0);
2514         stat->size = inode->i_size;
2515         stat->blocks = inode->i_blocks;
2516         ll_inode_size_unlock(inode, 0);
2517
2518         return 0;
2519 }
2520 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2521 {
2522         struct lookup_intent it = { .it_op = IT_GETATTR };
2523
2524         ll_vfs_ops_tally(ll_i2sbi(de->d_inode), VFS_OPS_GETATTR);
2525         return ll_getattr_it(mnt, de, &it, stat);
2526 }
2527 #endif
2528
2529 static
2530 int lustre_check_acl(struct inode *inode, int mask)
2531 {
2532 #ifdef CONFIG_FS_POSIX_ACL
2533         struct ll_inode_info *lli = ll_i2info(inode);
2534         struct posix_acl *acl;
2535         int rc;
2536         ENTRY;
2537
2538         spin_lock(&lli->lli_lock);
2539         acl = posix_acl_dup(lli->lli_posix_acl);
2540         spin_unlock(&lli->lli_lock);
2541
2542         if (!acl)
2543                 RETURN(-EAGAIN);
2544
2545         rc = posix_acl_permission(inode, acl, mask);
2546         posix_acl_release(acl);
2547
2548         RETURN(rc);
2549 #else
2550         return -EAGAIN;
2551 #endif
2552 }
2553
2554 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2555 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2556 {
2557         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2558                inode->i_ino, inode->i_generation, inode, mask);
2559
2560         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
2561         return generic_permission(inode, mask, lustre_check_acl);
2562 }
2563 #else
2564 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2565 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2566 #else
2567 int ll_inode_permission(struct inode *inode, int mask)
2568 #endif
2569 {
2570         int mode = inode->i_mode;
2571         int rc;
2572
2573         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2574                inode->i_ino, inode->i_generation, inode, mask);
2575         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
2576
2577         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2578             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2579                 return -EROFS;
2580         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2581                 return -EACCES;
2582         if (current->fsuid == inode->i_uid) {
2583                 mode >>= 6;
2584         } else if (1) {
2585                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2586                         goto check_groups;
2587                 rc = lustre_check_acl(inode, mask);
2588                 if (rc == -EAGAIN)
2589                         goto check_groups;
2590                 if (rc == -EACCES)
2591                         goto check_capabilities;
2592                 return rc;
2593         } else {
2594 check_groups:
2595                 if (in_group_p(inode->i_gid))
2596                         mode >>= 3;
2597         }
2598         if ((mode & mask & S_IRWXO) == mask)
2599                 return 0;
2600
2601 check_capabilities:
2602         if (!(mask & MAY_EXEC) ||
2603             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2604                 if (capable(CAP_DAC_OVERRIDE))
2605                         return 0;
2606
2607         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2608             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2609                 return 0;
2610
2611         return -EACCES;
2612 }
2613 #endif
2614
2615 struct file_operations ll_file_operations = {
2616         .read           = ll_file_read,
2617         .write          = ll_file_write,
2618         .ioctl          = ll_file_ioctl,
2619         .open           = ll_file_open,
2620         .release        = ll_file_release,
2621         .mmap           = ll_file_mmap,
2622         .llseek         = ll_file_seek,
2623 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2624         .sendfile       = ll_file_sendfile,
2625 #endif
2626         .fsync          = ll_fsync,
2627 #ifdef HAVE_F_OP_FLOCK
2628         .flock          = ll_file_noflock,
2629 #endif
2630         .lock           = ll_file_noflock
2631 };
2632
2633 struct file_operations ll_file_operations_flock = {
2634         .read           = ll_file_read,
2635         .write          = ll_file_write,
2636         .ioctl          = ll_file_ioctl,
2637         .open           = ll_file_open,
2638         .release        = ll_file_release,
2639         .mmap           = ll_file_mmap,
2640         .llseek         = ll_file_seek,
2641 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2642         .sendfile       = ll_file_sendfile,
2643 #endif
2644         .fsync          = ll_fsync,
2645 #ifdef HAVE_F_OP_FLOCK
2646         .flock          = ll_file_flock,
2647 #endif
2648         .lock           = ll_file_flock
2649 };
2650
2651
2652 struct inode_operations ll_file_inode_operations = {
2653 #ifdef LUSTRE_KERNEL_VERSION
2654         .setattr_raw    = ll_setattr_raw,
2655 #endif
2656         .setattr        = ll_setattr,
2657         .truncate       = ll_truncate,
2658 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2659         .getattr        = ll_getattr,
2660 #else
2661         .revalidate_it  = ll_inode_revalidate_it,
2662 #endif
2663         .permission     = ll_inode_permission,
2664         .setxattr       = ll_setxattr,
2665         .getxattr       = ll_getxattr,
2666         .listxattr      = ll_listxattr,
2667         .removexattr    = ll_removexattr,
2668 };
2669