Whamcloud - gitweb
b=5654
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <linux/lustre_dlm.h>
27 #include <linux/lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
36                         struct file *file)
37 {
38         struct ll_file_data *fd = file->private_data;
39         struct ptlrpc_request *req = NULL;
40         struct obd_client_handle *och = &fd->fd_mds_och;
41         struct obdo obdo;
42         int rc;
43         ENTRY;
44
45         obdo.o_id = inode->i_ino;
46         obdo.o_valid = OBD_MD_FLID;
47         obdo_from_inode(&obdo, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
48                                       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
49                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
50                                       OBD_MD_FLCTIME);
51         if (0 /* ll_is_inode_dirty(inode) */) {
52                 obdo.o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
53                 obdo.o_valid |= OBD_MD_FLFLAGS;
54         }
55         rc = mdc_close(mdc_exp, &obdo, och, &req);
56         if (rc == EAGAIN) {
57                 /* We are the last writer, so the MDS has instructed us to get
58                  * the file size and any write cookies, then close again. */
59                 //ll_queue_done_writing(inode);
60                 rc = 0;
61         } else if (rc) {
62                 CERROR("inode %lu mdc close failed: rc = %d\n",
63                        inode->i_ino, rc);
64         }
65         if (rc == 0) {
66                 rc = ll_objects_destroy(req, file->f_dentry->d_inode);
67                 if (rc)
68                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
69                                inode->i_ino, rc);
70         }
71
72         mdc_clear_open_replay_data(och);
73         ptlrpc_req_finished(req);
74         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
75         file->private_data = NULL;
76         OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
77
78         RETURN(rc);
79 }
80
81 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
82
83 /* While this returns an error code, fput() the caller does not, so we need
84  * to make every effort to clean up all of our state here.  Also, applications
85  * rarely check close errors and even if an error is returned they will not
86  * re-try the close call.
87  */
88 int ll_file_release(struct inode *inode, struct file *file)
89 {
90         struct ll_file_data *fd;
91         struct ll_sb_info *sbi = ll_i2sbi(inode);
92         struct ll_inode_info *lli = ll_i2info(inode);
93         struct lov_stripe_md *lsm = lli->lli_smd;
94         int rc;
95
96         ENTRY;
97         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
98                inode->i_generation, inode);
99
100         /* don't do anything for / */
101         if (inode->i_sb->s_root == file->f_dentry)
102                 RETURN(0);
103
104         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
105         fd = (struct ll_file_data *)file->private_data;
106         LASSERT(fd != NULL);
107
108         if (lsm)
109                 lov_test_and_clear_async_rc(lsm);
110         lli->lli_async_rc = 0;
111
112         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
113         RETURN(rc);
114 }
115
116 static int ll_intent_file_open(struct file *file, void *lmm,
117                                int lmmsize, struct lookup_intent *itp)
118 {
119         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
120         struct lustre_handle lockh;
121         struct mdc_op_data data;
122         struct dentry *parent = file->f_dentry->d_parent;
123         const char *name = file->f_dentry->d_name.name;
124         const int len = file->f_dentry->d_name.len;
125         int rc;
126
127         if (!parent)
128                 RETURN(-ENOENT);
129
130         ll_prepare_mdc_op_data(&data, parent->d_inode, NULL, name, len, O_RDWR);
131
132         rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data,
133                          &lockh, lmm, lmmsize, ldlm_completion_ast,
134                          ll_mdc_blocking_ast, NULL);
135         if (rc < 0)
136                 CERROR("lock enqueue: err: %d\n", rc);
137         RETURN(rc);
138 }
139
140 int ll_local_open(struct file *file, struct lookup_intent *it)
141 {
142         struct ptlrpc_request *req = it->d.lustre.it_data;
143         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
144         struct ll_file_data *fd;
145         struct mds_body *body;
146         ENTRY;
147
148         body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
149         LASSERT (body != NULL);                 /* reply already checked out */
150         LASSERT_REPSWABBED (req, 1);            /* and swabbed down */
151
152         LASSERT(!file->private_data);
153
154         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
155         /* We can't handle this well without reorganizing ll_file_open and
156          * ll_mdc_close, so don't even try right now. */
157         LASSERT(fd != NULL);
158
159         memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
160         fd->fd_mds_och.och_magic = OBD_CLIENT_HANDLE_MAGIC;
161         file->private_data = fd;
162         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
163
164         lli->lli_io_epoch = body->io_epoch;
165
166         mdc_set_open_replay_data(&fd->fd_mds_och, it->d.lustre.it_data);
167
168         RETURN(0);
169 }
170
171 /* Open a file, and (for the very first open) create objects on the OSTs at
172  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
173  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
174  * lli_open_sem to ensure no other process will create objects, send the
175  * stripe MD to the MDS, or try to destroy the objects if that fails.
176  *
177  * If we already have the stripe MD locally then we don't request it in
178  * mdc_open(), by passing a lmm_size = 0.
179  *
180  * It is up to the application to ensure no other processes open this file
181  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
182  * used.  We might be able to avoid races of that sort by getting lli_open_sem
183  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
184  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
185  */
186 int ll_file_open(struct inode *inode, struct file *file)
187 {
188         struct ll_inode_info *lli = ll_i2info(inode);
189         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
190                                           .it_flags = file->f_flags };
191         struct lov_stripe_md *lsm;
192         struct ptlrpc_request *req;
193         int rc = 0;
194         ENTRY;
195
196         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
197                inode->i_generation, inode, file->f_flags);
198
199         /* don't do anything for / */
200         if (inode->i_sb->s_root == file->f_dentry)
201                 RETURN(0);
202
203         it = file->f_it;
204
205         if (!it || !it->d.lustre.it_disposition) {
206                 it = &oit;
207                 rc = ll_intent_file_open(file, NULL, 0, it);
208                 if (rc)
209                         GOTO(out, rc);
210         }
211
212         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
213         rc = it_open_error(DISP_OPEN_OPEN, it);
214         /* mdc_intent_lock() didn't get a request ref if there was an open
215          * error, so don't do cleanup on the request here (bug 3430) */
216         if (rc)
217                 RETURN(rc);
218
219         rc = ll_local_open(file, it);
220         LASSERTF(rc == 0, "rc = %d\n", rc);
221
222         if (!S_ISREG(inode->i_mode))
223                 GOTO(out, rc);
224
225         lsm = lli->lli_smd;
226         if (lsm == NULL) {
227                 if (file->f_flags & O_LOV_DELAY_CREATE ||
228                     !(file->f_mode & FMODE_WRITE)) {
229                         CDEBUG(D_INODE, "object creation was delayed\n");
230                         GOTO(out, rc);
231                 }
232         }
233         file->f_flags &= ~O_LOV_DELAY_CREATE;
234         GOTO(out, rc);
235  out:
236         req = it->d.lustre.it_data;
237         ptlrpc_req_finished(req);
238         if (rc == 0)
239                 ll_open_complete(inode);
240         return rc;
241 }
242
243 /* Fills the obdo with the attributes for the inode defined by lsm */
244 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
245                    struct obdo *oa)
246 {
247         struct ptlrpc_request_set *set;
248         int rc;
249         ENTRY;
250
251         LASSERT(lsm != NULL);
252
253         memset(oa, 0, sizeof *oa);
254         oa->o_id = lsm->lsm_object_id;
255         oa->o_mode = S_IFREG;
256         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
257                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
258                 OBD_MD_FLCTIME;
259
260         set = ptlrpc_prep_set();
261         if (set == NULL) {
262                 rc = -ENOMEM;
263         } else {
264                 rc = obd_getattr_async(exp, oa, lsm, set);
265                 if (rc == 0)
266                         rc = ptlrpc_set_wait(set);
267                 ptlrpc_set_destroy(set);
268         }
269         if (rc)
270                 RETURN(rc);
271
272         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
273                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
274         RETURN(0);
275 }
276
277 static inline void ll_remove_suid(struct inode *inode)
278 {
279         unsigned int mode;
280
281         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
282         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
283
284         /* was any of the uid bits set? */
285         mode &= inode->i_mode;
286         if (mode && !capable(CAP_FSETID)) {
287                 inode->i_mode &= ~mode;
288                 // XXX careful here - we cannot change the size
289         }
290 }
291
292 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
293 {
294         struct ll_inode_info *lli = ll_i2info(inode);
295         struct lov_stripe_md *lsm = lli->lli_smd;
296         struct obd_export *exp = ll_i2obdexp(inode);
297         struct {
298                 char name[16];
299                 struct ldlm_lock *lock;
300                 struct lov_stripe_md *lsm;
301         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
302         __u32 stripe, vallen = sizeof(stripe);
303         int rc;
304         ENTRY;
305
306         if (lsm->lsm_stripe_count == 1)
307                 GOTO(check, stripe = 0);
308
309         /* get our offset in the lov */
310         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
311         if (rc != 0) {
312                 CERROR("obd_get_info: rc = %d\n", rc);
313                 RETURN(rc);
314         }
315         LASSERT(stripe < lsm->lsm_stripe_count);
316
317 check:
318         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
319             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[1]){
320                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
321                            lsm->lsm_oinfo[stripe].loi_id,
322                            lsm->lsm_oinfo[stripe].loi_gr);
323                 RETURN(-ELDLM_NO_LOCK_DATA);
324         }
325
326         RETURN(stripe);
327 }
328
329 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
330  * we get a lock cancellation for each stripe, so we have to map the obd's
331  * region back onto the stripes in the file that it held.
332  *
333  * No one can dirty the extent until we've finished our work and they can
334  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
335  * but other kernel actors could have pages locked.
336  *
337  * Called with the DLM lock held. */
338 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
339                               struct ldlm_lock *lock, __u32 stripe)
340 {
341         ldlm_policy_data_t tmpex;
342         unsigned long start, end, count, skip, i, j;
343         struct page *page;
344         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
345         struct lustre_handle lockh;
346         ENTRY;
347
348         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
349         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
350                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
351                inode->i_size);
352
353         /* our locks are page granular thanks to osc_enqueue, we invalidate the
354          * whole page. */
355         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
356         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
357
358         count = ~0;
359         skip = 0;
360         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
361         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
362         if (lsm->lsm_stripe_count > 1) {
363                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
364                 skip = (lsm->lsm_stripe_count - 1) * count;
365                 start += start/count * skip + stripe * count;
366                 if (end != ~0)
367                         end += end/count * skip + stripe * count;
368         }
369         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
370                 end = ~0;
371
372         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
373         if (i < end)
374                 end = i;
375
376         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
377                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
378                count, skip, end, discard ? " (DISCARDING)" : "");
379
380         /* this is the simplistic implementation of page eviction at
381          * cancelation.  It is careful to get races with other page
382          * lockers handled correctly.  fixes from bug 20 will make it
383          * more efficient by associating locks with pages and with
384          * batching writeback under the lock explicitly. */
385         for (i = start, j = start % count; i <= end;
386              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
387                 if (j == count) {
388                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
389                         i += skip;
390                         j = 0;
391                         if (i > end)
392                                 break;
393                 }
394                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
395                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
396                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
397                          start, i, end);
398
399                 if (!mapping_has_pages(inode->i_mapping)) {
400                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
401                         break;
402                 }
403
404                 cond_resched();
405
406                 page = find_get_page(inode->i_mapping, i);
407                 if (page == NULL)
408                         continue;
409                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
410                                i, tmpex.l_extent.start);
411                 lock_page(page);
412
413                 /* page->mapping to check with racing against teardown */
414                 if (!discard && clear_page_dirty_for_io(page)) {
415                         rc = ll_call_writepage(inode, page);
416                         if (rc != 0)
417                                 CERROR("writepage of page %p failed: %d\n",
418                                        page, rc);
419                         /* either waiting for io to complete or reacquiring
420                          * the lock that the failed writepage released */
421                         lock_page(page);
422                 }
423
424                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
425                 /* check to see if another DLM lock covers this page */
426                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
427                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
428                                       LDLM_FL_TEST_LOCK,
429                                       &lock->l_resource->lr_name, LDLM_EXTENT,
430                                       &tmpex, LCK_PR | LCK_PW, &lockh);
431                 if (rc2 == 0 && page->mapping != NULL) {
432                         // checking again to account for writeback's lock_page()
433                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
434                         ll_ra_accounting(page, inode->i_mapping);
435                         ll_truncate_complete_page(page);
436                 }
437                 unlock_page(page);
438                 page_cache_release(page);
439         }
440         LASSERTF(tmpex.l_extent.start <=
441                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
442                   lock->l_policy_data.l_extent.end + 1),
443                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
444                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
445                  start, i, end);
446         EXIT;
447 }
448
449 static int ll_extent_lock_callback(struct ldlm_lock *lock,
450                                    struct ldlm_lock_desc *new, void *data,
451                                    int flag)
452 {
453         struct lustre_handle lockh = { 0 };
454         int rc;
455         ENTRY;
456
457         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
458                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
459                 LBUG();
460         }
461
462         switch (flag) {
463         case LDLM_CB_BLOCKING:
464                 ldlm_lock2handle(lock, &lockh);
465                 rc = ldlm_cli_cancel(&lockh);
466                 if (rc != ELDLM_OK)
467                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
468                 break;
469         case LDLM_CB_CANCELING: {
470                 struct inode *inode;
471                 struct ll_inode_info *lli;
472                 struct lov_stripe_md *lsm;
473                 int stripe;
474                 __u64 kms;
475
476                 /* This lock wasn't granted, don't try to evict pages */
477                 if (lock->l_req_mode != lock->l_granted_mode)
478                         RETURN(0);
479
480                 inode = ll_inode_from_lock(lock);
481                 if (inode == NULL)
482                         RETURN(0);
483                 lli = ll_i2info(inode);
484                 if (lli == NULL)
485                         goto iput;
486                 if (lli->lli_smd == NULL)
487                         goto iput;
488                 lsm = lli->lli_smd;
489
490                 stripe = ll_lock_to_stripe_offset(inode, lock);
491                 if (stripe < 0)
492                         goto iput;
493                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
494
495                 /* grabbing the i_sem will wait for write() to complete.  ns
496                  * lock hold times should be very short as ast processing
497                  * requires them and has a short timeout.  so, i_sem before ns
498                  * lock.*/
499                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
500                 spin_lock(&lli->lli_lock);
501
502                 kms = ldlm_extent_shift_kms(lock,
503                                             lsm->lsm_oinfo[stripe].loi_kms);
504                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
505                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
506                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
507                 lsm->lsm_oinfo[stripe].loi_kms = kms;
508
509                 spin_unlock(&lli->lli_lock);
510                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
511                 //ll_try_done_writing(inode);
512         iput:
513                 iput(inode);
514                 break;
515         }
516         default:
517                 LBUG();
518         }
519
520         RETURN(0);
521 }
522
523 #if 0
524 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
525 {
526         /* XXX ALLOCATE - 160 bytes */
527         struct inode *inode = ll_inode_from_lock(lock);
528         struct ll_inode_info *lli = ll_i2info(inode);
529         struct lustre_handle lockh = { 0 };
530         struct ost_lvb *lvb;
531         int stripe;
532         ENTRY;
533
534         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
535                      LDLM_FL_BLOCK_CONV)) {
536                 LBUG(); /* not expecting any blocked async locks yet */
537                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
538                            "lock, returning");
539                 ldlm_lock_dump(D_OTHER, lock, 0);
540                 ldlm_reprocess_all(lock->l_resource);
541                 RETURN(0);
542         }
543
544         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
545
546         stripe = ll_lock_to_stripe_offset(inode, lock);
547         if (stripe < 0)
548                 goto iput;
549
550         if (lock->l_lvb_len) {
551                 struct lov_stripe_md *lsm = lli->lli_smd;
552                 __u64 kms;
553                 lvb = lock->l_lvb_data;
554                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
555
556                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
557                 spin_lock(&lli->lli_lock);
558
559                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
560                 kms = ldlm_extent_shift_kms(NULL, kms);
561                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
562                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
563                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
564                 lsm->lsm_oinfo[stripe].loi_kms = kms;
565
566                 spin_unlock(&lli->lli_lock);
567                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
568         }
569
570 iput:
571         iput(inode);
572         wake_up(&lock->l_waitq);
573
574         ldlm_lock2handle(lock, &lockh);
575         ldlm_lock_decref(&lockh, LCK_PR);
576         RETURN(0);
577 }
578 #endif
579
580 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
581 {
582         struct ptlrpc_request *req = reqp;
583         struct inode *inode = ll_inode_from_lock(lock);
584         struct ll_inode_info *lli;
585         struct lov_stripe_md *lsm;
586         struct ost_lvb *lvb;
587         int rc, size = sizeof(*lvb), stripe;
588         ENTRY;
589
590         if (inode == NULL)
591                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
592         lli = ll_i2info(inode);
593         if (lli == NULL)
594                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
595         lsm = lli->lli_smd;
596         if (lsm == NULL)
597                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
598
599         /* First, find out which stripe index this lock corresponds to. */
600         stripe = ll_lock_to_stripe_offset(inode, lock);
601         if (stripe < 0)
602                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
603
604         rc = lustre_pack_reply(req, 1, &size, NULL);
605         if (rc) {
606                 CERROR("lustre_pack_reply: %d\n", rc);
607                 GOTO(iput, rc);
608         }
609
610         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
611         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
612         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
613         lvb->lvb_atime = LTIME_S(inode->i_atime);
614         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
615
616         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
617                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
618                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
619                    lvb->lvb_atime, lvb->lvb_ctime);
620         GOTO(iput, 0);
621  iput:
622         iput(inode);
623
624  out:
625         /* These errors are normal races, so we don't want to fill the console
626          * with messages by calling ptlrpc_error() */
627         if (rc == -ELDLM_NO_LOCK_DATA)
628                 lustre_pack_reply(req, 0, NULL, NULL);
629
630         req->rq_status = rc;
631         return rc;
632 }
633
634 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
635 __u64 lov_merge_blocks(struct lov_stripe_md *lsm);
636 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
637
638 /* NB: lov_merge_size will prefer locally cached writes if they extend the
639  * file (because it prefers KMS over RSS when larger) */
640 int ll_glimpse_size(struct inode *inode)
641 {
642         struct ll_inode_info *lli = ll_i2info(inode);
643         struct ll_sb_info *sbi = ll_i2sbi(inode);
644         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
645         struct lustre_handle lockh = { 0 };
646         int rc, flags = LDLM_FL_HAS_INTENT;
647         ENTRY;
648
649         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
650
651         rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
652                          LCK_PR, &flags, ll_extent_lock_callback,
653                          ldlm_completion_ast, ll_glimpse_callback, inode,
654                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
655         if (rc == -ENOENT)
656                 RETURN(rc);
657         if (rc != 0) {
658                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
659                 RETURN(rc > 0 ? -EIO : rc);
660         }
661
662         inode->i_size = lov_merge_size(lli->lli_smd, 0);
663         inode->i_blocks = lov_merge_blocks(lli->lli_smd);
664         LTIME_S(inode->i_mtime) =
665                 lov_merge_mtime(lli->lli_smd, LTIME_S(inode->i_mtime));
666
667         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
668                inode->i_size, inode->i_blocks);
669
670         obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
671
672         RETURN(rc);
673 }
674
675 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
676                    struct lov_stripe_md *lsm, int mode,
677                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
678                    int ast_flags)
679 {
680         struct ll_sb_info *sbi = ll_i2sbi(inode);
681         int rc;
682         ENTRY;
683
684         LASSERT(lockh->cookie == 0);
685
686         /* XXX phil: can we do this?  won't it screw the file size up? */
687         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
688             (sbi->ll_flags & LL_SBI_NOLCK))
689                 RETURN(0);
690
691         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
692                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
693
694         rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
695                          &ast_flags, ll_extent_lock_callback,
696                          ldlm_completion_ast, ll_glimpse_callback, inode,
697                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
698         if (rc > 0)
699                 rc = -EIO;
700
701         if (policy->l_extent.start == 0 &&
702             policy->l_extent.end == OBD_OBJECT_EOF) {
703                 /* vmtruncate()->ll_truncate() first sets the i_size and then
704                  * the kms under both a DLM lock and the i_sem.  If we don't
705                  * get the i_sem here we can match the DLM lock and reset
706                  * i_size from the kms before the truncating path has updated
707                  * the kms.  generic_file_write can then trust the stale i_size
708                  * when doing appending writes and effectively cancel the
709                  * result of the truncate.  Getting the i_sem after the enqueue
710                  * maintains the DLM -> i_sem acquiry order. */
711                 down(&inode->i_sem);
712                 inode->i_size = lov_merge_size(lsm, 1);
713                 up(&inode->i_sem);
714         }
715
716         if (rc == 0)
717                 LTIME_S(inode->i_mtime) =
718                         lov_merge_mtime(lsm, LTIME_S(inode->i_mtime));
719         RETURN(rc);
720 }
721
722 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
723                      struct lov_stripe_md *lsm, int mode,
724                      struct lustre_handle *lockh)
725 {
726         struct ll_sb_info *sbi = ll_i2sbi(inode);
727         int rc;
728         ENTRY;
729
730         /* XXX phil: can we do this?  won't it screw the file size up? */
731         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
732             (sbi->ll_flags & LL_SBI_NOLCK))
733                 RETURN(0);
734
735         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
736
737         RETURN(rc);
738 }
739
740 static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
741                             loff_t *ppos)
742 {
743         struct ll_file_data *fd = filp->private_data;
744         struct inode *inode = filp->f_dentry->d_inode;
745         struct ll_inode_info *lli = ll_i2info(inode);
746         struct lov_stripe_md *lsm = lli->lli_smd;
747         struct lustre_handle lockh = { 0 };
748         ldlm_policy_data_t policy;
749         int rc;
750         ssize_t retval;
751         __u64 kms;
752         ENTRY;
753         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
754                inode->i_ino, inode->i_generation, inode, count, *ppos);
755
756         /* "If nbyte is 0, read() will return 0 and have no other results."
757          *                      -- Single Unix Spec */
758         if (count == 0)
759                 RETURN(0);
760
761         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
762                             count);
763
764         if (!lsm)
765                 RETURN(0);
766
767         policy.l_extent.start = *ppos;
768         policy.l_extent.end = *ppos + count - 1;
769
770         rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
771         if (rc != 0)
772                 RETURN(rc);
773
774         kms = lov_merge_size(lsm, 1);
775         if (*ppos + count - 1 > kms) {
776                 /* A glimpse is necessary to determine whether we return a short
777                  * read or some zeroes at the end of the buffer */
778                 retval = ll_glimpse_size(inode);
779                 if (retval)
780                         goto out;
781         } else {
782                 inode->i_size = kms;
783         }
784
785         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
786                inode->i_ino, count, *ppos, inode->i_size);
787
788         /* turn off the kernel's read-ahead */
789 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
790         filp->f_ramax = 0;
791 #else
792         filp->f_ra.ra_pages = 0;
793 #endif
794         retval = generic_file_read(filp, buf, count, ppos);
795
796  out:
797         ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
798         RETURN(retval);
799 }
800
801 /*
802  * Write to a file (through the page cache).
803  */
804 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
805                              loff_t *ppos)
806 {
807         struct ll_file_data *fd = file->private_data;
808         struct inode *inode = file->f_dentry->d_inode;
809         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
810         struct lustre_handle lockh = { 0 };
811         ldlm_policy_data_t policy;
812         loff_t maxbytes = ll_file_maxbytes(inode);
813         ssize_t retval;
814         int rc;
815         ENTRY;
816         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
817                inode->i_ino, inode->i_generation, inode, count, *ppos);
818
819         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
820
821         /* POSIX, but surprised the VFS doesn't check this already */
822         if (count == 0)
823                 RETURN(0);
824
825         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
826          * called on the file, don't fail the below assertion (bug 2388). */
827         if (file->f_flags & O_LOV_DELAY_CREATE && lsm == NULL)
828                 RETURN(-EBADF);
829
830         LASSERT(lsm);
831
832         if (file->f_flags & O_APPEND) {
833                 policy.l_extent.start = 0;
834                 policy.l_extent.end = OBD_OBJECT_EOF;
835         } else  {
836                 policy.l_extent.start = *ppos;
837                 policy.l_extent.end = *ppos + count - 1;
838         }
839
840         rc = ll_extent_lock(fd, inode, lsm, LCK_PW, &policy, &lockh, 0);
841         if (rc != 0)
842                 RETURN(rc);
843
844         /* this is ok, g_f_w will overwrite this under i_sem if it races
845          * with a local truncate, it just makes our maxbyte checking easier */
846         if (file->f_flags & O_APPEND)
847                 *ppos = inode->i_size;
848
849         if (*ppos >= maxbytes) {
850                 if (count || *ppos > maxbytes) {
851                         send_sig(SIGXFSZ, current, 0);
852                         GOTO(out, retval = -EFBIG);
853                 }
854         }
855         if (*ppos + count > maxbytes)
856                 count = maxbytes - *ppos;
857
858         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
859                inode->i_ino, count, *ppos);
860
861         /* generic_file_write handles O_APPEND after getting i_sem */
862         retval = generic_file_write(file, buf, count, ppos);
863
864 out:
865         ll_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
866         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
867                             retval > 0 ? retval : 0);
868         RETURN(retval);
869 }
870
871 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
872                                unsigned long arg)
873 {
874         struct ll_inode_info *lli = ll_i2info(inode);
875         struct obd_export *exp = ll_i2obdexp(inode);
876         struct ll_recreate_obj ucreatp;
877         struct obd_trans_info oti = { 0 };
878         struct obdo *oa = NULL;
879         int lsm_size;
880         int rc = 0;
881         struct lov_stripe_md *lsm, *lsm2;
882         ENTRY;
883
884         if (!capable (CAP_SYS_ADMIN))
885                 RETURN(-EPERM);
886
887         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
888                             sizeof(struct ll_recreate_obj));
889         if (rc) {
890                 RETURN(-EFAULT);
891         }
892         oa = obdo_alloc();
893         if (oa == NULL)
894                 RETURN(-ENOMEM);
895
896         down(&lli->lli_open_sem);
897         lsm = lli->lli_smd;
898         if (lsm == NULL)
899                 GOTO(out, rc = -ENOENT);
900         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
901                    (lsm->lsm_stripe_count));
902
903         OBD_ALLOC(lsm2, lsm_size);
904         if (lsm2 == NULL)
905                 GOTO(out, rc = -ENOMEM);
906
907         oa->o_id = ucreatp.lrc_id;
908         oa->o_nlink = ucreatp.lrc_ost_idx;
909         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
910         oa->o_flags |= OBD_FL_RECREATE_OBJS;
911         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
912                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
913
914         oti.oti_objid = NULL;
915         memcpy(lsm2, lsm, lsm_size);
916         rc = obd_create(exp, oa, &lsm2, &oti);
917
918         OBD_FREE(lsm2, lsm_size);
919         GOTO(out, rc);
920 out:
921         up(&lli->lli_open_sem);
922         obdo_free(oa);
923         return rc;
924 }
925
926 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
927                                     int flags, struct lov_user_md *lum,
928                                     int lum_size)
929 {
930         struct ll_inode_info *lli = ll_i2info(inode);
931         struct file *f;
932         struct obd_export *exp = ll_i2obdexp(inode);
933         struct lov_stripe_md *lsm;
934         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
935         struct ptlrpc_request *req = NULL;
936         int rc = 0;
937         struct lustre_md md;
938         ENTRY;
939
940         down(&lli->lli_open_sem);
941         lsm = lli->lli_smd;
942         if (lsm) {
943                 up(&lli->lli_open_sem);
944                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
945                        inode->i_ino);
946                 RETURN(-EEXIST);
947         }
948
949         f = get_empty_filp();
950         if (!f)
951                 GOTO(out, -ENOMEM);
952
953         f->f_dentry = file->f_dentry;
954         f->f_vfsmnt = file->f_vfsmnt;
955
956         rc = ll_intent_file_open(f, lum, lum_size, &oit);
957         if (rc)
958                 GOTO(out, rc);
959         if (it_disposition(&oit, DISP_LOOKUP_NEG))
960                 GOTO(out, -ENOENT);
961         req = oit.d.lustre.it_data;
962         rc = oit.d.lustre.it_status;
963
964         if (rc < 0)
965                 GOTO(out, rc);
966
967         rc = mdc_req2lustre_md(req, 1, exp, &md);
968         if (rc)
969                 GOTO(out, rc);
970         ll_update_inode(f->f_dentry->d_inode, md.body, md.lsm);
971
972         rc = ll_local_open(f, &oit);
973         if (rc)
974                 GOTO(out, rc);
975         ll_intent_release(&oit);
976
977         rc = ll_file_release(f->f_dentry->d_inode, f);
978
979  out:
980         if (f)
981                 put_filp(f);
982         up(&lli->lli_open_sem);
983         if (req != NULL)
984                 ptlrpc_req_finished(req);
985         RETURN(rc);
986 }
987
988 static int ll_lov_setea(struct inode *inode, struct file *file,
989                             unsigned long arg)
990 {
991         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
992         struct lov_user_md  *lump;
993         int lum_size = sizeof(struct lov_user_md) +
994                        sizeof(struct lov_user_ost_data);
995         int rc;
996         ENTRY;
997
998         if (!capable (CAP_SYS_ADMIN))
999                 RETURN(-EPERM);
1000
1001         OBD_ALLOC(lump, lum_size);
1002         if (lump == NULL) {
1003                 RETURN(-ENOMEM);
1004         }
1005         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1006         if (rc) {
1007                 OBD_FREE(lump, lum_size);
1008                 RETURN(-EFAULT);
1009         }
1010
1011         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1012
1013         OBD_FREE(lump, lum_size);
1014         RETURN(rc);
1015 }
1016
1017 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1018                             unsigned long arg)
1019 {
1020         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1021         int rc;
1022         int flags = FMODE_WRITE;
1023         ENTRY;
1024
1025         /* Bug 1152: copy properly when this is no longer true */
1026         LASSERT(sizeof(lum) == sizeof(*lump));
1027         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1028         rc = copy_from_user(&lum, lump, sizeof(lum));
1029         if (rc)
1030                 RETURN(-EFAULT);
1031
1032         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1033         RETURN(rc);
1034 }
1035
1036 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1037 {
1038         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1039
1040         if (!lsm)
1041                 RETURN(-ENODATA);
1042
1043         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
1044                             (void *)arg);
1045 }
1046
1047 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1048                   unsigned long arg)
1049 {
1050         struct ll_file_data *fd = file->private_data;
1051         int flags;
1052         ENTRY;
1053
1054         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1055                inode->i_generation, inode, cmd);
1056
1057         if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
1058                 RETURN(-ENOTTY);
1059
1060         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1061         switch(cmd) {
1062         case LL_IOC_GETFLAGS:
1063                 /* Get the current value of the file flags */
1064                 return put_user(fd->fd_flags, (int *)arg);
1065         case LL_IOC_SETFLAGS:
1066         case LL_IOC_CLRFLAGS:
1067                 /* Set or clear specific file flags */
1068                 /* XXX This probably needs checks to ensure the flags are
1069                  *     not abused, and to handle any flag side effects.
1070                  */
1071                 if (get_user(flags, (int *) arg))
1072                         RETURN(-EFAULT);
1073
1074                 if (cmd == LL_IOC_SETFLAGS)
1075                         fd->fd_flags |= flags;
1076                 else
1077                         fd->fd_flags &= ~flags;
1078                 RETURN(0);
1079         case LL_IOC_LOV_SETSTRIPE:
1080                 RETURN(ll_lov_setstripe(inode, file, arg));
1081         case LL_IOC_LOV_SETEA:
1082                 RETURN(ll_lov_setea(inode, file, arg));
1083         case LL_IOC_LOV_GETSTRIPE:
1084                 RETURN(ll_lov_getstripe(inode, arg));
1085         case LL_IOC_RECREATE_OBJ:
1086                 RETURN(ll_lov_recreate_obj(inode, file, arg));
1087         case EXT3_IOC_GETFLAGS:
1088         case EXT3_IOC_SETFLAGS:
1089                 RETURN( ll_iocontrol(inode, file, cmd, arg) );
1090         case EXT3_IOC_GETVERSION_OLD:
1091         case EXT3_IOC_GETVERSION:
1092                 return put_user(inode->i_generation, (int *) arg);
1093         /* We need to special case any other ioctls we want to handle,
1094          * to send them to the MDS/OST as appropriate and to properly
1095          * network encode the arg field.
1096         case EXT2_IOC_SETVERSION_OLD:
1097         case EXT2_IOC_SETVERSION_NEW:
1098         */
1099         default:
1100                 RETURN( obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
1101                                       (void *)arg) );
1102         }
1103 }
1104
1105 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1106 {
1107         struct inode *inode = file->f_dentry->d_inode;
1108         struct ll_file_data *fd = file->private_data;
1109         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1110         struct lustre_handle lockh = {0};
1111         loff_t retval;
1112         ENTRY;
1113         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu(%s)\n",
1114                inode->i_ino, inode->i_generation, inode,
1115                offset + ((origin == 2) ? inode->i_size :
1116                          (origin == 1) ? file->f_pos : 0),
1117                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR": "SEEK_SET");
1118
1119         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1120         if (origin == 2) { /* SEEK_END */
1121                 ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
1122                 int rc;
1123
1124                 rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,0);
1125                 if (rc != 0)
1126                         RETURN(rc);
1127
1128                 offset += inode->i_size;
1129         } else if (origin == 1) { /* SEEK_CUR */
1130                 offset += file->f_pos;
1131         }
1132
1133         retval = -EINVAL;
1134         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1135                 if (offset != file->f_pos) {
1136                         file->f_pos = offset;
1137 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1138                         file->f_reada = 0;
1139                         file->f_version = ++event;
1140 #endif
1141                 }
1142                 retval = offset;
1143         }
1144
1145         if (origin == 2)
1146                 ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
1147         RETURN(retval);
1148 }
1149
1150 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1151 {
1152         struct inode *inode = dentry->d_inode;
1153         struct ll_inode_info *lli = ll_i2info(inode);
1154         struct lov_stripe_md *lsm = lli->lli_smd;
1155         struct ll_fid fid;
1156         struct ptlrpc_request *req;
1157         int rc, err;
1158         ENTRY;
1159         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1160                inode->i_generation, inode);
1161
1162         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1163
1164         /* fsync's caller has already called _fdata{sync,write}, we want
1165          * that IO to finish before calling the osc and mdc sync methods */
1166         rc = filemap_fdatawait(inode->i_mapping);
1167
1168         /* catch async errors that were recorded back when async writeback
1169          * failed for pages in this mapping. */
1170         err = lli->lli_async_rc;
1171         lli->lli_async_rc = 0;
1172         if (rc == 0)
1173                 rc = err;
1174         if (lsm) {
1175                 err = lov_test_and_clear_async_rc(lsm);
1176                 if (rc == 0)
1177                         rc = err;
1178         }
1179
1180         ll_inode2fid(&fid, inode);
1181         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
1182         if (!rc)
1183                 rc = err;
1184         if (!err)
1185                 ptlrpc_req_finished(req);
1186
1187         if (data && lsm) {
1188                 struct obdo *oa = obdo_alloc();
1189
1190                 if (!oa)
1191                         RETURN(rc ? rc : -ENOMEM);
1192
1193                 oa->o_id = lsm->lsm_object_id;
1194                 oa->o_valid = OBD_MD_FLID;
1195                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1196                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1197
1198                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
1199                                0, OBD_OBJECT_EOF);
1200                 if (!rc)
1201                         rc = err;
1202                 obdo_free(oa);
1203         }
1204
1205         RETURN(rc);
1206 }
1207
1208 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1209 {
1210         struct inode *inode = file->f_dentry->d_inode;
1211         struct ll_sb_info *sbi = ll_i2sbi(inode);
1212         struct obd_device *obddev;
1213         struct ldlm_res_id res_id =
1214                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
1215         struct lustre_handle lockh = {0};
1216         ldlm_policy_data_t flock;
1217         ldlm_mode_t mode = 0;
1218         int flags = 0;
1219         int rc;
1220         ENTRY;
1221
1222         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1223                inode->i_ino, file_lock);
1224
1225         flock.l_flock.pid = file_lock->fl_pid;
1226         flock.l_flock.start = file_lock->fl_start;
1227         flock.l_flock.end = file_lock->fl_end;
1228
1229         switch (file_lock->fl_type) {
1230         case F_RDLCK:
1231                 mode = LCK_PR;
1232                 break;
1233         case F_UNLCK:
1234                 /* An unlock request may or may not have any relation to
1235                  * existing locks so we may not be able to pass a lock handle
1236                  * via a normal ldlm_lock_cancel() request. The request may even
1237                  * unlock a byte range in the middle of an existing lock. In
1238                  * order to process an unlock request we need all of the same
1239                  * information that is given with a normal read or write record
1240                  * lock request. To avoid creating another ldlm unlock (cancel)
1241                  * message we'll treat a LCK_NL flock request as an unlock. */
1242                 mode = LCK_NL;
1243                 break;
1244         case F_WRLCK:
1245                 mode = LCK_PW;
1246                 break;
1247         default:
1248                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1249                 LBUG();
1250         }
1251
1252         switch (cmd) {
1253         case F_SETLKW:
1254 #ifdef F_SETLKW64
1255         case F_SETLKW64:
1256 #endif
1257                 flags = 0;
1258                 break;
1259         case F_SETLK:
1260 #ifdef F_SETLK64
1261         case F_SETLK64:
1262 #endif
1263                 flags = LDLM_FL_BLOCK_NOWAIT;
1264                 break;
1265         case F_GETLK:
1266 #ifdef F_GETLK64
1267         case F_GETLK64:
1268 #endif
1269                 flags = LDLM_FL_TEST_LOCK;
1270                 /* Save the old mode so that if the mode in the lock changes we
1271                  * can decrement the appropriate reader or writer refcount. */
1272                 file_lock->fl_type = mode;
1273                 break;
1274         default:
1275                 CERROR("unknown fcntl lock command: %d\n", cmd);
1276                 LBUG();
1277         }
1278
1279         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
1280                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1281                flags, mode, flock.l_flock.start, flock.l_flock.end);
1282
1283         obddev = sbi->ll_mdc_exp->exp_obd;
1284         rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
1285                               obddev->obd_namespace,
1286                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1287                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1288                               NULL, 0, NULL, &lockh);
1289         RETURN(rc);
1290 }
1291
1292 static int ll_have_md_lock(struct dentry *de)
1293 {
1294         struct ll_sb_info *sbi = ll_s2sbi(de->d_sb);
1295         struct lustre_handle lockh;
1296         struct ldlm_res_id res_id = { .name = {0} };
1297         struct obd_device *obddev;
1298         int flags;
1299         ENTRY;
1300
1301         if (!de->d_inode)
1302                RETURN(0);
1303
1304         obddev = sbi->ll_mdc_exp->exp_obd;
1305         res_id.name[0] = de->d_inode->i_ino;
1306         res_id.name[1] = de->d_inode->i_generation;
1307
1308         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
1309
1310         /* FIXME use LDLM_FL_TEST_LOCK instead */
1311         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
1312         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
1313                             NULL, LCK_PR, &lockh)) {
1314                 ldlm_lock_decref(&lockh, LCK_PR);
1315                 RETURN(1);
1316         }
1317
1318         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
1319                             NULL, LCK_PW, &lockh)) {
1320                 ldlm_lock_decref(&lockh, LCK_PW);
1321                 RETURN(1);
1322         }
1323         RETURN(0);
1324 }
1325
1326 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
1327 {
1328         struct inode *inode = dentry->d_inode;
1329         struct ll_inode_info *lli;
1330         struct lov_stripe_md *lsm;
1331         int rc;
1332         ENTRY;
1333
1334         if (!inode) {
1335                 CERROR("REPORT THIS LINE TO PETER\n");
1336                 RETURN(0);
1337         }
1338         lli = ll_i2info(inode);
1339         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
1340                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
1341 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1342         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1343 #endif
1344
1345         if (!ll_have_md_lock(dentry)) {
1346                 struct ptlrpc_request *req = NULL;
1347                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
1348                 struct ll_fid fid;
1349                 unsigned long valid = 0;
1350                 int ealen = 0;
1351
1352                 if (S_ISREG(inode->i_mode)) {
1353                         ealen = obd_size_diskmd(sbi->ll_osc_exp, NULL);
1354                         valid |= OBD_MD_FLEASIZE;
1355                 }
1356                 ll_inode2fid(&fid, inode);
1357                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
1358                 if (rc) {
1359                         CERROR("failure %d inode %lu\n", rc, inode->i_ino);
1360                         RETURN(-abs(rc));
1361                 }
1362                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, 0, NULL);
1363                 if (rc) {
1364                         ptlrpc_req_finished(req);
1365                         RETURN(rc);
1366                 }
1367                 ptlrpc_req_finished(req);
1368         }
1369
1370         lsm = lli->lli_smd;
1371         if (lsm == NULL) /* object not yet allocated, don't validate size */
1372                 RETURN(0);
1373
1374         /* ll_glimpse_size will prefer locally cached writes if they extend
1375          * the file */
1376         rc = ll_glimpse_size(inode);
1377         RETURN(rc);
1378 }
1379
1380 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1381 int ll_getattr(struct vfsmount *mnt, struct dentry *de,
1382                struct lookup_intent *it, struct kstat *stat)
1383 {
1384         int res = 0;
1385         struct inode *inode = de->d_inode;
1386
1387         res = ll_inode_revalidate_it(de, it);
1388         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
1389
1390         if (res)
1391                 return res;
1392
1393         stat->dev = inode->i_sb->s_dev;
1394         stat->ino = inode->i_ino;
1395         stat->mode = inode->i_mode;
1396         stat->nlink = inode->i_nlink;
1397         stat->uid = inode->i_uid;
1398         stat->gid = inode->i_gid;
1399         stat->rdev = kdev_t_to_nr(inode->i_rdev);
1400         stat->atime = inode->i_atime;
1401         stat->mtime = inode->i_mtime;
1402         stat->ctime = inode->i_ctime;
1403         stat->size = inode->i_size;
1404         stat->blksize = inode->i_blksize;
1405         stat->blocks = inode->i_blocks;
1406         return 0;
1407 }
1408 #endif
1409
1410 struct file_operations ll_file_operations = {
1411         .read           = ll_file_read,
1412         .write          = ll_file_write,
1413         .ioctl          = ll_file_ioctl,
1414         .open           = ll_file_open,
1415         .release        = ll_file_release,
1416         .mmap           = generic_file_mmap,
1417         .llseek         = ll_file_seek,
1418 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
1419         .sendfile       = generic_file_sendfile,
1420 #endif
1421         .fsync          = ll_fsync,
1422         /* .lock           = ll_file_flock */
1423 };
1424
1425 struct inode_operations ll_file_inode_operations = {
1426         .setattr_raw    = ll_setattr_raw,
1427         .setattr        = ll_setattr,
1428         .truncate       = ll_truncate,
1429 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1430         .getattr_it     = ll_getattr,
1431 #else
1432         .revalidate_it  = ll_inode_revalidate_it,
1433 #endif
1434 };
1435