Whamcloud - gitweb
40c87497aff00a0b4560a6157f87e1d93bdcf473
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 /* also used by llite/special.c:ll_special_open() */
36 struct ll_file_data *ll_file_data_get(void)
37 {
38         struct ll_file_data *fd;
39
40         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
41         return fd;
42 }
43
44 static void ll_file_data_put(struct ll_file_data *fd)
45 {
46         if (fd != NULL)
47                 OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
48 }
49
50 static int ll_close_inode_openhandle(struct inode *inode,
51                                      struct obd_client_handle *och)
52 {
53         struct ptlrpc_request *req = NULL;
54         struct obd_device *obd;
55         struct obdo *oa;
56         int rc;
57         ENTRY;
58
59         obd = class_exp2obd(ll_i2mdcexp(inode));
60         if (obd == NULL) {
61                 CERROR("Invalid MDC connection handle "LPX64"\n",
62                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
63                 GOTO(out, rc = 0);
64         }
65
66         /*
67          * here we check if this is forced umount. If so this is called on
68          * canceling "open lock" and we do not call mdc_close() in this case, as
69          * it will not be successful, as import is already deactivated.
70          */
71         if (obd->obd_no_recov)
72                 GOTO(out, rc = 0);
73
74         oa = obdo_alloc();
75         if (!oa)
76                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
77
78         oa->o_id = inode->i_ino;
79         oa->o_valid = OBD_MD_FLID;
80         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
81                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
82                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
83                                    OBD_MD_FLCTIME);
84         if (0 /* ll_is_inode_dirty(inode) */) {
85                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
86                 oa->o_valid |= OBD_MD_FLFLAGS;
87         }
88
89         rc = mdc_close(ll_i2mdcexp(inode), oa, och, &req);
90         if (rc == EAGAIN) {
91                 /* We are the last writer, so the MDS has instructed us to get
92                  * the file size and any write cookies, then close again. */
93                 //ll_queue_done_writing(inode);
94                 rc = 0;
95         } else if (rc) {
96                 CERROR("inode %lu mdc close failed: rc = %d\n",
97                        inode->i_ino, rc);
98         }
99
100         obdo_free(oa);
101
102         if (rc == 0) {
103                 rc = ll_objects_destroy(req, inode);
104                 if (rc)
105                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
106                                inode->i_ino, rc);
107         }
108
109         ptlrpc_req_finished(req); /* This is close request */
110         EXIT;
111 out:
112         mdc_clear_open_replay_data(och);
113
114         return rc;
115 }
116
117 int ll_mdc_real_close(struct inode *inode, int flags)
118 {
119         struct ll_inode_info *lli = ll_i2info(inode);
120         int rc = 0;
121         struct obd_client_handle **och_p;
122         struct obd_client_handle *och;
123         __u64 *och_usecount;
124
125         ENTRY;
126
127         if (flags & FMODE_WRITE) {
128                 och_p = &lli->lli_mds_write_och;
129                 och_usecount = &lli->lli_open_fd_write_count;
130         } else if (flags & FMODE_EXEC) {
131                 och_p = &lli->lli_mds_exec_och;
132                 och_usecount = &lli->lli_open_fd_exec_count;
133          } else {
134                 LASSERT(flags & FMODE_READ);
135                 och_p = &lli->lli_mds_read_och;
136                 och_usecount = &lli->lli_open_fd_read_count;
137         }
138
139         down(&lli->lli_och_sem);
140         if (*och_usecount) { /* There are still users of this handle, so
141                                 skip freeing it. */
142                 up(&lli->lli_och_sem);
143                 RETURN(0);
144         }
145         och=*och_p;
146         *och_p = NULL;
147         up(&lli->lli_och_sem);
148
149         if (och) { /* There might be a race and somebody have freed this och
150                       already */
151                 rc = ll_close_inode_openhandle(inode, och);
152                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
153                 OBD_FREE(och, sizeof *och);
154         }
155
156         RETURN(rc);
157 }
158
159 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
160                         struct file *file)
161 {
162         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
163         struct ll_inode_info *lli = ll_i2info(inode);
164         int rc = 0;
165         ENTRY;
166
167         /* clear group lock, if present */
168         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
169                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
170                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
171                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
172                                       &fd->fd_cwlockh);
173         }
174
175         /* Let's see if we have good enough OPEN lock on the file and if
176            we can skip talking to MDS */
177         if (file->f_dentry->d_inode) { /* Can this ever be false? */
178                 int lockmode;
179                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
180                 struct lustre_handle lockh;
181                 struct inode *inode = file->f_dentry->d_inode;
182                 struct ldlm_res_id file_res_id = {.name={inode->i_ino,
183                                                          inode->i_generation}};
184                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
185
186                 down(&lli->lli_och_sem);
187                 if (fd->fd_omode & FMODE_WRITE) {
188                         lockmode = LCK_CW;
189                         LASSERT(lli->lli_open_fd_write_count);
190                         lli->lli_open_fd_write_count--;
191                 } else if (fd->fd_omode & FMODE_EXEC) {
192                         lockmode = LCK_PR;
193                         LASSERT(lli->lli_open_fd_exec_count);
194                         lli->lli_open_fd_exec_count--;
195                 } else {
196                         lockmode = LCK_CR;
197                         LASSERT(lli->lli_open_fd_read_count);
198                         lli->lli_open_fd_read_count--;
199                 }
200                 up(&lli->lli_och_sem);
201
202                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
203                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
204                                      &lockh)) {
205                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
206                                                 fd->fd_omode);
207                 }
208         } else {
209                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
210                        file, file->f_dentry, file->f_dentry->d_name.name);
211         }
212
213         LUSTRE_FPRIVATE(file) = NULL;
214         ll_file_data_put(fd);
215
216         RETURN(rc);
217 }
218
219 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
220
221 /* While this returns an error code, fput() the caller does not, so we need
222  * to make every effort to clean up all of our state here.  Also, applications
223  * rarely check close errors and even if an error is returned they will not
224  * re-try the close call.
225  */
226 int ll_file_release(struct inode *inode, struct file *file)
227 {
228         struct ll_file_data *fd;
229         struct ll_sb_info *sbi = ll_i2sbi(inode);
230         struct ll_inode_info *lli = ll_i2info(inode);
231         struct lov_stripe_md *lsm = lli->lli_smd;
232         int rc;
233
234         ENTRY;
235         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
236                inode->i_generation, inode);
237         ll_vfs_ops_tally(sbi, VFS_OPS_RELEASE);
238
239         /* don't do anything for / */
240         if (inode->i_sb->s_root == file->f_dentry)
241                 RETURN(0);
242
243         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
244
245         fd = LUSTRE_FPRIVATE(file);
246         LASSERT(fd != NULL);
247
248         if (lsm)
249                 lov_test_and_clear_async_rc(lsm);
250         lli->lli_async_rc = 0;
251
252         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
253         RETURN(rc);
254 }
255
256 static int ll_intent_file_open(struct file *file, void *lmm,
257                                int lmmsize, struct lookup_intent *itp)
258 {
259         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
260         struct mdc_op_data data;
261         struct dentry *parent = file->f_dentry->d_parent;
262         const char *name = file->f_dentry->d_name.name;
263         const int len = file->f_dentry->d_name.len;
264         struct inode *inode = file->f_dentry->d_inode;
265         struct ptlrpc_request *req;
266         int rc;
267
268         if (!parent)
269                 RETURN(-ENOENT);
270
271         ll_prepare_mdc_op_data(&data, parent->d_inode, inode, name, len, O_RDWR);
272
273         /* Usually we come here only for NFSD, and we want open lock.
274            But we can also get here with pre 2.6.15 patchless kernels, and in
275            that case that lock is also ok */
276         /* We can also get here if there was cached open handle in revalidate_it
277          * but it disappeared while we were getting from there to ll_file_open.
278          * But this means this file was closed and immediatelly opened which
279          * makes a good candidate for using OPEN lock */
280         /* If lmmsize & lmm are not 0, we are just setting stripe info
281          * parameters. No need for the open lock */
282         if (!lmm && !lmmsize)
283                 itp->it_flags |= MDS_OPEN_LOCK;
284
285         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
286                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
287         if (rc == -ESTALE) {
288                 /* reason for keep own exit path - don`t flood log
289                 * with messages with -ESTALE errors.
290                 */
291                 if (!it_disposition(itp, DISP_OPEN_OPEN))
292                         GOTO(out, rc);
293                 ll_release_openhandle(file->f_dentry, itp);
294                 GOTO(out_stale, rc);
295         }
296
297         if (rc != 0) {
298                CERROR("lock enqueue: err: %d\n", rc);
299                GOTO(out, rc);
300         }
301
302         if (itp->d.lustre.it_lock_mode)
303                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
304                                   inode);
305
306         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
307                            req, DLM_REPLY_REC_OFF, NULL);
308 out:
309         ptlrpc_req_finished(itp->d.lustre.it_data);
310
311 out_stale:
312         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
313         ll_intent_drop_lock(itp);
314
315         RETURN(rc);
316 }
317
318
319 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
320                         struct obd_client_handle *och)
321 {
322         struct ptlrpc_request *req = it->d.lustre.it_data;
323         struct mds_body *body;
324
325         LASSERT(och);
326
327         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
328         LASSERT(body != NULL);                  /* reply already checked out */
329         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in mdc_enqueue */
330
331         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
332         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
333         lli->lli_io_epoch = body->io_epoch;
334
335         mdc_set_open_replay_data(och, it->d.lustre.it_data);
336 }
337
338 int ll_local_open(struct file *file, struct lookup_intent *it,
339                   struct ll_file_data *fd, struct obd_client_handle *och)
340 {
341         ENTRY;
342
343         LASSERT(!LUSTRE_FPRIVATE(file));
344
345         LASSERT(fd != NULL);
346
347         if (och)
348                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
349         LUSTRE_FPRIVATE(file) = fd;
350         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
351         fd->fd_omode = it->it_flags;
352
353         RETURN(0);
354 }
355
356 /* Open a file, and (for the very first open) create objects on the OSTs at
357  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
358  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
359  * lli_open_sem to ensure no other process will create objects, send the
360  * stripe MD to the MDS, or try to destroy the objects if that fails.
361  *
362  * If we already have the stripe MD locally then we don't request it in
363  * mdc_open(), by passing a lmm_size = 0.
364  *
365  * It is up to the application to ensure no other processes open this file
366  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
367  * used.  We might be able to avoid races of that sort by getting lli_open_sem
368  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
369  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
370  */
371 int ll_file_open(struct inode *inode, struct file *file)
372 {
373         struct ll_inode_info *lli = ll_i2info(inode);
374         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
375                                           .it_flags = file->f_flags };
376         struct lov_stripe_md *lsm;
377         struct ptlrpc_request *req = NULL;
378         struct obd_client_handle **och_p;
379         __u64 *och_usecount;
380         struct ll_file_data *fd;
381         int rc = 0;
382         ENTRY;
383
384         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
385                inode->i_generation, inode, file->f_flags);
386         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_OPEN);
387
388         /* don't do anything for / */
389         if (inode->i_sb->s_root == file->f_dentry)
390                 RETURN(0);
391
392 #ifdef LUSTRE_KERNEL_VERSION
393         it = file->f_it;
394 #else
395         it = file->private_data; /* XXX: compat macro */
396         file->private_data = NULL; /* prevent ll_local_open assertion */
397 #endif
398
399         fd = ll_file_data_get();
400         if (fd == NULL)
401                 RETURN(-ENOMEM);
402
403         if (!it || !it->d.lustre.it_disposition) {
404                 /* Convert f_flags into access mode. We cannot use file->f_mode,
405                  * because everything but O_ACCMODE mask was stripped from it */
406                 if ((oit.it_flags + 1) & O_ACCMODE)
407                         oit.it_flags++;
408                 if (file->f_flags & O_TRUNC)
409                         oit.it_flags |= FMODE_WRITE;
410
411                 /* kernel only call f_op->open in dentry_open.  filp_open calls
412                  * dentry_open after call to open_namei that checks permissions.
413                  * Only nfsd_open call dentry_open directly without checking
414                  * permissions and because of that this code below is safe. */
415                 if (oit.it_flags & FMODE_WRITE)
416                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
417
418                 /* We do not want O_EXCL here, presumably we opened the file
419                  * already? XXX - NFS implications? */
420                 oit.it_flags &= ~O_EXCL;
421
422                 it = &oit;
423         }
424
425         /* Let's see if we have file open on MDS already. */
426         if (it->it_flags & FMODE_WRITE) {
427                 och_p = &lli->lli_mds_write_och;
428                 och_usecount = &lli->lli_open_fd_write_count;
429         } else if (it->it_flags & FMODE_EXEC) {
430                 och_p = &lli->lli_mds_exec_och;
431                 och_usecount = &lli->lli_open_fd_exec_count;
432          } else {
433                 och_p = &lli->lli_mds_read_och;
434                 och_usecount = &lli->lli_open_fd_read_count;
435         }
436
437         LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
438                  it->d.lustre.it_disposition);
439
440         down(&lli->lli_och_sem);
441         if (*och_p) { /* Open handle is present */
442                 if (it_disposition(it, DISP_OPEN_OPEN)) {
443                         /* Well, there's extra open request that we do not need,
444                            let's close it somehow. This will decref request. */
445                         ll_release_openhandle(file->f_dentry, it);
446                 }
447                 (*och_usecount)++;
448
449                 rc = ll_local_open(file, it, fd, NULL);
450
451                 LASSERTF(rc == 0, "rc = %d\n", rc);
452         } else {
453                 LASSERT(*och_usecount == 0);
454                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
455                 if (!*och_p) {
456                         ll_file_data_put(fd);
457                         GOTO(out_och_free, rc = -ENOMEM);
458                 }
459                 (*och_usecount)++;
460                 if (!it->d.lustre.it_disposition) {
461                         rc = ll_intent_file_open(file, NULL, 0, it);
462                         if (rc) {
463                                 ll_file_data_put(fd);
464                                 GOTO(out_och_free, rc);
465                         }
466
467                         /* Got some error? Release the request */
468                         if (it->d.lustre.it_status < 0) {
469                                 req = it->d.lustre.it_data;
470                                 ptlrpc_req_finished(req);
471                         }
472                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
473                                           file->f_dentry->d_inode);
474                 }
475                 req = it->d.lustre.it_data;
476
477                 /* mdc_intent_lock() didn't get a request ref if there was an
478                  * open error, so don't do cleanup on the request here
479                  * (bug 3430) */
480                 /* XXX (green): Should not we bail out on any error here, not
481                  * just open error? */
482                 rc = it_open_error(DISP_OPEN_OPEN, it);
483                 if (rc) {
484                         ll_file_data_put(fd);
485                         GOTO(out_och_free, rc);
486                 }
487
488                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
489                 rc = ll_local_open(file, it, fd, *och_p);
490                 LASSERTF(rc == 0, "rc = %d\n", rc);
491         }
492         up(&lli->lli_och_sem);
493
494         /* Must do this outside lli_och_sem lock to prevent deadlock where
495            different kind of OPEN lock for this same inode gets cancelled
496            by ldlm_cancel_lru */
497         if (!S_ISREG(inode->i_mode))
498                 GOTO(out, rc);
499
500         lsm = lli->lli_smd;
501         if (lsm == NULL) {
502                 if (file->f_flags & O_LOV_DELAY_CREATE ||
503                     !(file->f_mode & FMODE_WRITE)) {
504                         CDEBUG(D_INODE, "object creation was delayed\n");
505                         GOTO(out, rc);
506                 }
507         }
508         file->f_flags &= ~O_LOV_DELAY_CREATE;
509         GOTO(out, rc);
510  out:
511         ptlrpc_req_finished(req);
512         if (req)
513                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
514         if (rc == 0) {
515                 ll_open_complete(inode);
516         } else {
517 out_och_free:
518                 if (*och_p) {
519                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
520                         *och_p = NULL; /* OBD_FREE writes some magic there */
521                         (*och_usecount)--;
522                 }
523                 up(&lli->lli_och_sem);
524         }
525         return rc;
526 }
527
528 /* Fills the obdo with the attributes for the inode defined by lsm */
529 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
530                    struct obdo *oa)
531 {
532         struct ptlrpc_request_set *set;
533         struct obd_info oinfo = { { { 0 } } };
534         int rc;
535         ENTRY;
536
537         LASSERT(lsm != NULL);
538
539         memset(oa, 0, sizeof *oa);
540         oinfo.oi_md = lsm;
541         oinfo.oi_oa = oa;
542         oa->o_id = lsm->lsm_object_id;
543         oa->o_mode = S_IFREG;
544         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
545                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
546                 OBD_MD_FLCTIME;
547
548         set = ptlrpc_prep_set();
549         if (set == NULL) {
550                 rc = -ENOMEM;
551         } else {
552                 rc = obd_getattr_async(exp, &oinfo, set);
553                 if (rc == 0)
554                         rc = ptlrpc_set_wait(set);
555                 ptlrpc_set_destroy(set);
556         }
557         if (rc)
558                 RETURN(rc);
559
560         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
561                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
562         RETURN(0);
563 }
564
565 static inline void ll_remove_suid(struct inode *inode)
566 {
567         unsigned int mode;
568
569         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
570         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
571
572         /* was any of the uid bits set? */
573         mode &= inode->i_mode;
574         if (mode && !capable(CAP_FSETID)) {
575                 inode->i_mode &= ~mode;
576                 // XXX careful here - we cannot change the size
577         }
578 }
579
580 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
581 {
582         struct ll_inode_info *lli = ll_i2info(inode);
583         struct lov_stripe_md *lsm = lli->lli_smd;
584         struct obd_export *exp = ll_i2obdexp(inode);
585         struct {
586                 char name[16];
587                 struct ldlm_lock *lock;
588                 struct lov_stripe_md *lsm;
589         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
590         __u32 stripe, vallen = sizeof(stripe);
591         int rc;
592         ENTRY;
593
594         if (lsm->lsm_stripe_count == 1)
595                 GOTO(check, stripe = 0);
596
597         /* get our offset in the lov */
598         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
599         if (rc != 0) {
600                 CERROR("obd_get_info: rc = %d\n", rc);
601                 RETURN(rc);
602         }
603         LASSERT(stripe < lsm->lsm_stripe_count);
604
605 check:
606         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
607             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[1]){
608                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
609                            lsm->lsm_oinfo[stripe]->loi_id,
610                            lsm->lsm_oinfo[stripe]->loi_gr);
611                 RETURN(-ELDLM_NO_LOCK_DATA);
612         }
613
614         RETURN(stripe);
615 }
616
617 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
618  * we get a lock cancellation for each stripe, so we have to map the obd's
619  * region back onto the stripes in the file that it held.
620  *
621  * No one can dirty the extent until we've finished our work and they can
622  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
623  * but other kernel actors could have pages locked.
624  *
625  * Called with the DLM lock held. */
626 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
627                               struct ldlm_lock *lock, __u32 stripe)
628 {
629         ldlm_policy_data_t tmpex;
630         unsigned long start, end, count, skip, i, j;
631         struct page *page;
632         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
633         struct lustre_handle lockh;
634         ENTRY;
635
636         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
637         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
638                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
639                inode->i_size);
640
641         /* our locks are page granular thanks to osc_enqueue, we invalidate the
642          * whole page. */
643         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
644             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
645                 LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
646         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
647         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
648
649         count = ~0;
650         skip = 0;
651         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
652         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
653         if (lsm->lsm_stripe_count > 1) {
654                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
655                 skip = (lsm->lsm_stripe_count - 1) * count;
656                 start += start/count * skip + stripe * count;
657                 if (end != ~0)
658                         end += end/count * skip + stripe * count;
659         }
660         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
661                 end = ~0;
662
663         i = inode->i_size ? (inode->i_size - 1) >> CFS_PAGE_SHIFT : 0;
664         if (i < end)
665                 end = i;
666
667         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
668                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
669                count, skip, end, discard ? " (DISCARDING)" : "");
670
671         /* walk through the vmas on the inode and tear down mmaped pages that
672          * intersect with the lock.  this stops immediately if there are no
673          * mmap()ed regions of the file.  This is not efficient at all and
674          * should be short lived. We'll associate mmap()ed pages with the lock
675          * and will be able to find them directly */
676         for (i = start; i <= end; i += (j + skip)) {
677                 j = min(count - (i % count), end - i + 1);
678                 LASSERT(j > 0);
679                 LASSERT(inode->i_mapping);
680                 if (ll_teardown_mmaps(inode->i_mapping,
681                                       (__u64)i << CFS_PAGE_SHIFT,
682                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
683                         break;
684         }
685
686         /* this is the simplistic implementation of page eviction at
687          * cancelation.  It is careful to get races with other page
688          * lockers handled correctly.  fixes from bug 20 will make it
689          * more efficient by associating locks with pages and with
690          * batching writeback under the lock explicitly. */
691         for (i = start, j = start % count; i <= end;
692              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
693                 if (j == count) {
694                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
695                         i += skip;
696                         j = 0;
697                         if (i > end)
698                                 break;
699                 }
700                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
701                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
702                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
703                          start, i, end);
704
705                 if (!mapping_has_pages(inode->i_mapping)) {
706                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
707                         break;
708                 }
709
710                 cond_resched();
711
712                 page = find_get_page(inode->i_mapping, i);
713                 if (page == NULL)
714                         continue;
715                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
716                                i, tmpex.l_extent.start);
717                 lock_page(page);
718
719                 /* page->mapping to check with racing against teardown */
720                 if (!discard && clear_page_dirty_for_io(page)) {
721                         rc = ll_call_writepage(inode, page);
722                         if (rc != 0)
723                                 CERROR("writepage of page %p failed: %d\n",
724                                        page, rc);
725                         /* either waiting for io to complete or reacquiring
726                          * the lock that the failed writepage released */
727                         lock_page(page);
728                 }
729
730                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
731                 /* check to see if another DLM lock covers this page  b=2765 */
732                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
733                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
734                                       LDLM_FL_TEST_LOCK,
735                                       &lock->l_resource->lr_name, LDLM_EXTENT,
736                                       &tmpex, LCK_PR | LCK_PW, &lockh);
737                 if (rc2 == 0 && page->mapping != NULL) {
738                         struct ll_async_page *llap = llap_cast_private(page);
739                         // checking again to account for writeback's lock_page()
740                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
741                         if (llap)
742                                 ll_ra_accounting(llap, inode->i_mapping);
743                         ll_truncate_complete_page(page);
744                 }
745                 unlock_page(page);
746                 page_cache_release(page);
747         }
748         LASSERTF(tmpex.l_extent.start <=
749                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
750                   lock->l_policy_data.l_extent.end + 1),
751                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
752                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
753                  start, i, end);
754         EXIT;
755 }
756
757 static int ll_extent_lock_callback(struct ldlm_lock *lock,
758                                    struct ldlm_lock_desc *new, void *data,
759                                    int flag)
760 {
761         struct lustre_handle lockh = { 0 };
762         int rc;
763         ENTRY;
764
765         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
766                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
767                 LBUG();
768         }
769
770         switch (flag) {
771         case LDLM_CB_BLOCKING:
772                 ldlm_lock2handle(lock, &lockh);
773                 rc = ldlm_cli_cancel(&lockh);
774                 if (rc != ELDLM_OK)
775                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
776                 break;
777         case LDLM_CB_CANCELING: {
778                 struct inode *inode;
779                 struct ll_inode_info *lli;
780                 struct lov_stripe_md *lsm;
781                 int stripe;
782                 __u64 kms;
783
784                 /* This lock wasn't granted, don't try to evict pages */
785                 if (lock->l_req_mode != lock->l_granted_mode)
786                         RETURN(0);
787
788                 inode = ll_inode_from_lock(lock);
789                 if (inode == NULL)
790                         RETURN(0);
791                 lli = ll_i2info(inode);
792                 if (lli == NULL)
793                         goto iput;
794                 if (lli->lli_smd == NULL)
795                         goto iput;
796                 lsm = lli->lli_smd;
797
798                 stripe = ll_lock_to_stripe_offset(inode, lock);
799                 if (stripe < 0)
800                         goto iput;
801
802                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
803
804                 lov_stripe_lock(lsm);
805                 lock_res_and_lock(lock);
806                 kms = ldlm_extent_shift_kms(lock,
807                                             lsm->lsm_oinfo[stripe]->loi_kms);
808
809                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
810                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
811                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
812                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
813                 unlock_res_and_lock(lock);
814                 lov_stripe_unlock(lsm);
815                 //ll_try_done_writing(inode);
816         iput:
817                 iput(inode);
818                 break;
819         }
820         default:
821                 LBUG();
822         }
823
824         RETURN(0);
825 }
826
827 #if 0
828 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
829 {
830         /* XXX ALLOCATE - 160 bytes */
831         struct inode *inode = ll_inode_from_lock(lock);
832         struct ll_inode_info *lli = ll_i2info(inode);
833         struct lustre_handle lockh = { 0 };
834         struct ost_lvb *lvb;
835         int stripe;
836         ENTRY;
837
838         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
839                      LDLM_FL_BLOCK_CONV)) {
840                 LBUG(); /* not expecting any blocked async locks yet */
841                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
842                            "lock, returning");
843                 ldlm_lock_dump(D_OTHER, lock, 0);
844                 ldlm_reprocess_all(lock->l_resource);
845                 RETURN(0);
846         }
847
848         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
849
850         stripe = ll_lock_to_stripe_offset(inode, lock);
851         if (stripe < 0)
852                 goto iput;
853
854         if (lock->l_lvb_len) {
855                 struct lov_stripe_md *lsm = lli->lli_smd;
856                 __u64 kms;
857                 lvb = lock->l_lvb_data;
858                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
859
860                 LOCK_INODE_MUTEX(inode);
861                 lock_res_and_lock(lock);
862                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
863                 kms = ldlm_extent_shift_kms(NULL, kms);
864                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
865                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
866                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
867                 lsm->lsm_oinfo[stripe].loi_kms = kms;
868                 unlock_res_and_lock(lock);
869                 UNLOCK_INODE_MUTEX(inode);
870         }
871
872 iput:
873         iput(inode);
874         wake_up(&lock->l_waitq);
875
876         ldlm_lock2handle(lock, &lockh);
877         ldlm_lock_decref(&lockh, LCK_PR);
878         RETURN(0);
879 }
880 #endif
881
882 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
883 {
884         struct ptlrpc_request *req = reqp;
885         struct inode *inode = ll_inode_from_lock(lock);
886         struct ll_inode_info *lli;
887         struct lov_stripe_md *lsm;
888         struct ost_lvb *lvb;
889         int rc, stripe;
890         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
891         ENTRY;
892
893         if (inode == NULL)
894                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
895         lli = ll_i2info(inode);
896         if (lli == NULL)
897                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
898         lsm = lli->lli_smd;
899         if (lsm == NULL)
900                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
901
902         /* First, find out which stripe index this lock corresponds to. */
903         stripe = ll_lock_to_stripe_offset(inode, lock);
904         if (stripe < 0)
905                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
906
907         rc = lustre_pack_reply(req, 2, size, NULL);
908         if (rc) {
909                 CERROR("lustre_pack_reply: %d\n", rc);
910                 GOTO(iput, rc);
911         }
912
913         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
914         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
915         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
916         lvb->lvb_atime = LTIME_S(inode->i_atime);
917         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
918
919         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
920                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
921                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
922                    lvb->lvb_atime, lvb->lvb_ctime);
923  iput:
924         iput(inode);
925
926  out:
927         /* These errors are normal races, so we don't want to fill the console
928          * with messages by calling ptlrpc_error() */
929         if (rc == -ELDLM_NO_LOCK_DATA)
930                 lustre_pack_reply(req, 1, NULL, NULL);
931
932         req->rq_status = rc;
933         return rc;
934 }
935
936 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
937                      lstat_t *st)
938 {
939         struct lustre_handle lockh = { 0 };
940         struct obd_enqueue_info einfo = { 0 };
941         struct obd_info oinfo = { { { 0 } } };
942         struct ost_lvb lvb;
943         int rc;
944         
945         ENTRY;
946         
947         einfo.ei_type = LDLM_EXTENT;
948         einfo.ei_mode = LCK_PR;
949         einfo.ei_flags = LDLM_FL_HAS_INTENT;
950         einfo.ei_cb_bl = ll_extent_lock_callback;
951         einfo.ei_cb_cp = ldlm_completion_ast;
952         einfo.ei_cb_gl = ll_glimpse_callback;
953         einfo.ei_cbdata = NULL;
954
955         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
956         oinfo.oi_lockh = &lockh;
957         oinfo.oi_md = lsm;
958
959         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
960         if (rc == -ENOENT)
961                 RETURN(rc);
962         if (rc != 0) {
963                 CERROR("obd_enqueue returned rc %d, "
964                        "returning -EIO\n", rc);
965                 RETURN(rc > 0 ? -EIO : rc);
966         }
967         
968         lov_stripe_lock(lsm);
969         memset(&lvb, 0, sizeof(lvb));
970         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
971         st->st_size = lvb.lvb_size;
972         st->st_blocks = lvb.lvb_blocks;
973         st->st_mtime = lvb.lvb_mtime;
974         st->st_atime = lvb.lvb_atime;
975         st->st_ctime = lvb.lvb_ctime;
976         lov_stripe_unlock(lsm);
977         
978         RETURN(rc);
979 }
980
981 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
982  * file (because it prefers KMS over RSS when larger) */
983 int ll_glimpse_size(struct inode *inode, int ast_flags)
984 {
985         struct ll_inode_info *lli = ll_i2info(inode);
986         struct ll_sb_info *sbi = ll_i2sbi(inode);
987         struct lustre_handle lockh = { 0 };
988         struct obd_enqueue_info einfo = { 0 };
989         struct obd_info oinfo = { { { 0 } } };
990         struct ost_lvb lvb;
991         int rc;
992         ENTRY;
993
994         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
995
996         if (!lli->lli_smd) {
997                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
998                 RETURN(0);
999         }
1000
1001         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1002          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1003          *       won't revoke any conflicting DLM locks held. Instead,
1004          *       ll_glimpse_callback() will be called on each client
1005          *       holding a DLM lock against this file, and resulting size
1006          *       will be returned for each stripe. DLM lock on [0, EOF] is
1007          *       acquired only if there were no conflicting locks. */
1008         einfo.ei_type = LDLM_EXTENT;
1009         einfo.ei_mode = LCK_PR;
1010         einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
1011         einfo.ei_cb_bl = ll_extent_lock_callback;
1012         einfo.ei_cb_cp = ldlm_completion_ast;
1013         einfo.ei_cb_gl = ll_glimpse_callback;
1014         einfo.ei_cbdata = inode;
1015
1016         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1017         oinfo.oi_lockh = &lockh;
1018         oinfo.oi_md = lli->lli_smd;
1019
1020         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
1021         if (rc == -ENOENT)
1022                 RETURN(rc);
1023         if (rc != 0) {
1024                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1025                 RETURN(rc > 0 ? -EIO : rc);
1026         }
1027
1028         ll_inode_size_lock(inode, 1);
1029         inode_init_lvb(inode, &lvb);
1030         obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1031         inode->i_size = lvb.lvb_size;
1032         inode->i_blocks = lvb.lvb_blocks;
1033         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1034         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1035         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1036         ll_inode_size_unlock(inode, 1);
1037
1038         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1039                inode->i_size, inode->i_blocks);
1040
1041         RETURN(rc);
1042 }
1043
1044 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1045                    struct lov_stripe_md *lsm, int mode,
1046                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1047                    int ast_flags)
1048 {
1049         struct ll_sb_info *sbi = ll_i2sbi(inode);
1050         struct ost_lvb lvb;
1051         struct obd_enqueue_info einfo = { 0 };
1052         struct obd_info oinfo = { { { 0 } } };
1053         int rc;
1054         ENTRY;
1055
1056         LASSERT(!lustre_handle_is_used(lockh));
1057         LASSERT(lsm != NULL);
1058
1059         /* don't drop the mmapped file to LRU */
1060         if (mapping_mapped(inode->i_mapping))
1061                 ast_flags |= LDLM_FL_NO_LRU;
1062
1063         /* XXX phil: can we do this?  won't it screw the file size up? */
1064         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1065             (sbi->ll_flags & LL_SBI_NOLCK))
1066                 RETURN(0);
1067
1068         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1069                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1070
1071         einfo.ei_type = LDLM_EXTENT;
1072         einfo.ei_mode = mode;
1073         einfo.ei_flags = ast_flags;
1074         einfo.ei_cb_bl = ll_extent_lock_callback;
1075         einfo.ei_cb_cp = ldlm_completion_ast;
1076         einfo.ei_cb_gl = ll_glimpse_callback;
1077         einfo.ei_cbdata = inode;
1078
1079         oinfo.oi_policy = *policy;
1080         oinfo.oi_lockh = lockh;
1081         oinfo.oi_md = lsm;
1082
1083         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo);
1084         *policy = oinfo.oi_policy;
1085         if (rc > 0)
1086                 rc = -EIO;
1087
1088         ll_inode_size_lock(inode, 1);
1089         inode_init_lvb(inode, &lvb);
1090         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1091
1092         if (policy->l_extent.start == 0 &&
1093             policy->l_extent.end == OBD_OBJECT_EOF) {
1094                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1095                  * the kms under both a DLM lock and the
1096                  * ll_inode_size_lock().  If we don't get the
1097                  * ll_inode_size_lock() here we can match the DLM lock and
1098                  * reset i_size from the kms before the truncating path has
1099                  * updated the kms.  generic_file_write can then trust the
1100                  * stale i_size when doing appending writes and effectively
1101                  * cancel the result of the truncate.  Getting the
1102                  * ll_inode_size_lock() after the enqueue maintains the DLM
1103                  * -> ll_inode_size_lock() acquiring order. */
1104                 inode->i_size = lvb.lvb_size;
1105                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1106                        inode->i_ino, inode->i_size);
1107         }
1108
1109         if (rc == 0) {
1110                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1111                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1112                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1113         }
1114         ll_inode_size_unlock(inode, 1);
1115
1116         RETURN(rc);
1117 }
1118
1119 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1120                      struct lov_stripe_md *lsm, int mode,
1121                      struct lustre_handle *lockh)
1122 {
1123         struct ll_sb_info *sbi = ll_i2sbi(inode);
1124         int rc;
1125         ENTRY;
1126
1127         /* XXX phil: can we do this?  won't it screw the file size up? */
1128         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1129             (sbi->ll_flags & LL_SBI_NOLCK))
1130                 RETURN(0);
1131
1132         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1133
1134         RETURN(rc);
1135 }
1136
1137 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1138                             loff_t *ppos)
1139 {
1140         struct inode *inode = file->f_dentry->d_inode;
1141         struct ll_inode_info *lli = ll_i2info(inode);
1142         struct lov_stripe_md *lsm = lli->lli_smd;
1143         struct ll_sb_info *sbi = ll_i2sbi(inode);
1144         struct ll_lock_tree tree;
1145         struct ll_lock_tree_node *node;
1146         struct ost_lvb lvb;
1147         struct ll_ra_read bead;
1148         int rc, ra = 0;
1149         loff_t end;
1150         ssize_t retval, chunk, sum = 0;
1151
1152         __u64 kms;
1153         ENTRY;
1154         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1155                inode->i_ino, inode->i_generation, inode, count, *ppos);
1156         ll_vfs_ops_tally(sbi, VFS_OPS_READ);
1157
1158         /* "If nbyte is 0, read() will return 0 and have no other results."
1159          *                      -- Single Unix Spec */
1160         if (count == 0)
1161                 RETURN(0);
1162
1163         lprocfs_counter_add(sbi->ll_stats, LPROC_LL_READ_BYTES, count);
1164
1165         if (!lsm) {
1166                 /* Read on file with no objects should return zero-filled
1167                  * buffers up to file size (we can get non-zero sizes with
1168                  * mknod + truncate, then opening file for read. This is a
1169                  * common pattern in NFS case, it seems). Bug 6243 */
1170                 int notzeroed;
1171                 /* Since there are no objects on OSTs, we have nothing to get
1172                  * lock on and so we are forced to access inode->i_size
1173                  * unguarded */
1174
1175                 /* Read beyond end of file */
1176                 if (*ppos >= inode->i_size)
1177                         RETURN(0);
1178
1179                 if (count > inode->i_size - *ppos)
1180                         count = inode->i_size - *ppos;
1181                 /* Make sure to correctly adjust the file pos pointer for
1182                  * EFAULT case */
1183                 notzeroed = clear_user(buf, count);
1184                 count -= notzeroed;
1185                 *ppos += count;
1186                 if (!count)
1187                         RETURN(-EFAULT);
1188                 RETURN(count);
1189         }
1190
1191 repeat:
1192         if (sbi->ll_max_rw_chunk != 0) {
1193                 /* first, let's know the end of the current stripe */
1194                 end = *ppos;
1195                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1196                                 (obd_off *)&end);
1197
1198                 /* correct, the end is beyond the request */
1199                 if (end > *ppos + count - 1)
1200                         end = *ppos + count - 1;
1201
1202                 /* and chunk shouldn't be too large even if striping is wide */
1203                 if (end - *ppos > sbi->ll_max_rw_chunk)
1204                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1205         } else {
1206                 end = *ppos + count - 1;
1207         }
1208        
1209         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1210         tree.lt_fd = LUSTRE_FPRIVATE(file);
1211         rc = ll_tree_lock(&tree, node, buf, count,
1212                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1213         if (rc != 0)
1214                 GOTO(out, retval = rc);
1215
1216         ll_inode_size_lock(inode, 1);
1217         /*
1218          * Consistency guarantees: following possibilities exist for the
1219          * relation between region being read and real file size at this
1220          * moment:
1221          *
1222          *  (A): the region is completely inside of the file;
1223          *
1224          *  (B-x): x bytes of region are inside of the file, the rest is
1225          *  outside;
1226          *
1227          *  (C): the region is completely outside of the file.
1228          *
1229          * This classification is stable under DLM lock acquired by
1230          * ll_tree_lock() above, because to change class, other client has to
1231          * take DLM lock conflicting with our lock. Also, any updates to
1232          * ->i_size by other threads on this client are serialized by
1233          * ll_inode_size_lock(). This guarantees that short reads are handled
1234          * correctly in the face of concurrent writes and truncates.
1235          */
1236         inode_init_lvb(inode, &lvb);
1237         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1238         kms = lvb.lvb_size;
1239         if (*ppos + count - 1 > kms) {
1240                 /* A glimpse is necessary to determine whether we return a
1241                  * short read (B) or some zeroes at the end of the buffer (C) */
1242                 ll_inode_size_unlock(inode, 1);
1243                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1244                 if (retval) {
1245                         ll_tree_unlock(&tree);
1246                         goto out;
1247                 }
1248         } else {
1249                 /* region is within kms and, hence, within real file size (A).
1250                  * We need to increase i_size to cover the read region so that
1251                  * generic_file_read() will do its job, but that doesn't mean
1252                  * the kms size is _correct_, it is only the _minimum_ size.
1253                  * If someone does a stat they will get the correct size which
1254                  * will always be >= the kms value here.  b=11081 */
1255                 if (inode->i_size < kms)
1256                         inode->i_size = kms;
1257                 ll_inode_size_unlock(inode, 1);
1258         }
1259
1260         chunk = end - *ppos + 1;
1261         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1262                inode->i_ino, chunk, *ppos, inode->i_size);
1263
1264         /* turn off the kernel's read-ahead */
1265 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1266         file->f_ramax = 0;
1267 #else
1268         file->f_ra.ra_pages = 0;
1269 #endif
1270         /* initialize read-ahead window once per syscall */
1271         if (ra == 0) {
1272                 ra = 1;
1273                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1274                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1275                 ll_ra_read_in(file, &bead);
1276         }
1277
1278         /* BUG: 5972 */
1279         file_accessed(file);
1280         retval = generic_file_read(file, buf, chunk, ppos);
1281         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 0);
1282
1283         ll_tree_unlock(&tree);
1284
1285         if (retval > 0) {
1286                 buf += retval;
1287                 count -= retval;
1288                 sum += retval;
1289                 if (retval == chunk && count > 0)
1290                         goto repeat;
1291         }
1292
1293  out:
1294         if (ra != 0)
1295                 ll_ra_read_ex(file, &bead);
1296         retval = (sum > 0) ? sum : retval;
1297         RETURN(retval);
1298 }
1299
1300 /*
1301  * Write to a file (through the page cache).
1302  */
1303 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1304                              loff_t *ppos)
1305 {
1306         struct inode *inode = file->f_dentry->d_inode;
1307         struct ll_sb_info *sbi = ll_i2sbi(inode);
1308         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1309         struct ll_lock_tree tree;
1310         struct ll_lock_tree_node *node;
1311         loff_t maxbytes = ll_file_maxbytes(inode);
1312         loff_t lock_start, lock_end, end;
1313         ssize_t retval, chunk, sum = 0;
1314         int rc;
1315         ENTRY;
1316
1317         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1318                inode->i_ino, inode->i_generation, inode, count, *ppos);
1319         ll_vfs_ops_tally(sbi, VFS_OPS_WRITE);
1320         
1321         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1322
1323         /* POSIX, but surprised the VFS doesn't check this already */
1324         if (count == 0)
1325                 RETURN(0);
1326
1327         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1328          * called on the file, don't fail the below assertion (bug 2388). */
1329         if (file->f_flags & O_LOV_DELAY_CREATE &&
1330             ll_i2info(inode)->lli_smd == NULL)
1331                 RETURN(-EBADF);
1332
1333         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1334
1335         down(&ll_i2info(inode)->lli_write_sem);
1336
1337 repeat:
1338         chunk = 0; /* just to fix gcc's warning */
1339         end = *ppos + count - 1;
1340
1341         if (file->f_flags & O_APPEND) {
1342                 lock_start = 0;
1343                 lock_end = OBD_OBJECT_EOF;
1344         } else if (sbi->ll_max_rw_chunk != 0) {
1345                 /* first, let's know the end of the current stripe */
1346                 end = *ppos;
1347                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, 
1348                                 (obd_off *)&end);
1349
1350                 /* correct, the end is beyond the request */
1351                 if (end > *ppos + count - 1)
1352                         end = *ppos + count - 1;
1353
1354                 /* and chunk shouldn't be too large even if striping is wide */
1355                 if (end - *ppos > sbi->ll_max_rw_chunk)
1356                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1357                 lock_start = *ppos;
1358                 lock_end = end;
1359         } else {
1360                 lock_start = *ppos;
1361                 lock_end = *ppos + count - 1;
1362         }
1363         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1364
1365         if (IS_ERR(node))
1366                 GOTO(out, retval = PTR_ERR(node));
1367
1368         tree.lt_fd = LUSTRE_FPRIVATE(file);
1369         rc = ll_tree_lock(&tree, node, buf, count,
1370                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1371         if (rc != 0)
1372                 GOTO(out, retval = rc);
1373
1374         /* This is ok, g_f_w will overwrite this under i_sem if it races
1375          * with a local truncate, it just makes our maxbyte checking easier.
1376          * The i_size value gets updated in ll_extent_lock() as a consequence
1377          * of the [0,EOF] extent lock we requested above. */
1378         if (file->f_flags & O_APPEND) {
1379                 *ppos = inode->i_size;
1380                 end = *ppos + count - 1;
1381         }
1382
1383         if (*ppos >= maxbytes) {
1384                 send_sig(SIGXFSZ, current, 0);
1385                 GOTO(out, retval = -EFBIG);
1386         }
1387         if (*ppos + count > maxbytes)
1388                 count = maxbytes - *ppos;
1389
1390         /* generic_file_write handles O_APPEND after getting i_mutex */
1391         chunk = end - *ppos + 1;
1392         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1393                inode->i_ino, chunk, *ppos);
1394         retval = generic_file_write(file, buf, chunk, ppos);
1395         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1396
1397 out:
1398         ll_tree_unlock(&tree);
1399
1400         if (retval > 0) {
1401                 buf += retval;
1402                 count -= retval;
1403                 sum += retval;
1404                 if (retval == chunk && count > 0)
1405                         goto repeat;
1406         }
1407
1408         up(&ll_i2info(inode)->lli_write_sem);
1409
1410         retval = (sum > 0) ? sum : retval;
1411         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1412                             retval > 0 ? retval : 0);
1413
1414         if (retval > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
1415                 rc = ll_sync_page_range(inode, inode->i_mapping, *ppos - retval,
1416                                         count);
1417                 if (rc < 0)
1418                         retval = rc;
1419         }
1420
1421         RETURN(retval);
1422 }
1423
1424 /*
1425  * Send file content (through pagecache) somewhere with helper
1426  */
1427 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1428 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1429                                 read_actor_t actor, void *target)
1430 {
1431         struct inode *inode = in_file->f_dentry->d_inode;
1432         struct ll_inode_info *lli = ll_i2info(inode);
1433         struct lov_stripe_md *lsm = lli->lli_smd;
1434         struct ll_lock_tree tree;
1435         struct ll_lock_tree_node *node;
1436         struct ost_lvb lvb;
1437         struct ll_ra_read bead;
1438         int rc;
1439         ssize_t retval;
1440         __u64 kms;
1441         ENTRY;
1442         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1443                inode->i_ino, inode->i_generation, inode, count, *ppos);
1444
1445         /* "If nbyte is 0, read() will return 0 and have no other results."
1446          *                      -- Single Unix Spec */
1447         if (count == 0)
1448                 RETURN(0);
1449
1450         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1451                             count);
1452
1453         /* turn off the kernel's read-ahead */
1454         in_file->f_ra.ra_pages = 0;
1455
1456         /* File with no objects, nothing to lock */
1457         if (!lsm)
1458                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1459
1460         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1461         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1462         rc = ll_tree_lock(&tree, node, NULL, count,
1463                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1464         if (rc != 0)
1465                 RETURN(rc);
1466
1467         ll_inode_size_lock(inode, 1);
1468         /*
1469          * Consistency guarantees: following possibilities exist for the
1470          * relation between region being read and real file size at this
1471          * moment:
1472          *
1473          *  (A): the region is completely inside of the file;
1474          *
1475          *  (B-x): x bytes of region are inside of the file, the rest is
1476          *  outside;
1477          *
1478          *  (C): the region is completely outside of the file.
1479          *
1480          * This classification is stable under DLM lock acquired by
1481          * ll_tree_lock() above, because to change class, other client has to
1482          * take DLM lock conflicting with our lock. Also, any updates to
1483          * ->i_size by other threads on this client are serialized by
1484          * ll_inode_size_lock(). This guarantees that short reads are handled
1485          * correctly in the face of concurrent writes and truncates.
1486          */
1487         inode_init_lvb(inode, &lvb);
1488         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1489         kms = lvb.lvb_size;
1490         if (*ppos + count - 1 > kms) {
1491                 /* A glimpse is necessary to determine whether we return a
1492                  * short read (B) or some zeroes at the end of the buffer (C) */
1493                 ll_inode_size_unlock(inode, 1);
1494                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1495                 if (retval)
1496                         goto out;
1497         } else {
1498                 /* region is within kms and, hence, within real file size (A) */
1499                 inode->i_size = kms;
1500                 ll_inode_size_unlock(inode, 1);
1501         }
1502
1503         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1504                inode->i_ino, count, *ppos, inode->i_size);
1505
1506         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1507         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1508         ll_ra_read_in(in_file, &bead);
1509         /* BUG: 5972 */
1510         file_accessed(in_file);
1511         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1512         ll_ra_read_ex(in_file, &bead);
1513
1514  out:
1515         ll_tree_unlock(&tree);
1516         RETURN(retval);
1517 }
1518 #endif
1519
1520 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1521                                unsigned long arg)
1522 {
1523         struct ll_inode_info *lli = ll_i2info(inode);
1524         struct obd_export *exp = ll_i2obdexp(inode);
1525         struct ll_recreate_obj ucreatp;
1526         struct obd_trans_info oti = { 0 };
1527         struct obdo *oa = NULL;
1528         int lsm_size;
1529         int rc = 0;
1530         struct lov_stripe_md *lsm, *lsm2;
1531         ENTRY;
1532
1533         if (!capable (CAP_SYS_ADMIN))
1534                 RETURN(-EPERM);
1535
1536         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1537                             sizeof(struct ll_recreate_obj));
1538         if (rc) {
1539                 RETURN(-EFAULT);
1540         }
1541         oa = obdo_alloc();
1542         if (oa == NULL)
1543                 RETURN(-ENOMEM);
1544
1545         down(&lli->lli_open_sem);
1546         lsm = lli->lli_smd;
1547         if (lsm == NULL)
1548                 GOTO(out, rc = -ENOENT);
1549         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1550                    (lsm->lsm_stripe_count));
1551
1552         OBD_ALLOC(lsm2, lsm_size);
1553         if (lsm2 == NULL)
1554                 GOTO(out, rc = -ENOMEM);
1555
1556         oa->o_id = ucreatp.lrc_id;
1557         oa->o_nlink = ucreatp.lrc_ost_idx;
1558         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1559         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1560         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1561                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1562
1563         oti.oti_objid = NULL;
1564         memcpy(lsm2, lsm, lsm_size);
1565         rc = obd_create(exp, oa, &lsm2, &oti);
1566
1567         OBD_FREE(lsm2, lsm_size);
1568         GOTO(out, rc);
1569 out:
1570         up(&lli->lli_open_sem);
1571         obdo_free(oa);
1572         return rc;
1573 }
1574
1575 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1576                                     int flags, struct lov_user_md *lum,
1577                                     int lum_size)
1578 {
1579         struct ll_inode_info *lli = ll_i2info(inode);
1580         struct lov_stripe_md *lsm;
1581         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1582         int rc = 0;
1583         ENTRY;
1584
1585         down(&lli->lli_open_sem);
1586         lsm = lli->lli_smd;
1587         if (lsm) {
1588                 up(&lli->lli_open_sem);
1589                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1590                        inode->i_ino);
1591                 RETURN(-EEXIST);
1592         }
1593
1594         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1595         if (rc)
1596                 GOTO(out, rc);
1597         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1598                 GOTO(out_req_free, rc = -ENOENT);
1599         rc = oit.d.lustre.it_status;
1600         if (rc < 0)
1601                 GOTO(out_req_free, rc);
1602
1603         ll_release_openhandle(file->f_dentry, &oit);
1604
1605  out:
1606         up(&lli->lli_open_sem);
1607         ll_intent_release(&oit);
1608         RETURN(rc);
1609 out_req_free:
1610         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1611         goto out;
1612 }
1613
1614 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1615                              struct lov_mds_md **lmmp, int *lmm_size, 
1616                              struct ptlrpc_request **request)
1617 {
1618         struct ll_sb_info *sbi = ll_i2sbi(inode);
1619         struct ll_fid  fid;
1620         struct mds_body  *body;
1621         struct lov_mds_md *lmm = NULL;
1622         struct ptlrpc_request *req = NULL;
1623         int rc, lmmsize;
1624
1625         ll_inode2fid(&fid, inode);
1626
1627         rc = ll_get_max_mdsize(sbi, &lmmsize);
1628         if (rc)
1629                 RETURN(rc);
1630
1631         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
1632                         filename, strlen(filename) + 1,
1633                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
1634                         lmmsize, &req);
1635         if (rc < 0) {
1636                 CDEBUG(D_INFO, "mdc_getattr_name failed "
1637                                 "on %s: rc %d\n", filename, rc);
1638                 GOTO(out, rc);
1639         }
1640
1641         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1642                         sizeof(*body));
1643         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1644         /* swabbed by mdc_getattr_name */
1645         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1646
1647         lmmsize = body->eadatasize;
1648
1649         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1650                         lmmsize == 0) {
1651                 GOTO(out, rc = -ENODATA);
1652         }
1653
1654         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
1655                         lmmsize);
1656         LASSERT(lmm != NULL);
1657         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1658
1659         /*
1660          * This is coming from the MDS, so is probably in
1661          * little endian.  We convert it to host endian before
1662          * passing it to userspace.
1663          */
1664         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1665                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1666                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1667         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1668                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1669         }
1670
1671         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1672                 struct lov_stripe_md *lsm;
1673                 struct lov_user_md_join *lmj;
1674                 int lmj_size, i, aindex = 0;
1675
1676                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
1677                 if (rc < 0)
1678                         GOTO(out, rc = -ENOMEM);
1679                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
1680                 if (rc)
1681                         GOTO(out_free_memmd, rc);
1682
1683                 lmj_size = sizeof(struct lov_user_md_join) +
1684                         lsm->lsm_stripe_count *
1685                         sizeof(struct lov_user_ost_data_join);
1686                 OBD_ALLOC(lmj, lmj_size);
1687                 if (!lmj)
1688                         GOTO(out_free_memmd, rc = -ENOMEM);
1689
1690                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1691                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1692                         struct lov_extent *lex =
1693                                 &lsm->lsm_array->lai_ext_array[aindex];
1694
1695                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1696                                 aindex ++;
1697                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1698                                         LPU64" len %d\n", aindex, i,
1699                                         lex->le_start, (int)lex->le_len);
1700                         lmj->lmm_objects[i].l_extent_start =
1701                                 lex->le_start;
1702
1703                         if ((int)lex->le_len == -1)
1704                                 lmj->lmm_objects[i].l_extent_end = -1;
1705                         else
1706                                 lmj->lmm_objects[i].l_extent_end =
1707                                         lex->le_start + lex->le_len;
1708                         lmj->lmm_objects[i].l_object_id =
1709                                 lsm->lsm_oinfo[i]->loi_id;
1710                         lmj->lmm_objects[i].l_object_gr =
1711                                 lsm->lsm_oinfo[i]->loi_gr;
1712                         lmj->lmm_objects[i].l_ost_gen =
1713                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1714                         lmj->lmm_objects[i].l_ost_idx =
1715                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1716                 }
1717                 lmm = (struct lov_mds_md *)lmj;
1718                 lmmsize = lmj_size;
1719 out_free_memmd:
1720                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
1721         }
1722 out:
1723         *lmmp = lmm;
1724         *lmm_size = lmmsize;
1725         *request = req;
1726         return rc;
1727 }
1728 static int ll_lov_setea(struct inode *inode, struct file *file,
1729                             unsigned long arg)
1730 {
1731         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1732         struct lov_user_md  *lump;
1733         int lum_size = sizeof(struct lov_user_md) +
1734                        sizeof(struct lov_user_ost_data);
1735         int rc;
1736         ENTRY;
1737
1738         if (!capable (CAP_SYS_ADMIN))
1739                 RETURN(-EPERM);
1740
1741         OBD_ALLOC(lump, lum_size);
1742         if (lump == NULL) {
1743                 RETURN(-ENOMEM);
1744         }
1745         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1746         if (rc) {
1747                 OBD_FREE(lump, lum_size);
1748                 RETURN(-EFAULT);
1749         }
1750
1751         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1752
1753         OBD_FREE(lump, lum_size);
1754         RETURN(rc);
1755 }
1756
1757 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1758                             unsigned long arg)
1759 {
1760         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1761         int rc;
1762         int flags = FMODE_WRITE;
1763         ENTRY;
1764
1765         /* Bug 1152: copy properly when this is no longer true */
1766         LASSERT(sizeof(lum) == sizeof(*lump));
1767         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1768         rc = copy_from_user(&lum, lump, sizeof(lum));
1769         if (rc)
1770                 RETURN(-EFAULT);
1771
1772         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1773         if (rc == 0) {
1774                  put_user(0, &lump->lmm_stripe_count);
1775                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
1776                                     0, ll_i2info(inode)->lli_smd, lump);
1777         }
1778         RETURN(rc);
1779 }
1780
1781 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1782 {
1783         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1784
1785         if (!lsm)
1786                 RETURN(-ENODATA);
1787
1788         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
1789                             (void *)arg);
1790 }
1791
1792 static int ll_get_grouplock(struct inode *inode, struct file *file,
1793                             unsigned long arg)
1794 {
1795         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1796         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1797                                                     .end = OBD_OBJECT_EOF}};
1798         struct lustre_handle lockh = { 0 };
1799         struct ll_inode_info *lli = ll_i2info(inode);
1800         struct lov_stripe_md *lsm = lli->lli_smd;
1801         int flags = 0, rc;
1802         ENTRY;
1803
1804         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1805                 RETURN(-EINVAL);
1806         }
1807
1808         policy.l_extent.gid = arg;
1809         if (file->f_flags & O_NONBLOCK)
1810                 flags = LDLM_FL_BLOCK_NOWAIT;
1811
1812         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1813         if (rc)
1814                 RETURN(rc);
1815
1816         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1817         fd->fd_gid = arg;
1818         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1819
1820         RETURN(0);
1821 }
1822
1823 static int ll_put_grouplock(struct inode *inode, struct file *file,
1824                             unsigned long arg)
1825 {
1826         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1827         struct ll_inode_info *lli = ll_i2info(inode);
1828         struct lov_stripe_md *lsm = lli->lli_smd;
1829         int rc;
1830         ENTRY;
1831
1832         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1833                 /* Ugh, it's already unlocked. */
1834                 RETURN(-EINVAL);
1835         }
1836
1837         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1838                 RETURN(-EINVAL);
1839
1840         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1841
1842         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1843         if (rc)
1844                 RETURN(rc);
1845
1846         fd->fd_gid = 0;
1847         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1848
1849         RETURN(0);
1850 }
1851
1852 static int join_sanity_check(struct inode *head, struct inode *tail)
1853 {
1854         ENTRY;
1855         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1856                 CERROR("server do not support join \n");
1857                 RETURN(-EINVAL);
1858         }
1859         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1860                 CERROR("tail ino %lu and ino head %lu must be regular\n",
1861                        head->i_ino, tail->i_ino);
1862                 RETURN(-EINVAL);
1863         }
1864         if (head->i_ino == tail->i_ino) {
1865                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
1866                 RETURN(-EINVAL);
1867         }
1868         if (head->i_size % JOIN_FILE_ALIGN) {
1869                 CERROR("hsize %llu must be times of 64K\n", head->i_size);
1870                 RETURN(-EINVAL);
1871         }
1872         RETURN(0);
1873 }
1874
1875 static int join_file(struct inode *head_inode, struct file *head_filp,
1876                      struct file *tail_filp)
1877 {
1878         struct inode *tail_inode, *tail_parent;
1879         struct dentry *tail_dentry = tail_filp->f_dentry;
1880         struct lookup_intent oit = {.it_op = IT_OPEN,
1881                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1882         struct lustre_handle lockh;
1883         struct mdc_op_data *op_data;
1884         __u32  hsize = head_inode->i_size >> 32;
1885         __u32  tsize = head_inode->i_size;
1886         int    rc;
1887         ENTRY;
1888
1889         tail_dentry = tail_filp->f_dentry;
1890         tail_inode = tail_dentry->d_inode;
1891         tail_parent = tail_dentry->d_parent->d_inode;
1892
1893         OBD_ALLOC_PTR(op_data);
1894         if (op_data == NULL) {
1895                 RETURN(-ENOMEM);
1896         }
1897
1898         ll_prepare_mdc_op_data(op_data, head_inode, tail_parent,
1899                                tail_dentry->d_name.name,
1900                                tail_dentry->d_name.len, 0);
1901         rc = mdc_enqueue(ll_i2mdcexp(head_inode), LDLM_IBITS, &oit, LCK_PW,
1902                          op_data, &lockh, &tsize, 0, ldlm_completion_ast,
1903                          ll_mdc_blocking_ast, &hsize, 0);
1904
1905         if (rc < 0)
1906                 GOTO(out, rc);
1907
1908         rc = oit.d.lustre.it_status;
1909
1910         if (rc < 0) {
1911                 ptlrpc_req_finished((struct ptlrpc_request *)
1912                                                           oit.d.lustre.it_data);
1913                 GOTO(out, rc);
1914         }
1915
1916         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
1917                                            * away */
1918                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
1919                 oit.d.lustre.it_lock_mode = 0;
1920         }
1921         ll_release_openhandle(head_filp->f_dentry, &oit);
1922 out:
1923         if (op_data)
1924                 OBD_FREE_PTR(op_data);
1925         ll_intent_release(&oit);
1926         RETURN(rc);
1927 }
1928
1929 static int ll_file_join(struct inode *head, struct file *filp,
1930                         char *filename_tail)
1931 {
1932         struct inode *tail = NULL, *first = NULL, *second = NULL;
1933         struct dentry *tail_dentry;
1934         struct file *tail_filp, *first_filp, *second_filp;
1935         struct ll_lock_tree first_tree, second_tree;
1936         struct ll_lock_tree_node *first_node, *second_node;
1937         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
1938         int rc = 0, cleanup_phase = 0;
1939         ENTRY;
1940
1941         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
1942                head->i_ino, head->i_generation, head, filename_tail);
1943
1944         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
1945         if (IS_ERR(tail_filp)) {
1946                 CERROR("Can not open tail file %s", filename_tail);
1947                 rc = PTR_ERR(tail_filp);
1948                 GOTO(cleanup, rc);
1949         }
1950         tail = igrab(tail_filp->f_dentry->d_inode);
1951
1952         tlli = ll_i2info(tail);
1953         tail_dentry = tail_filp->f_dentry;
1954         LASSERT(tail_dentry);
1955         cleanup_phase = 1;
1956
1957         /*reorder the inode for lock sequence*/
1958         first = head->i_ino > tail->i_ino ? head : tail;
1959         second = head->i_ino > tail->i_ino ? tail : head;
1960         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
1961         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
1962
1963         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
1964                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
1965         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
1966         if (IS_ERR(first_node)){
1967                 rc = PTR_ERR(first_node);
1968                 GOTO(cleanup, rc);
1969         }
1970         first_tree.lt_fd = first_filp->private_data;
1971         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
1972         if (rc != 0)
1973                 GOTO(cleanup, rc);
1974         cleanup_phase = 2;
1975
1976         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
1977         if (IS_ERR(second_node)){
1978                 rc = PTR_ERR(second_node);
1979                 GOTO(cleanup, rc);
1980         }
1981         second_tree.lt_fd = second_filp->private_data;
1982         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
1983         if (rc != 0)
1984                 GOTO(cleanup, rc);
1985         cleanup_phase = 3;
1986
1987         rc = join_sanity_check(head, tail);
1988         if (rc)
1989                 GOTO(cleanup, rc);
1990
1991         rc = join_file(head, filp, tail_filp);
1992         if (rc)
1993                 GOTO(cleanup, rc);
1994 cleanup:
1995         switch (cleanup_phase) {
1996         case 3:
1997                 ll_tree_unlock(&second_tree);
1998                 obd_cancel_unused(ll_i2obdexp(second),
1999                                   ll_i2info(second)->lli_smd, 0, NULL);
2000         case 2:
2001                 ll_tree_unlock(&first_tree);
2002                 obd_cancel_unused(ll_i2obdexp(first),
2003                                   ll_i2info(first)->lli_smd, 0, NULL);
2004         case 1:
2005                 filp_close(tail_filp, 0);
2006                 if (tail)
2007                         iput(tail);
2008                 if (head && rc == 0) {
2009                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2010                                        &hlli->lli_smd);
2011                         hlli->lli_smd = NULL;
2012                 }
2013         case 0:
2014                 break;
2015         default:
2016                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2017                 LBUG();
2018         }
2019         RETURN(rc);
2020 }
2021
2022 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2023 {
2024         struct inode *inode = dentry->d_inode;
2025         struct obd_client_handle *och;
2026         int rc;
2027         ENTRY;
2028
2029         LASSERT(inode);
2030
2031         /* Root ? Do nothing. */
2032         if (dentry->d_inode->i_sb->s_root == dentry)
2033                 RETURN(0);
2034
2035         /* No open handle to close? Move away */
2036         if (!it_disposition(it, DISP_OPEN_OPEN))
2037                 RETURN(0);
2038
2039         OBD_ALLOC(och, sizeof(*och));
2040         if (!och)
2041                 GOTO(out, rc = -ENOMEM);
2042
2043         ll_och_fill(ll_i2info(inode), it, och);
2044
2045         rc = ll_close_inode_openhandle(inode, och);
2046
2047         OBD_FREE(och, sizeof(*och));
2048  out:
2049         /* this one is in place of ll_file_open */
2050         ptlrpc_req_finished(it->d.lustre.it_data);
2051         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2052         RETURN(rc);
2053 }
2054
2055 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2056                   unsigned long arg)
2057 {
2058         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2059         int flags;
2060         ENTRY;
2061
2062         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2063                inode->i_generation, inode, cmd);
2064         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_IOCTL);
2065
2066         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2067         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2068                 RETURN(-ENOTTY);
2069
2070         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
2071         switch(cmd) {
2072         case LL_IOC_GETFLAGS:
2073                 /* Get the current value of the file flags */
2074                 return put_user(fd->fd_flags, (int *)arg);
2075         case LL_IOC_SETFLAGS:
2076         case LL_IOC_CLRFLAGS:
2077                 /* Set or clear specific file flags */
2078                 /* XXX This probably needs checks to ensure the flags are
2079                  *     not abused, and to handle any flag side effects.
2080                  */
2081                 if (get_user(flags, (int *) arg))
2082                         RETURN(-EFAULT);
2083
2084                 if (cmd == LL_IOC_SETFLAGS) {
2085                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2086                             !(file->f_flags & O_DIRECT)) {
2087                                 CERROR("%s: unable to disable locking on "
2088                                        "non-O_DIRECT file\n", current->comm);
2089                                 RETURN(-EINVAL);
2090                         }
2091
2092                         fd->fd_flags |= flags;
2093                 } else {
2094                         fd->fd_flags &= ~flags;
2095                 }
2096                 RETURN(0);
2097         case LL_IOC_LOV_SETSTRIPE:
2098                 RETURN(ll_lov_setstripe(inode, file, arg));
2099         case LL_IOC_LOV_SETEA:
2100                 RETURN(ll_lov_setea(inode, file, arg));
2101         case LL_IOC_LOV_GETSTRIPE:
2102                 RETURN(ll_lov_getstripe(inode, arg));
2103         case LL_IOC_RECREATE_OBJ:
2104                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2105         case EXT3_IOC_GETFLAGS:
2106         case EXT3_IOC_SETFLAGS:
2107                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2108         case EXT3_IOC_GETVERSION_OLD:
2109         case EXT3_IOC_GETVERSION:
2110                 RETURN(put_user(inode->i_generation, (int *)arg));
2111         case LL_IOC_JOIN: {
2112                 char *ftail;
2113                 int rc;
2114
2115                 ftail = getname((const char *)arg);
2116                 if (IS_ERR(ftail))
2117                         RETURN(PTR_ERR(ftail));
2118                 rc = ll_file_join(inode, file, ftail);
2119                 putname(ftail);
2120                 RETURN(rc);
2121         }
2122         case LL_IOC_GROUP_LOCK:
2123                 RETURN(ll_get_grouplock(inode, file, arg));
2124         case LL_IOC_GROUP_UNLOCK:
2125                 RETURN(ll_put_grouplock(inode, file, arg));
2126         case IOC_OBD_STATFS:
2127                 RETURN(ll_obd_statfs(inode, (void *)arg));
2128
2129         /* We need to special case any other ioctls we want to handle,
2130          * to send them to the MDS/OST as appropriate and to properly
2131          * network encode the arg field.
2132         case EXT3_IOC_SETVERSION_OLD:
2133         case EXT3_IOC_SETVERSION:
2134         */
2135         default:
2136                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2137                                      (void *)arg));
2138         }
2139 }
2140
2141 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2142 {
2143         struct inode *inode = file->f_dentry->d_inode;
2144         struct ll_inode_info *lli = ll_i2info(inode);
2145         struct lov_stripe_md *lsm = lli->lli_smd;
2146         loff_t retval;
2147         ENTRY;
2148         retval = offset + ((origin == 2) ? inode->i_size :
2149                            (origin == 1) ? file->f_pos : 0);
2150         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2151                inode->i_ino, inode->i_generation, inode, retval, retval,
2152                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2153         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_SEEK);
2154         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
2155         
2156         if (origin == 2) { /* SEEK_END */
2157                 int nonblock = 0, rc;
2158
2159                 if (file->f_flags & O_NONBLOCK)
2160                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2161
2162                 if (lsm != NULL) {
2163                         rc = ll_glimpse_size(inode, nonblock);
2164                         if (rc != 0)
2165                                 RETURN(rc);
2166                 }
2167
2168                 ll_inode_size_lock(inode, 0);
2169                 offset += inode->i_size;
2170                 ll_inode_size_unlock(inode, 0);
2171         } else if (origin == 1) { /* SEEK_CUR */
2172                 offset += file->f_pos;
2173         }
2174
2175         retval = -EINVAL;
2176         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2177                 if (offset != file->f_pos) {
2178                         file->f_pos = offset;
2179 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2180                         file->f_reada = 0;
2181                         file->f_version = ++event;
2182 #endif
2183                 }
2184                 retval = offset;
2185         }
2186
2187         RETURN(retval);
2188 }
2189
2190 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2191 {
2192         struct inode *inode = dentry->d_inode;
2193         struct ll_inode_info *lli = ll_i2info(inode);
2194         struct lov_stripe_md *lsm = lli->lli_smd;
2195         struct ll_fid fid;
2196         struct ptlrpc_request *req;
2197         int rc, err;
2198         ENTRY;
2199         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2200                inode->i_generation, inode);
2201         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FSYNC);
2202         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
2203
2204         /* fsync's caller has already called _fdata{sync,write}, we want
2205          * that IO to finish before calling the osc and mdc sync methods */
2206         rc = filemap_fdatawait(inode->i_mapping);
2207
2208         /* catch async errors that were recorded back when async writeback
2209          * failed for pages in this mapping. */
2210         err = lli->lli_async_rc;
2211         lli->lli_async_rc = 0;
2212         if (rc == 0)
2213                 rc = err;
2214         if (lsm) {
2215                 err = lov_test_and_clear_async_rc(lsm);
2216                 if (rc == 0)
2217                         rc = err;
2218         }
2219
2220         ll_inode2fid(&fid, inode);
2221         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2222         if (!rc)
2223                 rc = err;
2224         if (!err)
2225                 ptlrpc_req_finished(req);
2226
2227         if (data && lsm) {
2228                 struct obdo *oa = obdo_alloc();
2229
2230                 if (!oa)
2231                         RETURN(rc ? rc : -ENOMEM);
2232
2233                 oa->o_id = lsm->lsm_object_id;
2234                 oa->o_valid = OBD_MD_FLID;
2235                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2236                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2237
2238                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
2239                                0, OBD_OBJECT_EOF);
2240                 if (!rc)
2241                         rc = err;
2242                 obdo_free(oa);
2243         }
2244
2245         RETURN(rc);
2246 }
2247
2248 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2249 {
2250         struct inode *inode = file->f_dentry->d_inode;
2251         struct ll_sb_info *sbi = ll_i2sbi(inode);
2252         struct ldlm_res_id res_id =
2253                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
2254         struct lustre_handle lockh = {0};
2255         ldlm_policy_data_t flock;
2256         ldlm_mode_t mode = 0;
2257         int flags = 0;
2258         int rc;
2259         ENTRY;
2260
2261         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2262                inode->i_ino, file_lock);
2263         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FLOCK);
2264
2265         if (file_lock->fl_flags & FL_FLOCK) {
2266                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2267                 /* set missing params for flock() calls */
2268                 file_lock->fl_end = OFFSET_MAX;
2269                 file_lock->fl_pid = current->tgid;
2270         }
2271         flock.l_flock.pid = file_lock->fl_pid;
2272         flock.l_flock.start = file_lock->fl_start;
2273         flock.l_flock.end = file_lock->fl_end;
2274
2275         switch (file_lock->fl_type) {
2276         case F_RDLCK:
2277                 mode = LCK_PR;
2278                 break;
2279         case F_UNLCK:
2280                 /* An unlock request may or may not have any relation to
2281                  * existing locks so we may not be able to pass a lock handle
2282                  * via a normal ldlm_lock_cancel() request. The request may even
2283                  * unlock a byte range in the middle of an existing lock. In
2284                  * order to process an unlock request we need all of the same
2285                  * information that is given with a normal read or write record
2286                  * lock request. To avoid creating another ldlm unlock (cancel)
2287                  * message we'll treat a LCK_NL flock request as an unlock. */
2288                 mode = LCK_NL;
2289                 break;
2290         case F_WRLCK:
2291                 mode = LCK_PW;
2292                 break;
2293         default:
2294                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2295                 LBUG();
2296         }
2297
2298         switch (cmd) {
2299         case F_SETLKW:
2300 #ifdef F_SETLKW64
2301         case F_SETLKW64:
2302 #endif
2303                 flags = 0;
2304                 break;
2305         case F_SETLK:
2306 #ifdef F_SETLK64
2307         case F_SETLK64:
2308 #endif
2309                 flags = LDLM_FL_BLOCK_NOWAIT;
2310                 break;
2311         case F_GETLK:
2312 #ifdef F_GETLK64
2313         case F_GETLK64:
2314 #endif
2315                 flags = LDLM_FL_TEST_LOCK;
2316                 /* Save the old mode so that if the mode in the lock changes we
2317                  * can decrement the appropriate reader or writer refcount. */
2318                 file_lock->fl_type = mode;
2319                 break;
2320         default:
2321                 CERROR("unknown fcntl lock command: %d\n", cmd);
2322                 LBUG();
2323         }
2324
2325         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2326                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2327                flags, mode, flock.l_flock.start, flock.l_flock.end);
2328
2329         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, res_id,
2330                               LDLM_FLOCK, &flock, mode, &flags, NULL,
2331                               ldlm_flock_completion_ast, NULL, file_lock,
2332                               NULL, 0, NULL, &lockh, 0);
2333         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2334                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2335 #ifdef HAVE_F_OP_FLOCK
2336         if ((file_lock->fl_flags & FL_POSIX) &&(rc == 0))
2337                 posix_lock_file_wait(file, file_lock);
2338 #endif
2339
2340         RETURN(rc);
2341 }
2342
2343 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2344 {
2345         ENTRY;
2346
2347         RETURN(-ENOSYS);
2348 }
2349
2350 int ll_have_md_lock(struct inode *inode, __u64 bits)
2351 {
2352         struct lustre_handle lockh;
2353         struct ldlm_res_id res_id = { .name = {0} };
2354         struct obd_device *obddev;
2355         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2356         int flags;
2357         ENTRY;
2358
2359         if (!inode)
2360                RETURN(0);
2361
2362         obddev = ll_i2mdcexp(inode)->exp_obd;
2363         res_id.name[0] = inode->i_ino;
2364         res_id.name[1] = inode->i_generation;
2365
2366         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
2367
2368         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2369         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
2370                             &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2371                 RETURN(1);
2372         }
2373
2374         RETURN(0);
2375 }
2376
2377 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2378         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2379                               * and return success */
2380                 inode->i_nlink = 0;
2381                 /* This path cannot be hit for regular files unless in
2382                  * case of obscure races, so no need to to validate
2383                  * size. */
2384                 if (!S_ISREG(inode->i_mode) &&
2385                     !S_ISDIR(inode->i_mode))
2386                         return 0;
2387         }
2388
2389         if (rc) {
2390                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2391                 return -abs(rc);
2392
2393         }
2394
2395         return 0;
2396 }
2397
2398 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2399 {
2400         struct inode *inode = dentry->d_inode;
2401         struct ptlrpc_request *req = NULL;
2402         struct obd_export *exp;
2403         int rc;
2404         ENTRY;
2405
2406         if (!inode) {
2407                 CERROR("REPORT THIS LINE TO PETER\n");
2408                 RETURN(0);
2409         }
2410         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2411                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2412 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2413         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
2414 #endif
2415
2416         exp = ll_i2mdcexp(inode);
2417
2418         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2419                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2420                 struct mdc_op_data op_data;
2421
2422                 /* Call getattr by fid, so do not provide name at all. */
2423                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
2424                                        dentry->d_inode, NULL, 0, 0);
2425                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
2426                                      /* we are not interested in name
2427                                         based lookup */
2428                                      &oit, 0, &req,
2429                                      ll_mdc_blocking_ast, 0);
2430                 if (rc < 0) {
2431                         rc = ll_inode_revalidate_fini(inode, rc);
2432                         GOTO (out, rc);
2433                 }
2434                 
2435                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2436                 if (rc != 0) {
2437                         ll_intent_release(&oit);
2438                         GOTO(out, rc);
2439                 }
2440
2441                 /* Unlinked? Unhash dentry, so it is not picked up later by
2442                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2443                    here to preserve get_cwd functionality on 2.6.
2444                    Bug 10503 */
2445                 if (!dentry->d_inode->i_nlink) {
2446                         spin_lock(&dcache_lock);
2447                         ll_drop_dentry(dentry);
2448                         spin_unlock(&dcache_lock);
2449                 }
2450
2451                 ll_lookup_finish_locks(&oit, dentry);
2452         } else if (!ll_have_md_lock(dentry->d_inode,
2453                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
2454                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2455                 struct ll_fid fid;
2456                 obd_valid valid = OBD_MD_FLGETATTR;
2457                 int ealen = 0;
2458
2459                 if (S_ISREG(inode->i_mode)) {
2460                         rc = ll_get_max_mdsize(sbi, &ealen);
2461                         if (rc) 
2462                                 RETURN(rc); 
2463                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2464                 }
2465                 ll_inode2fid(&fid, inode);
2466                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
2467                 if (rc) {
2468                         rc = ll_inode_revalidate_fini(inode, rc);
2469                         RETURN(rc);
2470                 }
2471
2472                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
2473                                    NULL);
2474                 if (rc)
2475                         GOTO(out, rc);
2476         }
2477
2478         /* if object not yet allocated, don't validate size */
2479         if (ll_i2info(inode)->lli_smd == NULL) 
2480                 GOTO(out, rc = 0);
2481
2482         /* ll_glimpse_size will prefer locally cached writes if they extend
2483          * the file */
2484         rc = ll_glimpse_size(inode, 0);
2485
2486 out:
2487         ptlrpc_req_finished(req);
2488         RETURN(rc);
2489 }
2490
2491 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2492 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2493                   struct lookup_intent *it, struct kstat *stat)
2494 {
2495         struct inode *inode = de->d_inode;
2496         int res = 0;
2497
2498         res = ll_inode_revalidate_it(de, it);
2499         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
2500
2501         if (res)
2502                 return res;
2503
2504         stat->dev = inode->i_sb->s_dev;
2505         stat->ino = inode->i_ino;
2506         stat->mode = inode->i_mode;
2507         stat->nlink = inode->i_nlink;
2508         stat->uid = inode->i_uid;
2509         stat->gid = inode->i_gid;
2510         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2511         stat->atime = inode->i_atime;
2512         stat->mtime = inode->i_mtime;
2513         stat->ctime = inode->i_ctime;
2514 #ifdef HAVE_INODE_BLKSIZE
2515         stat->blksize = inode->i_blksize;
2516 #else
2517         stat->blksize = 1<<inode->i_blkbits;
2518 #endif
2519
2520         ll_inode_size_lock(inode, 0);
2521         stat->size = inode->i_size;
2522         stat->blocks = inode->i_blocks;
2523         ll_inode_size_unlock(inode, 0);
2524
2525         return 0;
2526 }
2527 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2528 {
2529         struct lookup_intent it = { .it_op = IT_GETATTR };
2530
2531         ll_vfs_ops_tally(ll_i2sbi(de->d_inode), VFS_OPS_GETATTR);
2532         return ll_getattr_it(mnt, de, &it, stat);
2533 }
2534 #endif
2535
2536 static
2537 int lustre_check_acl(struct inode *inode, int mask)
2538 {
2539 #ifdef CONFIG_FS_POSIX_ACL
2540         struct ll_inode_info *lli = ll_i2info(inode);
2541         struct posix_acl *acl;
2542         int rc;
2543         ENTRY;
2544
2545         spin_lock(&lli->lli_lock);
2546         acl = posix_acl_dup(lli->lli_posix_acl);
2547         spin_unlock(&lli->lli_lock);
2548
2549         if (!acl)
2550                 RETURN(-EAGAIN);
2551
2552         rc = posix_acl_permission(inode, acl, mask);
2553         posix_acl_release(acl);
2554
2555         RETURN(rc);
2556 #else
2557         return -EAGAIN;
2558 #endif
2559 }
2560
2561 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2562 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2563 {
2564         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2565                inode->i_ino, inode->i_generation, inode, mask);
2566
2567         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
2568         return generic_permission(inode, mask, lustre_check_acl);
2569 }
2570 #else
2571 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2572 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2573 #else
2574 int ll_inode_permission(struct inode *inode, int mask)
2575 #endif
2576 {
2577         int mode = inode->i_mode;
2578         int rc;
2579
2580         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2581                inode->i_ino, inode->i_generation, inode, mask);
2582         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
2583
2584         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2585             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2586                 return -EROFS;
2587         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2588                 return -EACCES;
2589         if (current->fsuid == inode->i_uid) {
2590                 mode >>= 6;
2591         } else if (1) {
2592                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2593                         goto check_groups;
2594                 rc = lustre_check_acl(inode, mask);
2595                 if (rc == -EAGAIN)
2596                         goto check_groups;
2597                 if (rc == -EACCES)
2598                         goto check_capabilities;
2599                 return rc;
2600         } else {
2601 check_groups:
2602                 if (in_group_p(inode->i_gid))
2603                         mode >>= 3;
2604         }
2605         if ((mode & mask & S_IRWXO) == mask)
2606                 return 0;
2607
2608 check_capabilities:
2609         if (!(mask & MAY_EXEC) ||
2610             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2611                 if (capable(CAP_DAC_OVERRIDE))
2612                         return 0;
2613
2614         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2615             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2616                 return 0;
2617
2618         return -EACCES;
2619 }
2620 #endif
2621
2622 struct file_operations ll_file_operations = {
2623         .read           = ll_file_read,
2624         .write          = ll_file_write,
2625         .ioctl          = ll_file_ioctl,
2626         .open           = ll_file_open,
2627         .release        = ll_file_release,
2628         .mmap           = ll_file_mmap,
2629         .llseek         = ll_file_seek,
2630 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2631         .sendfile       = ll_file_sendfile,
2632 #endif
2633         .fsync          = ll_fsync,
2634 #ifdef HAVE_F_OP_FLOCK
2635         .flock          = ll_file_noflock,
2636 #endif
2637         .lock           = ll_file_noflock
2638 };
2639
2640 struct file_operations ll_file_operations_flock = {
2641         .read           = ll_file_read,
2642         .write          = ll_file_write,
2643         .ioctl          = ll_file_ioctl,
2644         .open           = ll_file_open,
2645         .release        = ll_file_release,
2646         .mmap           = ll_file_mmap,
2647         .llseek         = ll_file_seek,
2648 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2649         .sendfile       = ll_file_sendfile,
2650 #endif
2651         .fsync          = ll_fsync,
2652 #ifdef HAVE_F_OP_FLOCK
2653         .flock          = ll_file_flock,
2654 #endif
2655         .lock           = ll_file_flock
2656 };
2657
2658
2659 struct inode_operations ll_file_inode_operations = {
2660 #ifdef LUSTRE_KERNEL_VERSION
2661         .setattr_raw    = ll_setattr_raw,
2662 #endif
2663         .setattr        = ll_setattr,
2664         .truncate       = ll_truncate,
2665 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2666         .getattr        = ll_getattr,
2667 #else
2668         .revalidate_it  = ll_inode_revalidate_it,
2669 #endif
2670         .permission     = ll_inode_permission,
2671         .setxattr       = ll_setxattr,
2672         .getxattr       = ll_getxattr,
2673         .listxattr      = ll_listxattr,
2674         .removexattr    = ll_removexattr,
2675 };
2676