Whamcloud - gitweb
4e1fe10c205352a8b2427087ebb47461f2682542
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 /* also used by llite/special.c:ll_special_open() */
36 struct ll_file_data *ll_file_data_get(void)
37 {
38         struct ll_file_data *fd;
39
40         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
41         return fd;
42 }
43
44 static void ll_file_data_put(struct ll_file_data *fd)
45 {
46         if (fd != NULL)
47                 OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
48 }
49
50 static int ll_close_inode_openhandle(struct inode *inode,
51                                      struct obd_client_handle *och)
52 {
53         struct ptlrpc_request *req = NULL;
54         struct obd_device *obd;
55         struct obdo *oa;
56         int rc;
57         ENTRY;
58
59         obd = class_exp2obd(ll_i2mdcexp(inode));
60         if (obd == NULL) {
61                 CERROR("Invalid MDC connection handle "LPX64"\n",
62                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
63                 GOTO(out, rc = 0);
64         }
65
66         /*
67          * here we check if this is forced umount. If so this is called on
68          * canceling "open lock" and we do not call mdc_close() in this case, as
69          * it will not be successful, as import is already deactivated.
70          */
71         if (obd->obd_no_recov)
72                 GOTO(out, rc = 0);
73
74         oa = obdo_alloc();
75         if (!oa)
76                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
77
78         oa->o_id = inode->i_ino;
79         oa->o_valid = OBD_MD_FLID;
80         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
81                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
82                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
83                                    OBD_MD_FLCTIME);
84         if (0 /* ll_is_inode_dirty(inode) */) {
85                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
86                 oa->o_valid |= OBD_MD_FLFLAGS;
87         }
88
89         rc = mdc_close(ll_i2mdcexp(inode), oa, och, &req);
90         if (rc == EAGAIN) {
91                 /* We are the last writer, so the MDS has instructed us to get
92                  * the file size and any write cookies, then close again. */
93                 //ll_queue_done_writing(inode);
94                 rc = 0;
95         } else if (rc) {
96                 CERROR("inode %lu mdc close failed: rc = %d\n",
97                        inode->i_ino, rc);
98         }
99
100         obdo_free(oa);
101
102         if (rc == 0) {
103                 rc = ll_objects_destroy(req, inode);
104                 if (rc)
105                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
106                                inode->i_ino, rc);
107         }
108
109         ptlrpc_req_finished(req); /* This is close request */
110         EXIT;
111 out:
112         mdc_clear_open_replay_data(och);
113
114         return rc;
115 }
116
117 int ll_mdc_real_close(struct inode *inode, int flags)
118 {
119         struct ll_inode_info *lli = ll_i2info(inode);
120         int rc = 0;
121         struct obd_client_handle **och_p;
122         struct obd_client_handle *och;
123         __u64 *och_usecount;
124
125         ENTRY;
126
127         if (flags & FMODE_WRITE) {
128                 och_p = &lli->lli_mds_write_och;
129                 och_usecount = &lli->lli_open_fd_write_count;
130         } else if (flags & FMODE_EXEC) {
131                 och_p = &lli->lli_mds_exec_och;
132                 och_usecount = &lli->lli_open_fd_exec_count;
133          } else {
134                 LASSERT(flags & FMODE_READ);
135                 och_p = &lli->lli_mds_read_och;
136                 och_usecount = &lli->lli_open_fd_read_count;
137         }
138
139         down(&lli->lli_och_sem);
140         if (*och_usecount) { /* There are still users of this handle, so
141                                 skip freeing it. */
142                 up(&lli->lli_och_sem);
143                 RETURN(0);
144         }
145         och=*och_p;
146         *och_p = NULL;
147         up(&lli->lli_och_sem);
148
149         if (och) { /* There might be a race and somebody have freed this och
150                       already */
151                 rc = ll_close_inode_openhandle(inode, och);
152                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
153                 OBD_FREE(och, sizeof *och);
154         }
155
156         RETURN(rc);
157 }
158
159 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
160                         struct file *file)
161 {
162         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
163         struct ll_inode_info *lli = ll_i2info(inode);
164         int rc = 0;
165         ENTRY;
166
167         /* clear group lock, if present */
168         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
169                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
170                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
171                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
172                                       &fd->fd_cwlockh);
173         }
174
175         /* Let's see if we have good enough OPEN lock on the file and if
176            we can skip talking to MDS */
177         if (file->f_dentry->d_inode) { /* Can this ever be false? */
178                 int lockmode;
179                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
180                 struct lustre_handle lockh;
181                 struct inode *inode = file->f_dentry->d_inode;
182                 struct ldlm_res_id file_res_id = {.name={inode->i_ino,
183                                                          inode->i_generation}};
184                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
185
186                 down(&lli->lli_och_sem);
187                 if (fd->fd_omode & FMODE_WRITE) {
188                         lockmode = LCK_CW;
189                         LASSERT(lli->lli_open_fd_write_count);
190                         lli->lli_open_fd_write_count--;
191                 } else if (fd->fd_omode & FMODE_EXEC) {
192                         lockmode = LCK_PR;
193                         LASSERT(lli->lli_open_fd_exec_count);
194                         lli->lli_open_fd_exec_count--;
195                 } else {
196                         lockmode = LCK_CR;
197                         LASSERT(lli->lli_open_fd_read_count);
198                         lli->lli_open_fd_read_count--;
199                 }
200                 up(&lli->lli_och_sem);
201
202                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
203                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
204                                      &lockh)) {
205                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
206                                                 fd->fd_omode);
207                 }
208         } else {
209                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
210                        file, file->f_dentry, file->f_dentry->d_name.name);
211         }
212
213         LUSTRE_FPRIVATE(file) = NULL;
214         ll_file_data_put(fd);
215
216         RETURN(rc);
217 }
218
219 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
220
221 /* While this returns an error code, fput() the caller does not, so we need
222  * to make every effort to clean up all of our state here.  Also, applications
223  * rarely check close errors and even if an error is returned they will not
224  * re-try the close call.
225  */
226 int ll_file_release(struct inode *inode, struct file *file)
227 {
228         struct ll_file_data *fd;
229         struct ll_sb_info *sbi = ll_i2sbi(inode);
230         struct ll_inode_info *lli = ll_i2info(inode);
231         struct lov_stripe_md *lsm = lli->lli_smd;
232         int rc;
233
234         ENTRY;
235         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
236                inode->i_generation, inode);
237         ll_vfs_ops_tally(sbi, VFS_OPS_RELEASE);
238
239         /* don't do anything for / */
240         if (inode->i_sb->s_root == file->f_dentry)
241                 RETURN(0);
242
243         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
244
245         fd = LUSTRE_FPRIVATE(file);
246         LASSERT(fd != NULL);
247
248         if (lsm)
249                 lov_test_and_clear_async_rc(lsm);
250         lli->lli_async_rc = 0;
251
252         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
253         RETURN(rc);
254 }
255
256 static int ll_intent_file_open(struct file *file, void *lmm,
257                                int lmmsize, struct lookup_intent *itp)
258 {
259         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
260         struct mdc_op_data data;
261         struct dentry *parent = file->f_dentry->d_parent;
262         const char *name = file->f_dentry->d_name.name;
263         const int len = file->f_dentry->d_name.len;
264         struct inode *inode = file->f_dentry->d_inode;
265         struct ptlrpc_request *req;
266         int rc;
267
268         if (!parent)
269                 RETURN(-ENOENT);
270
271         ll_prepare_mdc_op_data(&data, parent->d_inode, inode, name, len, O_RDWR);
272
273         /* Usually we come here only for NFSD, and we want open lock.
274            But we can also get here with pre 2.6.15 patchless kernels, and in
275            that case that lock is also ok */
276         /* We can also get here if there was cached open handle in revalidate_it
277          * but it disappeared while we were getting from there to ll_file_open.
278          * But this means this file was closed and immediatelly opened which
279          * makes a good candidate for using OPEN lock */
280         /* If lmmsize & lmm are not 0, we are just setting stripe info
281          * parameters. No need for the open lock */
282         if (!lmm && !lmmsize)
283                 itp->it_flags |= MDS_OPEN_LOCK;
284
285         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
286                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
287         if (rc == -ESTALE) {
288                 /* reason for keep own exit path - don`t flood log
289                 * with messages with -ESTALE errors.
290                 */
291                 if (!it_disposition(itp, DISP_OPEN_OPEN))
292                         GOTO(out, rc);
293                 ll_release_openhandle(file->f_dentry, itp);
294                 GOTO(out_stale, rc);
295         }
296
297         if (rc != 0) {
298                CERROR("lock enqueue: err: %d\n", rc);
299                GOTO(out, rc);
300         }
301
302         if (itp->d.lustre.it_lock_mode)
303                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
304                                   inode);
305
306         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
307                            req, DLM_REPLY_REC_OFF, NULL);
308 out:
309         ptlrpc_req_finished(itp->d.lustre.it_data);
310
311 out_stale:
312         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
313         ll_intent_drop_lock(itp);
314
315         RETURN(rc);
316 }
317
318
319 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
320                         struct obd_client_handle *och)
321 {
322         struct ptlrpc_request *req = it->d.lustre.it_data;
323         struct mds_body *body;
324
325         LASSERT(och);
326
327         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
328         LASSERT(body != NULL);                  /* reply already checked out */
329         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in mdc_enqueue */
330
331         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
332         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
333         lli->lli_io_epoch = body->io_epoch;
334
335         mdc_set_open_replay_data(och, it->d.lustre.it_data);
336 }
337
338 int ll_local_open(struct file *file, struct lookup_intent *it,
339                   struct ll_file_data *fd, struct obd_client_handle *och)
340 {
341         ENTRY;
342
343         LASSERT(!LUSTRE_FPRIVATE(file));
344
345         LASSERT(fd != NULL);
346
347         if (och)
348                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
349         LUSTRE_FPRIVATE(file) = fd;
350         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
351         fd->fd_omode = it->it_flags;
352
353         RETURN(0);
354 }
355
356 /* Open a file, and (for the very first open) create objects on the OSTs at
357  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
358  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
359  * lli_open_sem to ensure no other process will create objects, send the
360  * stripe MD to the MDS, or try to destroy the objects if that fails.
361  *
362  * If we already have the stripe MD locally then we don't request it in
363  * mdc_open(), by passing a lmm_size = 0.
364  *
365  * It is up to the application to ensure no other processes open this file
366  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
367  * used.  We might be able to avoid races of that sort by getting lli_open_sem
368  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
369  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
370  */
371 int ll_file_open(struct inode *inode, struct file *file)
372 {
373         struct ll_inode_info *lli = ll_i2info(inode);
374         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
375                                           .it_flags = file->f_flags };
376         struct lov_stripe_md *lsm;
377         struct ptlrpc_request *req = NULL;
378         struct obd_client_handle **och_p;
379         __u64 *och_usecount;
380         struct ll_file_data *fd;
381         int rc = 0;
382         ENTRY;
383
384         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
385                inode->i_generation, inode, file->f_flags);
386         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_OPEN);
387
388         /* don't do anything for / */
389         if (inode->i_sb->s_root == file->f_dentry)
390                 RETURN(0);
391
392 #ifdef LUSTRE_KERNEL_VERSION
393         it = file->f_it;
394 #else
395         it = file->private_data; /* XXX: compat macro */
396         file->private_data = NULL; /* prevent ll_local_open assertion */
397 #endif
398
399         fd = ll_file_data_get();
400         if (fd == NULL)
401                 RETURN(-ENOMEM);
402
403         if (!it || !it->d.lustre.it_disposition) {
404                 /* Convert f_flags into access mode. We cannot use file->f_mode,
405                  * because everything but O_ACCMODE mask was stripped from
406                  * there */
407                 if ((oit.it_flags + 1) & O_ACCMODE)
408                         oit.it_flags++;
409                 if (file->f_flags & O_TRUNC)
410                         oit.it_flags |= FMODE_WRITE;
411
412                 /* kernel only call f_op->open in dentry_open.
413                  * filp_open calls dentry_open after call to open_namei that checks
414                  * for permissions. only nfsd_open call dentry_open directly without
415                  * checking permissions and because of that this code below is safe.
416                  */
417                 if (oit.it_flags & FMODE_WRITE)
418                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
419
420                 /* We do not want O_EXCL here, presumably we opened the file
421                  * already? XXX - NFS implications? */
422                 oit.it_flags &= ~O_EXCL;
423
424                 it = &oit;
425         }
426
427         /* Let's see if we have file open on MDS already. */
428         if (it->it_flags & FMODE_WRITE) {
429                 och_p = &lli->lli_mds_write_och;
430                 och_usecount = &lli->lli_open_fd_write_count;
431         } else if (it->it_flags & FMODE_EXEC) {
432                 och_p = &lli->lli_mds_exec_och;
433                 och_usecount = &lli->lli_open_fd_exec_count;
434          } else {
435                 och_p = &lli->lli_mds_read_och;
436                 och_usecount = &lli->lli_open_fd_read_count;
437         }
438
439         LASSERTF(it->it_flags != 0, "f_it %p dist %d \n", file->f_it,
440                  file->f_it->d.lustre.it_disposition);
441
442         down(&lli->lli_och_sem);
443         if (*och_p) { /* Open handle is present */
444                 if (it_disposition(it, DISP_OPEN_OPEN)) {
445                         /* Well, there's extra open request that we do not need,
446                            let's close it somehow. This will decref request. */
447                         ll_release_openhandle(file->f_dentry, it);
448                 }
449                 (*och_usecount)++;
450
451                 rc = ll_local_open(file, it, fd, NULL);
452
453                 LASSERTF(rc == 0, "rc = %d\n", rc);
454         } else {
455                 LASSERT(*och_usecount == 0);
456                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
457                 if (!*och_p) {
458                         ll_file_data_put(fd);
459                         GOTO(out_och_free, rc = -ENOMEM);
460                 }
461                 (*och_usecount)++;
462                 if (!it->d.lustre.it_disposition) {
463                         rc = ll_intent_file_open(file, NULL, 0, it);
464                         if (rc) {
465                                 ll_file_data_put(fd);
466                                 GOTO(out_och_free, rc);
467                         }
468
469                         /* Got some error? Release the request */
470                         if (it->d.lustre.it_status < 0) {
471                                 req = it->d.lustre.it_data;
472                                 ptlrpc_req_finished(req);
473                         }
474                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
475                                           file->f_dentry->d_inode);
476                 }
477                 req = it->d.lustre.it_data;
478
479                 /* mdc_intent_lock() didn't get a request ref if there was an
480                  * open error, so don't do cleanup on the request here
481                  * (bug 3430) */
482                 /* XXX (green): Should not we bail out on any error here, not
483                  * just open error? */
484                 rc = it_open_error(DISP_OPEN_OPEN, it);
485                 if (rc) {
486                         ll_file_data_put(fd);
487                         GOTO(out_och_free, rc);
488                 }
489
490                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
491                 rc = ll_local_open(file, it, fd, *och_p);
492                 LASSERTF(rc == 0, "rc = %d\n", rc);
493         }
494         up(&lli->lli_och_sem);
495
496         /* Must do this outside lli_och_sem lock to prevent deadlock where
497            different kind of OPEN lock for this same inode gets cancelled
498            by ldlm_cancel_lru */
499         if (!S_ISREG(inode->i_mode))
500                 GOTO(out, rc);
501
502         lsm = lli->lli_smd;
503         if (lsm == NULL) {
504                 if (file->f_flags & O_LOV_DELAY_CREATE ||
505                     !(file->f_mode & FMODE_WRITE)) {
506                         CDEBUG(D_INODE, "object creation was delayed\n");
507                         GOTO(out, rc);
508                 }
509         }
510         file->f_flags &= ~O_LOV_DELAY_CREATE;
511         GOTO(out, rc);
512  out:
513         ptlrpc_req_finished(req);
514         if (req)
515                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
516         if (rc == 0) {
517                 ll_open_complete(inode);
518         } else {
519 out_och_free:
520                 if (*och_p) {
521                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
522                         *och_p = NULL; /* OBD_FREE writes some magic there */
523                         (*och_usecount)--;
524                 }
525                 up(&lli->lli_och_sem);
526         }
527         return rc;
528 }
529
530 /* Fills the obdo with the attributes for the inode defined by lsm */
531 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
532                    struct obdo *oa)
533 {
534         struct ptlrpc_request_set *set;
535         struct obd_info oinfo = { { { 0 } } };
536         int rc;
537         ENTRY;
538
539         LASSERT(lsm != NULL);
540
541         memset(oa, 0, sizeof *oa);
542         oinfo.oi_md = lsm;
543         oinfo.oi_oa = oa;
544         oa->o_id = lsm->lsm_object_id;
545         oa->o_mode = S_IFREG;
546         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
547                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
548                 OBD_MD_FLCTIME;
549
550         set = ptlrpc_prep_set();
551         if (set == NULL) {
552                 rc = -ENOMEM;
553         } else {
554                 rc = obd_getattr_async(exp, &oinfo, set);
555                 if (rc == 0)
556                         rc = ptlrpc_set_wait(set);
557                 ptlrpc_set_destroy(set);
558         }
559         if (rc)
560                 RETURN(rc);
561
562         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
563                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
564         RETURN(0);
565 }
566
567 static inline void ll_remove_suid(struct inode *inode)
568 {
569         unsigned int mode;
570
571         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
572         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
573
574         /* was any of the uid bits set? */
575         mode &= inode->i_mode;
576         if (mode && !capable(CAP_FSETID)) {
577                 inode->i_mode &= ~mode;
578                 // XXX careful here - we cannot change the size
579         }
580 }
581
582 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
583 {
584         struct ll_inode_info *lli = ll_i2info(inode);
585         struct lov_stripe_md *lsm = lli->lli_smd;
586         struct obd_export *exp = ll_i2obdexp(inode);
587         struct {
588                 char name[16];
589                 struct ldlm_lock *lock;
590                 struct lov_stripe_md *lsm;
591         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
592         __u32 stripe, vallen = sizeof(stripe);
593         int rc;
594         ENTRY;
595
596         if (lsm->lsm_stripe_count == 1)
597                 GOTO(check, stripe = 0);
598
599         /* get our offset in the lov */
600         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
601         if (rc != 0) {
602                 CERROR("obd_get_info: rc = %d\n", rc);
603                 RETURN(rc);
604         }
605         LASSERT(stripe < lsm->lsm_stripe_count);
606
607 check:
608         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
609             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[1]){
610                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
611                            lsm->lsm_oinfo[stripe].loi_id,
612                            lsm->lsm_oinfo[stripe].loi_gr);
613                 RETURN(-ELDLM_NO_LOCK_DATA);
614         }
615
616         RETURN(stripe);
617 }
618
619 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
620  * we get a lock cancellation for each stripe, so we have to map the obd's
621  * region back onto the stripes in the file that it held.
622  *
623  * No one can dirty the extent until we've finished our work and they can
624  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
625  * but other kernel actors could have pages locked.
626  *
627  * Called with the DLM lock held. */
628 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
629                               struct ldlm_lock *lock, __u32 stripe)
630 {
631         ldlm_policy_data_t tmpex;
632         unsigned long start, end, count, skip, i, j;
633         struct page *page;
634         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
635         struct lustre_handle lockh;
636         ENTRY;
637
638         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
639         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
640                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
641                inode->i_size);
642
643         /* our locks are page granular thanks to osc_enqueue, we invalidate the
644          * whole page. */
645         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
646             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
647                 LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
648         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
649         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
650
651         count = ~0;
652         skip = 0;
653         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
654         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
655         if (lsm->lsm_stripe_count > 1) {
656                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
657                 skip = (lsm->lsm_stripe_count - 1) * count;
658                 start += start/count * skip + stripe * count;
659                 if (end != ~0)
660                         end += end/count * skip + stripe * count;
661         }
662         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
663                 end = ~0;
664
665         i = inode->i_size ? (inode->i_size - 1) >> CFS_PAGE_SHIFT : 0;
666         if (i < end)
667                 end = i;
668
669         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
670                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
671                count, skip, end, discard ? " (DISCARDING)" : "");
672
673         /* walk through the vmas on the inode and tear down mmaped pages that
674          * intersect with the lock.  this stops immediately if there are no
675          * mmap()ed regions of the file.  This is not efficient at all and
676          * should be short lived. We'll associate mmap()ed pages with the lock
677          * and will be able to find them directly */
678         for (i = start; i <= end; i += (j + skip)) {
679                 j = min(count - (i % count), end - i + 1);
680                 LASSERT(j > 0);
681                 LASSERT(inode->i_mapping);
682                 if (ll_teardown_mmaps(inode->i_mapping,
683                                       (__u64)i << CFS_PAGE_SHIFT,
684                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
685                         break;
686         }
687
688         /* this is the simplistic implementation of page eviction at
689          * cancelation.  It is careful to get races with other page
690          * lockers handled correctly.  fixes from bug 20 will make it
691          * more efficient by associating locks with pages and with
692          * batching writeback under the lock explicitly. */
693         for (i = start, j = start % count; i <= end;
694              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
695                 if (j == count) {
696                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
697                         i += skip;
698                         j = 0;
699                         if (i > end)
700                                 break;
701                 }
702                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
703                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
704                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
705                          start, i, end);
706
707                 if (!mapping_has_pages(inode->i_mapping)) {
708                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
709                         break;
710                 }
711
712                 cond_resched();
713
714                 page = find_get_page(inode->i_mapping, i);
715                 if (page == NULL)
716                         continue;
717                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
718                                i, tmpex.l_extent.start);
719                 lock_page(page);
720
721                 /* page->mapping to check with racing against teardown */
722                 if (!discard && clear_page_dirty_for_io(page)) {
723                         rc = ll_call_writepage(inode, page);
724                         if (rc != 0)
725                                 CERROR("writepage of page %p failed: %d\n",
726                                        page, rc);
727                         /* either waiting for io to complete or reacquiring
728                          * the lock that the failed writepage released */
729                         lock_page(page);
730                 }
731
732                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
733                 /* check to see if another DLM lock covers this page  b=2765 */
734                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
735                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
736                                       LDLM_FL_TEST_LOCK,
737                                       &lock->l_resource->lr_name, LDLM_EXTENT,
738                                       &tmpex, LCK_PR | LCK_PW, &lockh);
739                 if (rc2 == 0 && page->mapping != NULL) {
740                         struct ll_async_page *llap = llap_cast_private(page);
741                         // checking again to account for writeback's lock_page()
742                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
743                         if (llap)
744                                 ll_ra_accounting(llap, inode->i_mapping);
745                         ll_truncate_complete_page(page);
746                 }
747                 unlock_page(page);
748                 page_cache_release(page);
749         }
750         LASSERTF(tmpex.l_extent.start <=
751                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
752                   lock->l_policy_data.l_extent.end + 1),
753                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
754                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
755                  start, i, end);
756         EXIT;
757 }
758
759 static int ll_extent_lock_callback(struct ldlm_lock *lock,
760                                    struct ldlm_lock_desc *new, void *data,
761                                    int flag)
762 {
763         struct lustre_handle lockh = { 0 };
764         int rc;
765         ENTRY;
766
767         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
768                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
769                 LBUG();
770         }
771
772         switch (flag) {
773         case LDLM_CB_BLOCKING:
774                 ldlm_lock2handle(lock, &lockh);
775                 rc = ldlm_cli_cancel(&lockh);
776                 if (rc != ELDLM_OK)
777                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
778                 break;
779         case LDLM_CB_CANCELING: {
780                 struct inode *inode;
781                 struct ll_inode_info *lli;
782                 struct lov_stripe_md *lsm;
783                 int stripe;
784                 __u64 kms;
785
786                 /* This lock wasn't granted, don't try to evict pages */
787                 if (lock->l_req_mode != lock->l_granted_mode)
788                         RETURN(0);
789
790                 inode = ll_inode_from_lock(lock);
791                 if (inode == NULL)
792                         RETURN(0);
793                 lli = ll_i2info(inode);
794                 if (lli == NULL)
795                         goto iput;
796                 if (lli->lli_smd == NULL)
797                         goto iput;
798                 lsm = lli->lli_smd;
799
800                 stripe = ll_lock_to_stripe_offset(inode, lock);
801                 if (stripe < 0)
802                         goto iput;
803
804                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
805
806                 lov_stripe_lock(lsm);
807                 lock_res_and_lock(lock);
808                 kms = ldlm_extent_shift_kms(lock,
809                                             lsm->lsm_oinfo[stripe].loi_kms);
810
811                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
812                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
813                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
814                 lsm->lsm_oinfo[stripe].loi_kms = kms;
815                 unlock_res_and_lock(lock);
816                 lov_stripe_unlock(lsm);
817                 //ll_try_done_writing(inode);
818         iput:
819                 iput(inode);
820                 break;
821         }
822         default:
823                 LBUG();
824         }
825
826         RETURN(0);
827 }
828
829 #if 0
830 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
831 {
832         /* XXX ALLOCATE - 160 bytes */
833         struct inode *inode = ll_inode_from_lock(lock);
834         struct ll_inode_info *lli = ll_i2info(inode);
835         struct lustre_handle lockh = { 0 };
836         struct ost_lvb *lvb;
837         int stripe;
838         ENTRY;
839
840         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
841                      LDLM_FL_BLOCK_CONV)) {
842                 LBUG(); /* not expecting any blocked async locks yet */
843                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
844                            "lock, returning");
845                 ldlm_lock_dump(D_OTHER, lock, 0);
846                 ldlm_reprocess_all(lock->l_resource);
847                 RETURN(0);
848         }
849
850         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
851
852         stripe = ll_lock_to_stripe_offset(inode, lock);
853         if (stripe < 0)
854                 goto iput;
855
856         if (lock->l_lvb_len) {
857                 struct lov_stripe_md *lsm = lli->lli_smd;
858                 __u64 kms;
859                 lvb = lock->l_lvb_data;
860                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
861
862                 LOCK_INODE_MUTEX(inode);
863                 lock_res_and_lock(lock);
864                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
865                 kms = ldlm_extent_shift_kms(NULL, kms);
866                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
867                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
868                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
869                 lsm->lsm_oinfo[stripe].loi_kms = kms;
870                 unlock_res_and_lock(lock);
871                 UNLOCK_INODE_MUTEX(inode);
872         }
873
874 iput:
875         iput(inode);
876         wake_up(&lock->l_waitq);
877
878         ldlm_lock2handle(lock, &lockh);
879         ldlm_lock_decref(&lockh, LCK_PR);
880         RETURN(0);
881 }
882 #endif
883
884 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
885 {
886         struct ptlrpc_request *req = reqp;
887         struct inode *inode = ll_inode_from_lock(lock);
888         struct ll_inode_info *lli;
889         struct lov_stripe_md *lsm;
890         struct ost_lvb *lvb;
891         int rc, stripe;
892         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
893         ENTRY;
894
895         if (inode == NULL)
896                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
897         lli = ll_i2info(inode);
898         if (lli == NULL)
899                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
900         lsm = lli->lli_smd;
901         if (lsm == NULL)
902                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
903
904         /* First, find out which stripe index this lock corresponds to. */
905         stripe = ll_lock_to_stripe_offset(inode, lock);
906         if (stripe < 0)
907                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
908
909         rc = lustre_pack_reply(req, 2, size, NULL);
910         if (rc) {
911                 CERROR("lustre_pack_reply: %d\n", rc);
912                 GOTO(iput, rc);
913         }
914
915         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
916         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
917         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
918         lvb->lvb_atime = LTIME_S(inode->i_atime);
919         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
920
921         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
922                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
923                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
924                    lvb->lvb_atime, lvb->lvb_ctime);
925  iput:
926         iput(inode);
927
928  out:
929         /* These errors are normal races, so we don't want to fill the console
930          * with messages by calling ptlrpc_error() */
931         if (rc == -ELDLM_NO_LOCK_DATA)
932                 lustre_pack_reply(req, 1, NULL, NULL);
933
934         req->rq_status = rc;
935         return rc;
936 }
937
938 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
939                      lstat_t *st)
940 {
941         struct lustre_handle lockh = { 0 };
942         struct obd_enqueue_info einfo = { 0 };
943         struct obd_info oinfo = { { { 0 } } };
944         struct ost_lvb lvb;
945         int rc;
946         
947         ENTRY;
948         
949         einfo.ei_type = LDLM_EXTENT;
950         einfo.ei_mode = LCK_PR;
951         einfo.ei_flags = LDLM_FL_HAS_INTENT;
952         einfo.ei_cb_bl = ll_extent_lock_callback;
953         einfo.ei_cb_cp = ldlm_completion_ast;
954         einfo.ei_cb_gl = ll_glimpse_callback;
955         einfo.ei_cbdata = NULL;
956
957         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
958         oinfo.oi_lockh = &lockh;
959         oinfo.oi_md = lsm;
960
961         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
962         if (rc == -ENOENT)
963                 RETURN(rc);
964         if (rc != 0) {
965                 CERROR("obd_enqueue returned rc %d, "
966                        "returning -EIO\n", rc);
967                 RETURN(rc > 0 ? -EIO : rc);
968         }
969         
970         lov_stripe_lock(lsm);
971         memset(&lvb, 0, sizeof(lvb));
972         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
973         st->st_size = lvb.lvb_size;
974         st->st_blocks = lvb.lvb_blocks;
975         st->st_mtime = lvb.lvb_mtime;
976         st->st_atime = lvb.lvb_atime;
977         st->st_ctime = lvb.lvb_ctime;
978         lov_stripe_unlock(lsm);
979         
980         RETURN(rc);
981 }
982
983 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
984  * file (because it prefers KMS over RSS when larger) */
985 int ll_glimpse_size(struct inode *inode, int ast_flags)
986 {
987         struct ll_inode_info *lli = ll_i2info(inode);
988         struct ll_sb_info *sbi = ll_i2sbi(inode);
989         struct lustre_handle lockh = { 0 };
990         struct obd_enqueue_info einfo = { 0 };
991         struct obd_info oinfo = { { { 0 } } };
992         struct ost_lvb lvb;
993         int rc;
994         ENTRY;
995
996         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
997
998         if (!lli->lli_smd) {
999                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1000                 RETURN(0);
1001         }
1002
1003         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1004          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1005          *       won't revoke any conflicting DLM locks held. Instead,
1006          *       ll_glimpse_callback() will be called on each client
1007          *       holding a DLM lock against this file, and resulting size
1008          *       will be returned for each stripe. DLM lock on [0, EOF] is
1009          *       acquired only if there were no conflicting locks. */
1010         einfo.ei_type = LDLM_EXTENT;
1011         einfo.ei_mode = LCK_PR;
1012         einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
1013         einfo.ei_cb_bl = ll_extent_lock_callback;
1014         einfo.ei_cb_cp = ldlm_completion_ast;
1015         einfo.ei_cb_gl = ll_glimpse_callback;
1016         einfo.ei_cbdata = inode;
1017
1018         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1019         oinfo.oi_lockh = &lockh;
1020         oinfo.oi_md = lli->lli_smd;
1021
1022         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
1023         if (rc == -ENOENT)
1024                 RETURN(rc);
1025         if (rc != 0) {
1026                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1027                 RETURN(rc > 0 ? -EIO : rc);
1028         }
1029
1030         ll_inode_size_lock(inode, 1);
1031         inode_init_lvb(inode, &lvb);
1032         obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1033         inode->i_size = lvb.lvb_size;
1034         inode->i_blocks = lvb.lvb_blocks;
1035         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1036         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1037         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1038         ll_inode_size_unlock(inode, 1);
1039
1040         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1041                inode->i_size, inode->i_blocks);
1042
1043         RETURN(rc);
1044 }
1045
1046 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1047                    struct lov_stripe_md *lsm, int mode,
1048                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1049                    int ast_flags)
1050 {
1051         struct ll_sb_info *sbi = ll_i2sbi(inode);
1052         struct ost_lvb lvb;
1053         struct obd_enqueue_info einfo = { 0 };
1054         struct obd_info oinfo = { { { 0 } } };
1055         int rc;
1056         ENTRY;
1057
1058         LASSERT(!lustre_handle_is_used(lockh));
1059         LASSERT(lsm != NULL);
1060
1061         /* don't drop the mmapped file to LRU */
1062         if (mapping_mapped(inode->i_mapping))
1063                 ast_flags |= LDLM_FL_NO_LRU;
1064
1065         /* XXX phil: can we do this?  won't it screw the file size up? */
1066         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1067             (sbi->ll_flags & LL_SBI_NOLCK))
1068                 RETURN(0);
1069
1070         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1071                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1072
1073         einfo.ei_type = LDLM_EXTENT;
1074         einfo.ei_mode = mode;
1075         einfo.ei_flags = ast_flags;
1076         einfo.ei_cb_bl = ll_extent_lock_callback;
1077         einfo.ei_cb_cp = ldlm_completion_ast;
1078         einfo.ei_cb_gl = ll_glimpse_callback;
1079         einfo.ei_cbdata = inode;
1080
1081         oinfo.oi_policy = *policy;
1082         oinfo.oi_lockh = lockh;
1083         oinfo.oi_md = lsm;
1084
1085         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo);
1086         *policy = oinfo.oi_policy;
1087         if (rc > 0)
1088                 rc = -EIO;
1089
1090         ll_inode_size_lock(inode, 1);
1091         inode_init_lvb(inode, &lvb);
1092         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1093
1094         if (policy->l_extent.start == 0 &&
1095             policy->l_extent.end == OBD_OBJECT_EOF) {
1096                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1097                  * the kms under both a DLM lock and the
1098                  * ll_inode_size_lock().  If we don't get the
1099                  * ll_inode_size_lock() here we can match the DLM lock and
1100                  * reset i_size from the kms before the truncating path has
1101                  * updated the kms.  generic_file_write can then trust the
1102                  * stale i_size when doing appending writes and effectively
1103                  * cancel the result of the truncate.  Getting the
1104                  * ll_inode_size_lock() after the enqueue maintains the DLM
1105                  * -> ll_inode_size_lock() acquiring order. */
1106                 inode->i_size = lvb.lvb_size;
1107                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1108                        inode->i_ino, inode->i_size);
1109         }
1110
1111         if (rc == 0) {
1112                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1113                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1114                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1115         }
1116         ll_inode_size_unlock(inode, 1);
1117
1118         RETURN(rc);
1119 }
1120
1121 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1122                      struct lov_stripe_md *lsm, int mode,
1123                      struct lustre_handle *lockh)
1124 {
1125         struct ll_sb_info *sbi = ll_i2sbi(inode);
1126         int rc;
1127         ENTRY;
1128
1129         /* XXX phil: can we do this?  won't it screw the file size up? */
1130         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1131             (sbi->ll_flags & LL_SBI_NOLCK))
1132                 RETURN(0);
1133
1134         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1135
1136         RETURN(rc);
1137 }
1138
1139 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1140                             loff_t *ppos)
1141 {
1142         struct inode *inode = file->f_dentry->d_inode;
1143         struct ll_inode_info *lli = ll_i2info(inode);
1144         struct lov_stripe_md *lsm = lli->lli_smd;
1145         struct ll_sb_info *sbi = ll_i2sbi(inode);
1146         struct ll_lock_tree tree;
1147         struct ll_lock_tree_node *node;
1148         struct ost_lvb lvb;
1149         struct ll_ra_read bead;
1150         int rc, ra = 0;
1151         loff_t end;
1152         ssize_t retval, chunk, sum = 0;
1153
1154         __u64 kms;
1155         ENTRY;
1156         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1157                inode->i_ino, inode->i_generation, inode, count, *ppos);
1158         ll_vfs_ops_tally(sbi, VFS_OPS_READ);
1159
1160         /* "If nbyte is 0, read() will return 0 and have no other results."
1161          *                      -- Single Unix Spec */
1162         if (count == 0)
1163                 RETURN(0);
1164
1165         lprocfs_counter_add(sbi->ll_stats, LPROC_LL_READ_BYTES, count);
1166
1167         if (!lsm) {
1168                 /* Read on file with no objects should return zero-filled
1169                  * buffers up to file size (we can get non-zero sizes with
1170                  * mknod + truncate, then opening file for read. This is a
1171                  * common pattern in NFS case, it seems). Bug 6243 */
1172                 int notzeroed;
1173                 /* Since there are no objects on OSTs, we have nothing to get
1174                  * lock on and so we are forced to access inode->i_size
1175                  * unguarded */
1176
1177                 /* Read beyond end of file */
1178                 if (*ppos >= inode->i_size)
1179                         RETURN(0);
1180
1181                 if (count > inode->i_size - *ppos)
1182                         count = inode->i_size - *ppos;
1183                 /* Make sure to correctly adjust the file pos pointer for
1184                  * EFAULT case */
1185                 notzeroed = clear_user(buf, count);
1186                 count -= notzeroed;
1187                 *ppos += count;
1188                 if (!count)
1189                         RETURN(-EFAULT);
1190                 RETURN(count);
1191         }
1192
1193 repeat:
1194         if (sbi->ll_max_rw_chunk != 0) {
1195                 /* first, let's know the end of the current stripe */
1196                 end = *ppos;
1197                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1198                                 (obd_off *)&end);
1199
1200                 /* correct, the end is beyond the request */
1201                 if (end > *ppos + count - 1)
1202                         end = *ppos + count - 1;
1203
1204                 /* and chunk shouldn't be too large even if striping is wide */
1205                 if (end - *ppos > sbi->ll_max_rw_chunk)
1206                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1207         } else {
1208                 end = *ppos + count - 1;
1209         }
1210        
1211         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1212         tree.lt_fd = LUSTRE_FPRIVATE(file);
1213         rc = ll_tree_lock(&tree, node, buf, count,
1214                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1215         if (rc != 0)
1216                 GOTO(out, retval = rc);
1217
1218         ll_inode_size_lock(inode, 1);
1219         /*
1220          * Consistency guarantees: following possibilities exist for the
1221          * relation between region being read and real file size at this
1222          * moment:
1223          *
1224          *  (A): the region is completely inside of the file;
1225          *
1226          *  (B-x): x bytes of region are inside of the file, the rest is
1227          *  outside;
1228          *
1229          *  (C): the region is completely outside of the file.
1230          *
1231          * This classification is stable under DLM lock acquired by
1232          * ll_tree_lock() above, because to change class, other client has to
1233          * take DLM lock conflicting with our lock. Also, any updates to
1234          * ->i_size by other threads on this client are serialized by
1235          * ll_inode_size_lock(). This guarantees that short reads are handled
1236          * correctly in the face of concurrent writes and truncates.
1237          */
1238         inode_init_lvb(inode, &lvb);
1239         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1240         kms = lvb.lvb_size;
1241         if (*ppos + count - 1 > kms) {
1242                 /* A glimpse is necessary to determine whether we return a
1243                  * short read (B) or some zeroes at the end of the buffer (C) */
1244                 ll_inode_size_unlock(inode, 1);
1245                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1246                 if (retval) {
1247                         ll_tree_unlock(&tree);
1248                         goto out;
1249                 }
1250         } else {
1251                 /* region is within kms and, hence, within real file size (A).
1252                  * We need to increase i_size to cover the read region so that
1253                  * generic_file_read() will do its job, but that doesn't mean
1254                  * the kms size is _correct_, it is only the _minimum_ size.
1255                  * If someone does a stat they will get the correct size which
1256                  * will always be >= the kms value here.  b=11081 */
1257                 if (inode->i_size < kms)
1258                         inode->i_size = kms;
1259                 ll_inode_size_unlock(inode, 1);
1260         }
1261
1262         chunk = end - *ppos + 1;
1263         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1264                inode->i_ino, chunk, *ppos, inode->i_size);
1265
1266         /* turn off the kernel's read-ahead */
1267 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1268         file->f_ramax = 0;
1269 #else
1270         file->f_ra.ra_pages = 0;
1271 #endif
1272         /* initialize read-ahead window once per syscall */
1273         if (ra == 0) {
1274                 ra = 1;
1275                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1276                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1277                 ll_ra_read_in(file, &bead);
1278         }
1279
1280         /* BUG: 5972 */
1281         file_accessed(file);
1282         retval = generic_file_read(file, buf, chunk, ppos);
1283         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 0);
1284
1285         ll_tree_unlock(&tree);
1286
1287         if (retval > 0) {
1288                 buf += retval;
1289                 count -= retval;
1290                 sum += retval;
1291                 if (retval == chunk && count > 0)
1292                         goto repeat;
1293         }
1294
1295  out:
1296         if (ra != 0)
1297                 ll_ra_read_ex(file, &bead);
1298         retval = (sum > 0) ? sum : retval;
1299         RETURN(retval);
1300 }
1301
1302 /*
1303  * Write to a file (through the page cache).
1304  */
1305 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1306                              loff_t *ppos)
1307 {
1308         struct inode *inode = file->f_dentry->d_inode;
1309         struct ll_sb_info *sbi = ll_i2sbi(inode);
1310         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1311         struct ll_lock_tree tree;
1312         struct ll_lock_tree_node *node;
1313         loff_t maxbytes = ll_file_maxbytes(inode);
1314         loff_t lock_start, lock_end, end;
1315         ssize_t retval, chunk, sum = 0;
1316         int rc;
1317         ENTRY;
1318
1319         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1320                inode->i_ino, inode->i_generation, inode, count, *ppos);
1321         ll_vfs_ops_tally(sbi, VFS_OPS_WRITE);
1322         
1323         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1324
1325         /* POSIX, but surprised the VFS doesn't check this already */
1326         if (count == 0)
1327                 RETURN(0);
1328
1329         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1330          * called on the file, don't fail the below assertion (bug 2388). */
1331         if (file->f_flags & O_LOV_DELAY_CREATE &&
1332             ll_i2info(inode)->lli_smd == NULL)
1333                 RETURN(-EBADF);
1334
1335         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1336
1337         down(&ll_i2info(inode)->lli_write_sem);
1338
1339 repeat:
1340         chunk = 0; /* just to fix gcc's warning */
1341         end = *ppos + count - 1;
1342
1343         if (file->f_flags & O_APPEND) {
1344                 lock_start = 0;
1345                 lock_end = OBD_OBJECT_EOF;
1346         } else if (sbi->ll_max_rw_chunk != 0) {
1347                 /* first, let's know the end of the current stripe */
1348                 end = *ppos;
1349                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, 
1350                                 (obd_off *)&end);
1351
1352                 /* correct, the end is beyond the request */
1353                 if (end > *ppos + count - 1)
1354                         end = *ppos + count - 1;
1355
1356                 /* and chunk shouldn't be too large even if striping is wide */
1357                 if (end - *ppos > sbi->ll_max_rw_chunk)
1358                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1359                 lock_start = *ppos;
1360                 lock_end = end;
1361         } else {
1362                 lock_start = *ppos;
1363                 lock_end = *ppos + count - 1;
1364         }
1365         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1366
1367         if (IS_ERR(node))
1368                 GOTO(out, retval = PTR_ERR(node));
1369
1370         tree.lt_fd = LUSTRE_FPRIVATE(file);
1371         rc = ll_tree_lock(&tree, node, buf, count,
1372                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1373         if (rc != 0)
1374                 GOTO(out, retval = rc);
1375
1376         /* This is ok, g_f_w will overwrite this under i_sem if it races
1377          * with a local truncate, it just makes our maxbyte checking easier.
1378          * The i_size value gets updated in ll_extent_lock() as a consequence
1379          * of the [0,EOF] extent lock we requested above. */
1380         if (file->f_flags & O_APPEND) {
1381                 *ppos = inode->i_size;
1382                 end = *ppos + count - 1;
1383         }
1384
1385         if (*ppos >= maxbytes) {
1386                 send_sig(SIGXFSZ, current, 0);
1387                 GOTO(out, retval = -EFBIG);
1388         }
1389         if (*ppos + count > maxbytes)
1390                 count = maxbytes - *ppos;
1391
1392         /* generic_file_write handles O_APPEND after getting i_mutex */
1393         chunk = end - *ppos + 1;
1394         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1395                inode->i_ino, chunk, *ppos);
1396         retval = generic_file_write(file, buf, chunk, ppos);
1397         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1398
1399 out:
1400         ll_tree_unlock(&tree);
1401
1402         if (retval > 0) {
1403                 buf += retval;
1404                 count -= retval;
1405                 sum += retval;
1406                 if (retval == chunk && count > 0)
1407                         goto repeat;
1408         }
1409
1410         up(&ll_i2info(inode)->lli_write_sem);
1411
1412         retval = (sum > 0) ? sum : retval;
1413         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1414                             retval > 0 ? retval : 0);
1415
1416         if (retval > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
1417                 rc = ll_sync_page_range(inode, inode->i_mapping, *ppos - retval,
1418                                         count);
1419                 if (rc < 0)
1420                         retval = rc;
1421         }
1422
1423         RETURN(retval);
1424 }
1425
1426 /*
1427  * Send file content (through pagecache) somewhere with helper
1428  */
1429 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1430 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1431                                 read_actor_t actor, void *target)
1432 {
1433         struct inode *inode = in_file->f_dentry->d_inode;
1434         struct ll_inode_info *lli = ll_i2info(inode);
1435         struct lov_stripe_md *lsm = lli->lli_smd;
1436         struct ll_lock_tree tree;
1437         struct ll_lock_tree_node *node;
1438         struct ost_lvb lvb;
1439         struct ll_ra_read bead;
1440         int rc;
1441         ssize_t retval;
1442         __u64 kms;
1443         ENTRY;
1444         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1445                inode->i_ino, inode->i_generation, inode, count, *ppos);
1446
1447         /* "If nbyte is 0, read() will return 0 and have no other results."
1448          *                      -- Single Unix Spec */
1449         if (count == 0)
1450                 RETURN(0);
1451
1452         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1453                             count);
1454
1455         /* turn off the kernel's read-ahead */
1456         in_file->f_ra.ra_pages = 0;
1457
1458         /* File with no objects, nothing to lock */
1459         if (!lsm)
1460                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1461
1462         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1463         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1464         rc = ll_tree_lock(&tree, node, NULL, count,
1465                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1466         if (rc != 0)
1467                 RETURN(rc);
1468
1469         ll_inode_size_lock(inode, 1);
1470         /*
1471          * Consistency guarantees: following possibilities exist for the
1472          * relation between region being read and real file size at this
1473          * moment:
1474          *
1475          *  (A): the region is completely inside of the file;
1476          *
1477          *  (B-x): x bytes of region are inside of the file, the rest is
1478          *  outside;
1479          *
1480          *  (C): the region is completely outside of the file.
1481          *
1482          * This classification is stable under DLM lock acquired by
1483          * ll_tree_lock() above, because to change class, other client has to
1484          * take DLM lock conflicting with our lock. Also, any updates to
1485          * ->i_size by other threads on this client are serialized by
1486          * ll_inode_size_lock(). This guarantees that short reads are handled
1487          * correctly in the face of concurrent writes and truncates.
1488          */
1489         inode_init_lvb(inode, &lvb);
1490         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1491         kms = lvb.lvb_size;
1492         if (*ppos + count - 1 > kms) {
1493                 /* A glimpse is necessary to determine whether we return a
1494                  * short read (B) or some zeroes at the end of the buffer (C) */
1495                 ll_inode_size_unlock(inode, 1);
1496                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1497                 if (retval)
1498                         goto out;
1499         } else {
1500                 /* region is within kms and, hence, within real file size (A) */
1501                 inode->i_size = kms;
1502                 ll_inode_size_unlock(inode, 1);
1503         }
1504
1505         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1506                inode->i_ino, count, *ppos, inode->i_size);
1507
1508         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1509         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1510         ll_ra_read_in(in_file, &bead);
1511         /* BUG: 5972 */
1512         file_accessed(in_file);
1513         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1514         ll_ra_read_ex(in_file, &bead);
1515
1516  out:
1517         ll_tree_unlock(&tree);
1518         RETURN(retval);
1519 }
1520 #endif
1521
1522 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1523                                unsigned long arg)
1524 {
1525         struct ll_inode_info *lli = ll_i2info(inode);
1526         struct obd_export *exp = ll_i2obdexp(inode);
1527         struct ll_recreate_obj ucreatp;
1528         struct obd_trans_info oti = { 0 };
1529         struct obdo *oa = NULL;
1530         int lsm_size;
1531         int rc = 0;
1532         struct lov_stripe_md *lsm, *lsm2;
1533         ENTRY;
1534
1535         if (!capable (CAP_SYS_ADMIN))
1536                 RETURN(-EPERM);
1537
1538         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1539                             sizeof(struct ll_recreate_obj));
1540         if (rc) {
1541                 RETURN(-EFAULT);
1542         }
1543         oa = obdo_alloc();
1544         if (oa == NULL)
1545                 RETURN(-ENOMEM);
1546
1547         down(&lli->lli_open_sem);
1548         lsm = lli->lli_smd;
1549         if (lsm == NULL)
1550                 GOTO(out, rc = -ENOENT);
1551         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1552                    (lsm->lsm_stripe_count));
1553
1554         OBD_ALLOC(lsm2, lsm_size);
1555         if (lsm2 == NULL)
1556                 GOTO(out, rc = -ENOMEM);
1557
1558         oa->o_id = ucreatp.lrc_id;
1559         oa->o_nlink = ucreatp.lrc_ost_idx;
1560         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1561         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1562         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1563                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1564
1565         oti.oti_objid = NULL;
1566         memcpy(lsm2, lsm, lsm_size);
1567         rc = obd_create(exp, oa, &lsm2, &oti);
1568
1569         OBD_FREE(lsm2, lsm_size);
1570         GOTO(out, rc);
1571 out:
1572         up(&lli->lli_open_sem);
1573         obdo_free(oa);
1574         return rc;
1575 }
1576
1577 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1578                                     int flags, struct lov_user_md *lum,
1579                                     int lum_size)
1580 {
1581         struct ll_inode_info *lli = ll_i2info(inode);
1582         struct lov_stripe_md *lsm;
1583         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1584         int rc = 0;
1585         ENTRY;
1586
1587         down(&lli->lli_open_sem);
1588         lsm = lli->lli_smd;
1589         if (lsm) {
1590                 up(&lli->lli_open_sem);
1591                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1592                        inode->i_ino);
1593                 RETURN(-EEXIST);
1594         }
1595
1596         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1597         if (rc)
1598                 GOTO(out, rc);
1599         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1600                 GOTO(out_req_free, rc = -ENOENT);
1601         rc = oit.d.lustre.it_status;
1602         if (rc < 0)
1603                 GOTO(out_req_free, rc);
1604
1605         ll_release_openhandle(file->f_dentry, &oit);
1606
1607  out:
1608         up(&lli->lli_open_sem);
1609         ll_intent_release(&oit);
1610         RETURN(rc);
1611 out_req_free:
1612         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1613         goto out;
1614 }
1615
1616 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1617                              struct lov_mds_md **lmmp, int *lmm_size, 
1618                              struct ptlrpc_request **request)
1619 {
1620         struct ll_sb_info *sbi = ll_i2sbi(inode);
1621         struct ll_fid  fid;
1622         struct mds_body  *body;
1623         struct lov_mds_md *lmm = NULL;
1624         struct ptlrpc_request *req = NULL;
1625         int rc, lmmsize;
1626
1627         ll_inode2fid(&fid, inode);
1628
1629         rc = ll_get_max_mdsize(sbi, &lmmsize);
1630         if (rc)
1631                 RETURN(rc);
1632
1633         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
1634                         filename, strlen(filename) + 1,
1635                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
1636                         lmmsize, &req);
1637         if (rc < 0) {
1638                 CDEBUG(D_INFO, "mdc_getattr_name failed "
1639                                 "on %s: rc %d\n", filename, rc);
1640                 GOTO(out, rc);
1641         }
1642
1643         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1644                         sizeof(*body));
1645         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1646         /* swabbed by mdc_getattr_name */
1647         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1648
1649         lmmsize = body->eadatasize;
1650
1651         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1652                         lmmsize == 0) {
1653                 GOTO(out, rc = -ENODATA);
1654         }
1655
1656         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
1657                         lmmsize);
1658         LASSERT(lmm != NULL);
1659         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1660
1661         /*
1662          * This is coming from the MDS, so is probably in
1663          * little endian.  We convert it to host endian before
1664          * passing it to userspace.
1665          */
1666         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1667                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1668                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1669         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1670                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1671         }
1672
1673         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1674                 struct lov_stripe_md *lsm;
1675                 struct lov_user_md_join *lmj;
1676                 int lmj_size, i, aindex = 0;
1677
1678                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
1679                 if (rc < 0)
1680                         GOTO(out, rc = -ENOMEM);
1681                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
1682                 if (rc)
1683                         GOTO(out_free_memmd, rc);
1684
1685                 lmj_size = sizeof(struct lov_user_md_join) +
1686                         lsm->lsm_stripe_count *
1687                         sizeof(struct lov_user_ost_data_join);
1688                 OBD_ALLOC(lmj, lmj_size);
1689                 if (!lmj)
1690                         GOTO(out_free_memmd, rc = -ENOMEM);
1691
1692                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1693                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1694                         struct lov_extent *lex =
1695                                 &lsm->lsm_array->lai_ext_array[aindex];
1696
1697                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1698                                 aindex ++;
1699                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1700                                         LPU64" len %d\n", aindex, i,
1701                                         lex->le_start, (int)lex->le_len);
1702                         lmj->lmm_objects[i].l_extent_start =
1703                                 lex->le_start;
1704
1705                         if ((int)lex->le_len == -1)
1706                                 lmj->lmm_objects[i].l_extent_end = -1;
1707                         else
1708                                 lmj->lmm_objects[i].l_extent_end =
1709                                         lex->le_start + lex->le_len;
1710                         lmj->lmm_objects[i].l_object_id =
1711                                 lsm->lsm_oinfo[i].loi_id;
1712                         lmj->lmm_objects[i].l_object_gr =
1713                                 lsm->lsm_oinfo[i].loi_gr;
1714                         lmj->lmm_objects[i].l_ost_gen =
1715                                 lsm->lsm_oinfo[i].loi_ost_gen;
1716                         lmj->lmm_objects[i].l_ost_idx =
1717                                 lsm->lsm_oinfo[i].loi_ost_idx;
1718                 }
1719                 lmm = (struct lov_mds_md *)lmj;
1720                 lmmsize = lmj_size;
1721 out_free_memmd:
1722                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
1723         }
1724 out:
1725         *lmmp = lmm;
1726         *lmm_size = lmmsize;
1727         *request = req;
1728         return rc;
1729 }
1730 static int ll_lov_setea(struct inode *inode, struct file *file,
1731                             unsigned long arg)
1732 {
1733         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1734         struct lov_user_md  *lump;
1735         int lum_size = sizeof(struct lov_user_md) +
1736                        sizeof(struct lov_user_ost_data);
1737         int rc;
1738         ENTRY;
1739
1740         if (!capable (CAP_SYS_ADMIN))
1741                 RETURN(-EPERM);
1742
1743         OBD_ALLOC(lump, lum_size);
1744         if (lump == NULL) {
1745                 RETURN(-ENOMEM);
1746         }
1747         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1748         if (rc) {
1749                 OBD_FREE(lump, lum_size);
1750                 RETURN(-EFAULT);
1751         }
1752
1753         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1754
1755         OBD_FREE(lump, lum_size);
1756         RETURN(rc);
1757 }
1758
1759 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1760                             unsigned long arg)
1761 {
1762         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1763         int rc;
1764         int flags = FMODE_WRITE;
1765         ENTRY;
1766
1767         /* Bug 1152: copy properly when this is no longer true */
1768         LASSERT(sizeof(lum) == sizeof(*lump));
1769         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1770         rc = copy_from_user(&lum, lump, sizeof(lum));
1771         if (rc)
1772                 RETURN(-EFAULT);
1773
1774         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1775         if (rc == 0) {
1776                  put_user(0, &lump->lmm_stripe_count);
1777                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
1778                                     0, ll_i2info(inode)->lli_smd, lump);
1779         }
1780         RETURN(rc);
1781 }
1782
1783 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1784 {
1785         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1786
1787         if (!lsm)
1788                 RETURN(-ENODATA);
1789
1790         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
1791                             (void *)arg);
1792 }
1793
1794 static int ll_get_grouplock(struct inode *inode, struct file *file,
1795                             unsigned long arg)
1796 {
1797         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1798         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1799                                                     .end = OBD_OBJECT_EOF}};
1800         struct lustre_handle lockh = { 0 };
1801         struct ll_inode_info *lli = ll_i2info(inode);
1802         struct lov_stripe_md *lsm = lli->lli_smd;
1803         int flags = 0, rc;
1804         ENTRY;
1805
1806         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1807                 RETURN(-EINVAL);
1808         }
1809
1810         policy.l_extent.gid = arg;
1811         if (file->f_flags & O_NONBLOCK)
1812                 flags = LDLM_FL_BLOCK_NOWAIT;
1813
1814         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1815         if (rc)
1816                 RETURN(rc);
1817
1818         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1819         fd->fd_gid = arg;
1820         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1821
1822         RETURN(0);
1823 }
1824
1825 static int ll_put_grouplock(struct inode *inode, struct file *file,
1826                             unsigned long arg)
1827 {
1828         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1829         struct ll_inode_info *lli = ll_i2info(inode);
1830         struct lov_stripe_md *lsm = lli->lli_smd;
1831         int rc;
1832         ENTRY;
1833
1834         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1835                 /* Ugh, it's already unlocked. */
1836                 RETURN(-EINVAL);
1837         }
1838
1839         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1840                 RETURN(-EINVAL);
1841
1842         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1843
1844         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1845         if (rc)
1846                 RETURN(rc);
1847
1848         fd->fd_gid = 0;
1849         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1850
1851         RETURN(0);
1852 }
1853
1854 static int join_sanity_check(struct inode *head, struct inode *tail)
1855 {
1856         ENTRY;
1857         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1858                 CERROR("server do not support join \n");
1859                 RETURN(-EINVAL);
1860         }
1861         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1862                 CERROR("tail ino %lu and ino head %lu must be regular\n",
1863                        head->i_ino, tail->i_ino);
1864                 RETURN(-EINVAL);
1865         }
1866         if (head->i_ino == tail->i_ino) {
1867                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
1868                 RETURN(-EINVAL);
1869         }
1870         if (head->i_size % JOIN_FILE_ALIGN) {
1871                 CERROR("hsize %llu must be times of 64K\n", head->i_size);
1872                 RETURN(-EINVAL);
1873         }
1874         RETURN(0);
1875 }
1876
1877 static int join_file(struct inode *head_inode, struct file *head_filp,
1878                      struct file *tail_filp)
1879 {
1880         struct inode *tail_inode, *tail_parent;
1881         struct dentry *tail_dentry = tail_filp->f_dentry;
1882         struct lookup_intent oit = {.it_op = IT_OPEN,
1883                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1884         struct lustre_handle lockh;
1885         struct mdc_op_data *op_data;
1886         __u32  hsize = head_inode->i_size >> 32;
1887         __u32  tsize = head_inode->i_size;
1888         int    rc;
1889         ENTRY;
1890
1891         tail_dentry = tail_filp->f_dentry;
1892         tail_inode = tail_dentry->d_inode;
1893         tail_parent = tail_dentry->d_parent->d_inode;
1894
1895         OBD_ALLOC_PTR(op_data);
1896         if (op_data == NULL) {
1897                 RETURN(-ENOMEM);
1898         }
1899
1900         ll_prepare_mdc_op_data(op_data, head_inode, tail_parent,
1901                                tail_dentry->d_name.name,
1902                                tail_dentry->d_name.len, 0);
1903         rc = mdc_enqueue(ll_i2mdcexp(head_inode), LDLM_IBITS, &oit, LCK_PW,
1904                          op_data, &lockh, &tsize, 0, ldlm_completion_ast,
1905                          ll_mdc_blocking_ast, &hsize, 0);
1906
1907         if (rc < 0)
1908                 GOTO(out, rc);
1909
1910         rc = oit.d.lustre.it_status;
1911
1912         if (rc < 0) {
1913                 ptlrpc_req_finished((struct ptlrpc_request *)
1914                                                           oit.d.lustre.it_data);
1915                 GOTO(out, rc);
1916         }
1917
1918         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
1919                                            * away */
1920                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
1921                 oit.d.lustre.it_lock_mode = 0;
1922         }
1923         ll_release_openhandle(head_filp->f_dentry, &oit);
1924 out:
1925         if (op_data)
1926                 OBD_FREE_PTR(op_data);
1927         ll_intent_release(&oit);
1928         RETURN(rc);
1929 }
1930
1931 static int ll_file_join(struct inode *head, struct file *filp,
1932                         char *filename_tail)
1933 {
1934         struct inode *tail = NULL, *first = NULL, *second = NULL;
1935         struct dentry *tail_dentry;
1936         struct file *tail_filp, *first_filp, *second_filp;
1937         struct ll_lock_tree first_tree, second_tree;
1938         struct ll_lock_tree_node *first_node, *second_node;
1939         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
1940         int rc = 0, cleanup_phase = 0;
1941         ENTRY;
1942
1943         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
1944                head->i_ino, head->i_generation, head, filename_tail);
1945
1946         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
1947         if (IS_ERR(tail_filp)) {
1948                 CERROR("Can not open tail file %s", filename_tail);
1949                 rc = PTR_ERR(tail_filp);
1950                 GOTO(cleanup, rc);
1951         }
1952         tail = igrab(tail_filp->f_dentry->d_inode);
1953
1954         tlli = ll_i2info(tail);
1955         tail_dentry = tail_filp->f_dentry;
1956         LASSERT(tail_dentry);
1957         cleanup_phase = 1;
1958
1959         /*reorder the inode for lock sequence*/
1960         first = head->i_ino > tail->i_ino ? head : tail;
1961         second = head->i_ino > tail->i_ino ? tail : head;
1962         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
1963         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
1964
1965         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
1966                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
1967         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
1968         if (IS_ERR(first_node)){
1969                 rc = PTR_ERR(first_node);
1970                 GOTO(cleanup, rc);
1971         }
1972         first_tree.lt_fd = first_filp->private_data;
1973         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
1974         if (rc != 0)
1975                 GOTO(cleanup, rc);
1976         cleanup_phase = 2;
1977
1978         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
1979         if (IS_ERR(second_node)){
1980                 rc = PTR_ERR(second_node);
1981                 GOTO(cleanup, rc);
1982         }
1983         second_tree.lt_fd = second_filp->private_data;
1984         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
1985         if (rc != 0)
1986                 GOTO(cleanup, rc);
1987         cleanup_phase = 3;
1988
1989         rc = join_sanity_check(head, tail);
1990         if (rc)
1991                 GOTO(cleanup, rc);
1992
1993         rc = join_file(head, filp, tail_filp);
1994         if (rc)
1995                 GOTO(cleanup, rc);
1996 cleanup:
1997         switch (cleanup_phase) {
1998         case 3:
1999                 ll_tree_unlock(&second_tree);
2000                 obd_cancel_unused(ll_i2obdexp(second),
2001                                   ll_i2info(second)->lli_smd, 0, NULL);
2002         case 2:
2003                 ll_tree_unlock(&first_tree);
2004                 obd_cancel_unused(ll_i2obdexp(first),
2005                                   ll_i2info(first)->lli_smd, 0, NULL);
2006         case 1:
2007                 filp_close(tail_filp, 0);
2008                 if (tail)
2009                         iput(tail);
2010                 if (head && rc == 0) {
2011                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2012                                        &hlli->lli_smd);
2013                         hlli->lli_smd = NULL;
2014                 }
2015         case 0:
2016                 break;
2017         default:
2018                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2019                 LBUG();
2020         }
2021         RETURN(rc);
2022 }
2023
2024 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2025 {
2026         struct inode *inode = dentry->d_inode;
2027         struct obd_client_handle *och;
2028         int rc;
2029         ENTRY;
2030
2031         LASSERT(inode);
2032
2033         /* Root ? Do nothing. */
2034         if (dentry->d_inode->i_sb->s_root == dentry)
2035                 RETURN(0);
2036
2037         /* No open handle to close? Move away */
2038         if (!it_disposition(it, DISP_OPEN_OPEN))
2039                 RETURN(0);
2040
2041         OBD_ALLOC(och, sizeof(*och));
2042         if (!och)
2043                 GOTO(out, rc = -ENOMEM);
2044
2045         ll_och_fill(ll_i2info(inode), it, och);
2046
2047         rc = ll_close_inode_openhandle(inode, och);
2048
2049         OBD_FREE(och, sizeof(*och));
2050  out:
2051         /* this one is in place of ll_file_open */
2052         ptlrpc_req_finished(it->d.lustre.it_data);
2053         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2054         RETURN(rc);
2055 }
2056
2057 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2058                   unsigned long arg)
2059 {
2060         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2061         int flags;
2062         ENTRY;
2063
2064         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2065                inode->i_generation, inode, cmd);
2066         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_IOCTL);
2067
2068         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2069         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2070                 RETURN(-ENOTTY);
2071
2072         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
2073         switch(cmd) {
2074         case LL_IOC_GETFLAGS:
2075                 /* Get the current value of the file flags */
2076                 return put_user(fd->fd_flags, (int *)arg);
2077         case LL_IOC_SETFLAGS:
2078         case LL_IOC_CLRFLAGS:
2079                 /* Set or clear specific file flags */
2080                 /* XXX This probably needs checks to ensure the flags are
2081                  *     not abused, and to handle any flag side effects.
2082                  */
2083                 if (get_user(flags, (int *) arg))
2084                         RETURN(-EFAULT);
2085
2086                 if (cmd == LL_IOC_SETFLAGS) {
2087                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2088                             !(file->f_flags & O_DIRECT)) {
2089                                 CERROR("%s: unable to disable locking on "
2090                                        "non-O_DIRECT file\n", current->comm);
2091                                 RETURN(-EINVAL);
2092                         }
2093
2094                         fd->fd_flags |= flags;
2095                 } else {
2096                         fd->fd_flags &= ~flags;
2097                 }
2098                 RETURN(0);
2099         case LL_IOC_LOV_SETSTRIPE:
2100                 RETURN(ll_lov_setstripe(inode, file, arg));
2101         case LL_IOC_LOV_SETEA:
2102                 RETURN(ll_lov_setea(inode, file, arg));
2103         case LL_IOC_LOV_GETSTRIPE:
2104                 RETURN(ll_lov_getstripe(inode, arg));
2105         case LL_IOC_RECREATE_OBJ:
2106                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2107         case EXT3_IOC_GETFLAGS:
2108         case EXT3_IOC_SETFLAGS:
2109                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2110         case EXT3_IOC_GETVERSION_OLD:
2111         case EXT3_IOC_GETVERSION:
2112                 RETURN(put_user(inode->i_generation, (int *)arg));
2113         case LL_IOC_JOIN: {
2114                 char *ftail;
2115                 int rc;
2116
2117                 ftail = getname((const char *)arg);
2118                 if (IS_ERR(ftail))
2119                         RETURN(PTR_ERR(ftail));
2120                 rc = ll_file_join(inode, file, ftail);
2121                 putname(ftail);
2122                 RETURN(rc);
2123         }
2124         case LL_IOC_GROUP_LOCK:
2125                 RETURN(ll_get_grouplock(inode, file, arg));
2126         case LL_IOC_GROUP_UNLOCK:
2127                 RETURN(ll_put_grouplock(inode, file, arg));
2128         case IOC_OBD_STATFS:
2129                 RETURN(ll_obd_statfs(inode, (void *)arg));
2130
2131         /* We need to special case any other ioctls we want to handle,
2132          * to send them to the MDS/OST as appropriate and to properly
2133          * network encode the arg field.
2134         case EXT3_IOC_SETVERSION_OLD:
2135         case EXT3_IOC_SETVERSION:
2136         */
2137         default:
2138                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2139                                      (void *)arg));
2140         }
2141 }
2142
2143 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2144 {
2145         struct inode *inode = file->f_dentry->d_inode;
2146         struct ll_inode_info *lli = ll_i2info(inode);
2147         struct lov_stripe_md *lsm = lli->lli_smd;
2148         loff_t retval;
2149         ENTRY;
2150         retval = offset + ((origin == 2) ? inode->i_size :
2151                            (origin == 1) ? file->f_pos : 0);
2152         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2153                inode->i_ino, inode->i_generation, inode, retval, retval,
2154                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2155         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_SEEK);
2156         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
2157         
2158         if (origin == 2) { /* SEEK_END */
2159                 int nonblock = 0, rc;
2160
2161                 if (file->f_flags & O_NONBLOCK)
2162                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2163
2164                 if (lsm != NULL) {
2165                         rc = ll_glimpse_size(inode, nonblock);
2166                         if (rc != 0)
2167                                 RETURN(rc);
2168                 }
2169
2170                 ll_inode_size_lock(inode, 0);
2171                 offset += inode->i_size;
2172                 ll_inode_size_unlock(inode, 0);
2173         } else if (origin == 1) { /* SEEK_CUR */
2174                 offset += file->f_pos;
2175         }
2176
2177         retval = -EINVAL;
2178         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2179                 if (offset != file->f_pos) {
2180                         file->f_pos = offset;
2181 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2182                         file->f_reada = 0;
2183                         file->f_version = ++event;
2184 #endif
2185                 }
2186                 retval = offset;
2187         }
2188
2189         RETURN(retval);
2190 }
2191
2192 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2193 {
2194         struct inode *inode = dentry->d_inode;
2195         struct ll_inode_info *lli = ll_i2info(inode);
2196         struct lov_stripe_md *lsm = lli->lli_smd;
2197         struct ll_fid fid;
2198         struct ptlrpc_request *req;
2199         int rc, err;
2200         ENTRY;
2201         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2202                inode->i_generation, inode);
2203         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FSYNC);
2204         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
2205
2206         /* fsync's caller has already called _fdata{sync,write}, we want
2207          * that IO to finish before calling the osc and mdc sync methods */
2208         rc = filemap_fdatawait(inode->i_mapping);
2209
2210         /* catch async errors that were recorded back when async writeback
2211          * failed for pages in this mapping. */
2212         err = lli->lli_async_rc;
2213         lli->lli_async_rc = 0;
2214         if (rc == 0)
2215                 rc = err;
2216         if (lsm) {
2217                 err = lov_test_and_clear_async_rc(lsm);
2218                 if (rc == 0)
2219                         rc = err;
2220         }
2221
2222         ll_inode2fid(&fid, inode);
2223         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2224         if (!rc)
2225                 rc = err;
2226         if (!err)
2227                 ptlrpc_req_finished(req);
2228
2229         if (data && lsm) {
2230                 struct obdo *oa = obdo_alloc();
2231
2232                 if (!oa)
2233                         RETURN(rc ? rc : -ENOMEM);
2234
2235                 oa->o_id = lsm->lsm_object_id;
2236                 oa->o_valid = OBD_MD_FLID;
2237                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2238                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2239
2240                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
2241                                0, OBD_OBJECT_EOF);
2242                 if (!rc)
2243                         rc = err;
2244                 obdo_free(oa);
2245         }
2246
2247         RETURN(rc);
2248 }
2249
2250 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2251 {
2252         struct inode *inode = file->f_dentry->d_inode;
2253         struct ll_sb_info *sbi = ll_i2sbi(inode);
2254         struct ldlm_res_id res_id =
2255                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
2256         struct lustre_handle lockh = {0};
2257         ldlm_policy_data_t flock;
2258         ldlm_mode_t mode = 0;
2259         int flags = 0;
2260         int rc;
2261         ENTRY;
2262
2263         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2264                inode->i_ino, file_lock);
2265         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FLOCK);
2266
2267         if (file_lock->fl_flags & FL_FLOCK) {
2268                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2269                 /* set missing params for flock() calls */
2270                 file_lock->fl_end = OFFSET_MAX;
2271                 file_lock->fl_pid = current->tgid;
2272         }
2273         flock.l_flock.pid = file_lock->fl_pid;
2274         flock.l_flock.start = file_lock->fl_start;
2275         flock.l_flock.end = file_lock->fl_end;
2276
2277         switch (file_lock->fl_type) {
2278         case F_RDLCK:
2279                 mode = LCK_PR;
2280                 break;
2281         case F_UNLCK:
2282                 /* An unlock request may or may not have any relation to
2283                  * existing locks so we may not be able to pass a lock handle
2284                  * via a normal ldlm_lock_cancel() request. The request may even
2285                  * unlock a byte range in the middle of an existing lock. In
2286                  * order to process an unlock request we need all of the same
2287                  * information that is given with a normal read or write record
2288                  * lock request. To avoid creating another ldlm unlock (cancel)
2289                  * message we'll treat a LCK_NL flock request as an unlock. */
2290                 mode = LCK_NL;
2291                 break;
2292         case F_WRLCK:
2293                 mode = LCK_PW;
2294                 break;
2295         default:
2296                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2297                 LBUG();
2298         }
2299
2300         switch (cmd) {
2301         case F_SETLKW:
2302 #ifdef F_SETLKW64
2303         case F_SETLKW64:
2304 #endif
2305                 flags = 0;
2306                 break;
2307         case F_SETLK:
2308 #ifdef F_SETLK64
2309         case F_SETLK64:
2310 #endif
2311                 flags = LDLM_FL_BLOCK_NOWAIT;
2312                 break;
2313         case F_GETLK:
2314 #ifdef F_GETLK64
2315         case F_GETLK64:
2316 #endif
2317                 flags = LDLM_FL_TEST_LOCK;
2318                 /* Save the old mode so that if the mode in the lock changes we
2319                  * can decrement the appropriate reader or writer refcount. */
2320                 file_lock->fl_type = mode;
2321                 break;
2322         default:
2323                 CERROR("unknown fcntl lock command: %d\n", cmd);
2324                 LBUG();
2325         }
2326
2327         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2328                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2329                flags, mode, flock.l_flock.start, flock.l_flock.end);
2330
2331         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, res_id,
2332                               LDLM_FLOCK, &flock, mode, &flags, NULL,
2333                               ldlm_flock_completion_ast, NULL, file_lock,
2334                               NULL, 0, NULL, &lockh, 0);
2335         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2336                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2337 #ifdef HAVE_F_OP_FLOCK
2338         if ((file_lock->fl_flags & FL_POSIX) &&(rc == 0))
2339                 posix_lock_file_wait(file, file_lock);
2340 #endif
2341
2342         RETURN(rc);
2343 }
2344
2345 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2346 {
2347         ENTRY;
2348
2349         RETURN(-ENOSYS);
2350 }
2351
2352 int ll_have_md_lock(struct inode *inode, __u64 bits)
2353 {
2354         struct lustre_handle lockh;
2355         struct ldlm_res_id res_id = { .name = {0} };
2356         struct obd_device *obddev;
2357         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2358         int flags;
2359         ENTRY;
2360
2361         if (!inode)
2362                RETURN(0);
2363
2364         obddev = ll_i2mdcexp(inode)->exp_obd;
2365         res_id.name[0] = inode->i_ino;
2366         res_id.name[1] = inode->i_generation;
2367
2368         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
2369
2370         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2371         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
2372                             &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2373                 RETURN(1);
2374         }
2375
2376         RETURN(0);
2377 }
2378
2379 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2380         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2381                               * and return success */
2382                 inode->i_nlink = 0;
2383                 /* This path cannot be hit for regular files unless in
2384                  * case of obscure races, so no need to to validate
2385                  * size. */
2386                 if (!S_ISREG(inode->i_mode) &&
2387                     !S_ISDIR(inode->i_mode))
2388                         return 0;
2389         }
2390
2391         if (rc) {
2392                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2393                 return -abs(rc);
2394
2395         }
2396
2397         return 0;
2398 }
2399
2400 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2401 {
2402         struct inode *inode = dentry->d_inode;
2403         struct ptlrpc_request *req = NULL;
2404         struct obd_export *exp;
2405         int rc;
2406         ENTRY;
2407
2408         if (!inode) {
2409                 CERROR("REPORT THIS LINE TO PETER\n");
2410                 RETURN(0);
2411         }
2412         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2413                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2414 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2415         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
2416 #endif
2417
2418         exp = ll_i2mdcexp(inode);
2419
2420         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2421                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2422                 struct mdc_op_data op_data;
2423
2424                 /* Call getattr by fid, so do not provide name at all. */
2425                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
2426                                        dentry->d_inode, NULL, 0, 0);
2427                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
2428                                      /* we are not interested in name
2429                                         based lookup */
2430                                      &oit, 0, &req,
2431                                      ll_mdc_blocking_ast, 0);
2432                 if (rc < 0) {
2433                         rc = ll_inode_revalidate_fini(inode, rc);
2434                         GOTO (out, rc);
2435                 }
2436                 
2437                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2438                 if (rc != 0) {
2439                         ll_intent_release(&oit);
2440                         GOTO(out, rc);
2441                 }
2442
2443                 /* Unlinked? Unhash dentry, so it is not picked up later by
2444                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2445                    here to preserve get_cwd functionality on 2.6.
2446                    Bug 10503 */
2447                 if (!dentry->d_inode->i_nlink) {
2448                         spin_lock(&dcache_lock);
2449                         ll_drop_dentry(dentry);
2450                         spin_unlock(&dcache_lock);
2451                 }
2452
2453                 ll_lookup_finish_locks(&oit, dentry);
2454         } else if (!ll_have_md_lock(dentry->d_inode,
2455                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
2456                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2457                 struct ll_fid fid;
2458                 obd_valid valid = OBD_MD_FLGETATTR;
2459                 int ealen = 0;
2460
2461                 if (S_ISREG(inode->i_mode)) {
2462                         rc = ll_get_max_mdsize(sbi, &ealen);
2463                         if (rc) 
2464                                 RETURN(rc); 
2465                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2466                 }
2467                 ll_inode2fid(&fid, inode);
2468                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
2469                 if (rc) {
2470                         rc = ll_inode_revalidate_fini(inode, rc);
2471                         RETURN(rc);
2472                 }
2473
2474                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
2475                                    NULL);
2476                 if (rc)
2477                         GOTO(out, rc);
2478         }
2479
2480         /* if object not yet allocated, don't validate size */
2481         if (ll_i2info(inode)->lli_smd == NULL) 
2482                 GOTO(out, rc = 0);
2483
2484         /* ll_glimpse_size will prefer locally cached writes if they extend
2485          * the file */
2486         rc = ll_glimpse_size(inode, 0);
2487
2488 out:
2489         ptlrpc_req_finished(req);
2490         RETURN(rc);
2491 }
2492
2493 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2494 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2495                   struct lookup_intent *it, struct kstat *stat)
2496 {
2497         struct inode *inode = de->d_inode;
2498         int res = 0;
2499
2500         res = ll_inode_revalidate_it(de, it);
2501         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
2502
2503         if (res)
2504                 return res;
2505
2506         stat->dev = inode->i_sb->s_dev;
2507         stat->ino = inode->i_ino;
2508         stat->mode = inode->i_mode;
2509         stat->nlink = inode->i_nlink;
2510         stat->uid = inode->i_uid;
2511         stat->gid = inode->i_gid;
2512         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2513         stat->atime = inode->i_atime;
2514         stat->mtime = inode->i_mtime;
2515         stat->ctime = inode->i_ctime;
2516 #ifdef HAVE_INODE_BLKSIZE
2517         stat->blksize = inode->i_blksize;
2518 #else
2519         stat->blksize = 1<<inode->i_blkbits;
2520 #endif
2521
2522         ll_inode_size_lock(inode, 0);
2523         stat->size = inode->i_size;
2524         stat->blocks = inode->i_blocks;
2525         ll_inode_size_unlock(inode, 0);
2526
2527         return 0;
2528 }
2529 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2530 {
2531         struct lookup_intent it = { .it_op = IT_GETATTR };
2532
2533         ll_vfs_ops_tally(ll_i2sbi(de->d_inode), VFS_OPS_GETATTR);
2534         return ll_getattr_it(mnt, de, &it, stat);
2535 }
2536 #endif
2537
2538 static
2539 int lustre_check_acl(struct inode *inode, int mask)
2540 {
2541 #ifdef CONFIG_FS_POSIX_ACL
2542         struct ll_inode_info *lli = ll_i2info(inode);
2543         struct posix_acl *acl;
2544         int rc;
2545         ENTRY;
2546
2547         spin_lock(&lli->lli_lock);
2548         acl = posix_acl_dup(lli->lli_posix_acl);
2549         spin_unlock(&lli->lli_lock);
2550
2551         if (!acl)
2552                 RETURN(-EAGAIN);
2553
2554         rc = posix_acl_permission(inode, acl, mask);
2555         posix_acl_release(acl);
2556
2557         RETURN(rc);
2558 #else
2559         return -EAGAIN;
2560 #endif
2561 }
2562
2563 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2564 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2565 {
2566         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2567                inode->i_ino, inode->i_generation, inode, mask);
2568
2569         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
2570         return generic_permission(inode, mask, lustre_check_acl);
2571 }
2572 #else
2573 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2574 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2575 #else
2576 int ll_inode_permission(struct inode *inode, int mask)
2577 #endif
2578 {
2579         int mode = inode->i_mode;
2580         int rc;
2581
2582         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2583                inode->i_ino, inode->i_generation, inode, mask);
2584         ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
2585
2586         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2587             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2588                 return -EROFS;
2589         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2590                 return -EACCES;
2591         if (current->fsuid == inode->i_uid) {
2592                 mode >>= 6;
2593         } else if (1) {
2594                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2595                         goto check_groups;
2596                 rc = lustre_check_acl(inode, mask);
2597                 if (rc == -EAGAIN)
2598                         goto check_groups;
2599                 if (rc == -EACCES)
2600                         goto check_capabilities;
2601                 return rc;
2602         } else {
2603 check_groups:
2604                 if (in_group_p(inode->i_gid))
2605                         mode >>= 3;
2606         }
2607         if ((mode & mask & S_IRWXO) == mask)
2608                 return 0;
2609
2610 check_capabilities:
2611         if (!(mask & MAY_EXEC) ||
2612             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2613                 if (capable(CAP_DAC_OVERRIDE))
2614                         return 0;
2615
2616         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2617             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2618                 return 0;
2619
2620         return -EACCES;
2621 }
2622 #endif
2623
2624 struct file_operations ll_file_operations = {
2625         .read           = ll_file_read,
2626         .write          = ll_file_write,
2627         .ioctl          = ll_file_ioctl,
2628         .open           = ll_file_open,
2629         .release        = ll_file_release,
2630         .mmap           = ll_file_mmap,
2631         .llseek         = ll_file_seek,
2632 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2633         .sendfile       = ll_file_sendfile,
2634 #endif
2635         .fsync          = ll_fsync,
2636 #ifdef HAVE_F_OP_FLOCK
2637         .flock          = ll_file_noflock,
2638 #endif
2639         .lock           = ll_file_noflock
2640 };
2641
2642 struct file_operations ll_file_operations_flock = {
2643         .read           = ll_file_read,
2644         .write          = ll_file_write,
2645         .ioctl          = ll_file_ioctl,
2646         .open           = ll_file_open,
2647         .release        = ll_file_release,
2648         .mmap           = ll_file_mmap,
2649         .llseek         = ll_file_seek,
2650 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2651         .sendfile       = ll_file_sendfile,
2652 #endif
2653         .fsync          = ll_fsync,
2654 #ifdef HAVE_F_OP_FLOCK
2655         .flock          = ll_file_flock,
2656 #endif
2657         .lock           = ll_file_flock
2658 };
2659
2660
2661 struct inode_operations ll_file_inode_operations = {
2662 #ifdef LUSTRE_KERNEL_VERSION
2663         .setattr_raw    = ll_setattr_raw,
2664 #endif
2665         .setattr        = ll_setattr,
2666         .truncate       = ll_truncate,
2667 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2668         .getattr        = ll_getattr,
2669 #else
2670         .revalidate_it  = ll_inode_revalidate_it,
2671 #endif
2672         .permission     = ll_inode_permission,
2673         .setxattr       = ll_setxattr,
2674         .getxattr       = ll_getxattr,
2675         .listxattr      = ll_listxattr,
2676         .removexattr    = ll_removexattr,
2677 };
2678