Whamcloud - gitweb
Land b1_4_smallfix onto b1_4 (20041216_1438)
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <linux/lustre_dlm.h>
27 #include <linux/lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
36                         struct file *file)
37 {
38         struct ll_file_data *fd = file->private_data;
39         struct ptlrpc_request *req = NULL;
40         struct obd_client_handle *och = &fd->fd_mds_och;
41         struct obdo obdo;
42         int rc;
43         ENTRY;
44
45         obdo.o_id = inode->i_ino;
46         obdo.o_valid = OBD_MD_FLID;
47         obdo_from_inode(&obdo, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
48                                       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
49                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
50                                       OBD_MD_FLCTIME);
51         if (0 /* ll_is_inode_dirty(inode) */) {
52                 obdo.o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
53                 obdo.o_valid |= OBD_MD_FLFLAGS;
54         }
55         rc = mdc_close(mdc_exp, &obdo, och, &req);
56         if (rc == EAGAIN) {
57                 /* We are the last writer, so the MDS has instructed us to get
58                  * the file size and any write cookies, then close again. */
59                 //ll_queue_done_writing(inode);
60                 rc = 0;
61         } else if (rc) {
62                 CERROR("inode %lu mdc close failed: rc = %d\n",
63                        inode->i_ino, rc);
64         }
65         if (rc == 0) {
66                 rc = ll_objects_destroy(req, file->f_dentry->d_inode);
67                 if (rc)
68                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
69                                inode->i_ino, rc);
70         }
71
72         mdc_clear_open_replay_data(och);
73         ptlrpc_req_finished(req);
74         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
75         file->private_data = NULL;
76         OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
77
78         RETURN(rc);
79 }
80
81 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
82
83 /* While this returns an error code, fput() the caller does not, so we need
84  * to make every effort to clean up all of our state here.  Also, applications
85  * rarely check close errors and even if an error is returned they will not
86  * re-try the close call.
87  */
88 int ll_file_release(struct inode *inode, struct file *file)
89 {
90         struct ll_file_data *fd;
91         struct ll_sb_info *sbi = ll_i2sbi(inode);
92         struct ll_inode_info *lli = ll_i2info(inode);
93         struct lov_stripe_md *lsm = lli->lli_smd;
94         int rc;
95
96         ENTRY;
97         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
98                inode->i_generation, inode);
99
100         /* don't do anything for / */
101         if (inode->i_sb->s_root == file->f_dentry)
102                 RETURN(0);
103
104         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
105         fd = (struct ll_file_data *)file->private_data;
106         LASSERT(fd != NULL);
107
108         if (lsm)
109                 lov_test_and_clear_async_rc(lsm);
110         lli->lli_async_rc = 0;
111
112         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
113         RETURN(rc);
114 }
115
116 static int ll_intent_file_open(struct file *file, void *lmm,
117                                int lmmsize, struct lookup_intent *itp)
118 {
119         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
120         struct lustre_handle lockh;
121         struct mdc_op_data data;
122         struct dentry *parent = file->f_dentry->d_parent;
123         const char *name = file->f_dentry->d_name.name;
124         const int len = file->f_dentry->d_name.len;
125         int rc;
126
127         if (!parent)
128                 RETURN(-ENOENT);
129
130         ll_prepare_mdc_op_data(&data, parent->d_inode, NULL, name, len, O_RDWR);
131
132         rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data,
133                          &lockh, lmm, lmmsize, ldlm_completion_ast,
134                          ll_mdc_blocking_ast, NULL);
135         if (rc < 0)
136                 CERROR("lock enqueue: err: %d\n", rc);
137         RETURN(rc);
138 }
139
140 int ll_local_open(struct file *file, struct lookup_intent *it)
141 {
142         struct ptlrpc_request *req = it->d.lustre.it_data;
143         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
144         struct ll_file_data *fd;
145         struct mds_body *body;
146         ENTRY;
147
148         body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
149         LASSERT (body != NULL);                 /* reply already checked out */
150         LASSERT_REPSWABBED (req, 1);            /* and swabbed down */
151
152         LASSERT(!file->private_data);
153
154         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
155         /* We can't handle this well without reorganizing ll_file_open and
156          * ll_mdc_close, so don't even try right now. */
157         LASSERT(fd != NULL);
158
159         memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
160         fd->fd_mds_och.och_magic = OBD_CLIENT_HANDLE_MAGIC;
161         file->private_data = fd;
162         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
163
164         lli->lli_io_epoch = body->io_epoch;
165
166         mdc_set_open_replay_data(&fd->fd_mds_och, it->d.lustre.it_data);
167
168         RETURN(0);
169 }
170
171 /* Open a file, and (for the very first open) create objects on the OSTs at
172  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
173  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
174  * lli_open_sem to ensure no other process will create objects, send the
175  * stripe MD to the MDS, or try to destroy the objects if that fails.
176  *
177  * If we already have the stripe MD locally then we don't request it in
178  * mdc_open(), by passing a lmm_size = 0.
179  *
180  * It is up to the application to ensure no other processes open this file
181  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
182  * used.  We might be able to avoid races of that sort by getting lli_open_sem
183  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
184  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
185  */
186 int ll_file_open(struct inode *inode, struct file *file)
187 {
188         struct ll_inode_info *lli = ll_i2info(inode);
189         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
190                                           .it_flags = file->f_flags };
191         struct lov_stripe_md *lsm;
192         struct ptlrpc_request *req;
193         int rc = 0;
194         ENTRY;
195
196         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
197                inode->i_generation, inode, file->f_flags);
198
199         /* don't do anything for / */
200         if (inode->i_sb->s_root == file->f_dentry)
201                 RETURN(0);
202
203         it = file->f_it;
204
205         if (!it || !it->d.lustre.it_disposition) {
206                 it = &oit;
207                 rc = ll_intent_file_open(file, NULL, 0, it);
208                 if (rc)
209                         GOTO(out, rc);
210         }
211
212         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
213         rc = it_open_error(DISP_OPEN_OPEN, it);
214         /* mdc_intent_lock() didn't get a request ref if there was an open
215          * error, so don't do cleanup on the request here (bug 3430) */
216         if (rc)
217                 RETURN(rc);
218
219         rc = ll_local_open(file, it);
220         LASSERTF(rc == 0, "rc = %d\n", rc);
221
222         if (!S_ISREG(inode->i_mode))
223                 GOTO(out, rc);
224
225         lsm = lli->lli_smd;
226         if (lsm == NULL) {
227                 if (file->f_flags & O_LOV_DELAY_CREATE ||
228                     !(file->f_mode & FMODE_WRITE)) {
229                         CDEBUG(D_INODE, "object creation was delayed\n");
230                         GOTO(out, rc);
231                 }
232         }
233         file->f_flags &= ~O_LOV_DELAY_CREATE;
234         GOTO(out, rc);
235  out:
236         req = it->d.lustre.it_data;
237         ptlrpc_req_finished(req);
238         if (rc == 0)
239                 ll_open_complete(inode);
240         return rc;
241 }
242
243 /* Fills the obdo with the attributes for the inode defined by lsm */
244 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
245                    struct obdo *oa)
246 {
247         struct ptlrpc_request_set *set;
248         int rc;
249         ENTRY;
250
251         LASSERT(lsm != NULL);
252
253         memset(oa, 0, sizeof *oa);
254         oa->o_id = lsm->lsm_object_id;
255         oa->o_mode = S_IFREG;
256         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
257                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
258                 OBD_MD_FLCTIME;
259
260         set = ptlrpc_prep_set();
261         if (set == NULL) {
262                 rc = -ENOMEM;
263         } else {
264                 rc = obd_getattr_async(exp, oa, lsm, set);
265                 if (rc == 0)
266                         rc = ptlrpc_set_wait(set);
267                 ptlrpc_set_destroy(set);
268         }
269         if (rc)
270                 RETURN(rc);
271
272         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
273                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
274         RETURN(0);
275 }
276
277 static inline void ll_remove_suid(struct inode *inode)
278 {
279         unsigned int mode;
280
281         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
282         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
283
284         /* was any of the uid bits set? */
285         mode &= inode->i_mode;
286         if (mode && !capable(CAP_FSETID)) {
287                 inode->i_mode &= ~mode;
288                 // XXX careful here - we cannot change the size
289         }
290 }
291
292 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
293 {
294         struct ll_inode_info *lli = ll_i2info(inode);
295         struct lov_stripe_md *lsm = lli->lli_smd;
296         struct obd_export *exp = ll_i2obdexp(inode);
297         struct {
298                 char name[16];
299                 struct ldlm_lock *lock;
300                 struct lov_stripe_md *lsm;
301         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
302         __u32 stripe, vallen = sizeof(stripe);
303         int rc;
304         ENTRY;
305
306         if (lsm->lsm_stripe_count == 1)
307                 GOTO(check, stripe = 0);
308
309         /* get our offset in the lov */
310         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
311         if (rc != 0) {
312                 CERROR("obd_get_info: rc = %d\n", rc);
313                 RETURN(rc);
314         }
315         LASSERT(stripe < lsm->lsm_stripe_count);
316
317 check:
318         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
319             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[1]){
320                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
321                            lsm->lsm_oinfo[stripe].loi_id,
322                            lsm->lsm_oinfo[stripe].loi_gr);
323                 RETURN(-ELDLM_NO_LOCK_DATA);
324         }
325
326         RETURN(stripe);
327 }
328
329 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
330  * we get a lock cancellation for each stripe, so we have to map the obd's
331  * region back onto the stripes in the file that it held.
332  *
333  * No one can dirty the extent until we've finished our work and they can
334  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
335  * but other kernel actors could have pages locked.
336  *
337  * Called with the DLM lock held. */
338 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
339                               struct ldlm_lock *lock, __u32 stripe)
340 {
341         ldlm_policy_data_t tmpex;
342         unsigned long start, end, count, skip, i, j;
343         struct page *page;
344         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
345         struct lustre_handle lockh;
346         ENTRY;
347
348         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
349         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
350                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
351                inode->i_size);
352
353         /* our locks are page granular thanks to osc_enqueue, we invalidate the
354          * whole page. */
355         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
356         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
357
358         count = ~0;
359         skip = 0;
360         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
361         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
362         if (lsm->lsm_stripe_count > 1) {
363                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
364                 skip = (lsm->lsm_stripe_count - 1) * count;
365                 start += start/count * skip + stripe * count;
366                 if (end != ~0)
367                         end += end/count * skip + stripe * count;
368         }
369         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
370                 end = ~0;
371
372         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
373         if (i < end)
374                 end = i;
375
376         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
377                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
378                count, skip, end, discard ? " (DISCARDING)" : "");
379
380         /* this is the simplistic implementation of page eviction at
381          * cancelation.  It is careful to get races with other page
382          * lockers handled correctly.  fixes from bug 20 will make it
383          * more efficient by associating locks with pages and with
384          * batching writeback under the lock explicitly. */
385         for (i = start, j = start % count; i <= end;
386              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
387                 if (j == count) {
388                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
389                         i += skip;
390                         j = 0;
391                         if (i > end)
392                                 break;
393                 }
394                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
395                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
396                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
397                          start, i, end);
398
399                 if (!mapping_has_pages(inode->i_mapping)) {
400                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
401                         break;
402                 }
403
404                 cond_resched();
405
406                 page = find_get_page(inode->i_mapping, i);
407                 if (page == NULL)
408                         continue;
409                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
410                                i, tmpex.l_extent.start);
411                 lock_page(page);
412
413                 /* page->mapping to check with racing against teardown */
414                 if (!discard && clear_page_dirty_for_io(page)) {
415                         rc = ll_call_writepage(inode, page);
416                         if (rc != 0)
417                                 CERROR("writepage of page %p failed: %d\n",
418                                        page, rc);
419                         /* either waiting for io to complete or reacquiring
420                          * the lock that the failed writepage released */
421                         lock_page(page);
422                 }
423
424                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
425                 /* check to see if another DLM lock covers this page */
426                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
427                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
428                                       LDLM_FL_TEST_LOCK,
429                                       &lock->l_resource->lr_name, LDLM_EXTENT,
430                                       &tmpex, LCK_PR | LCK_PW, &lockh);
431                 if (rc2 == 0 && page->mapping != NULL) {
432                         // checking again to account for writeback's lock_page()
433                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
434                         ll_ra_accounting(page, inode->i_mapping);
435                         ll_truncate_complete_page(page);
436                 }
437                 unlock_page(page);
438                 page_cache_release(page);
439         }
440         LASSERTF(tmpex.l_extent.start <=
441                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
442                   lock->l_policy_data.l_extent.end + 1),
443                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
444                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
445                  start, i, end);
446         EXIT;
447 }
448
449 static int ll_extent_lock_callback(struct ldlm_lock *lock,
450                                    struct ldlm_lock_desc *new, void *data,
451                                    int flag)
452 {
453         struct lustre_handle lockh = { 0 };
454         int rc;
455         ENTRY;
456
457         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
458                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
459                 LBUG();
460         }
461
462         switch (flag) {
463         case LDLM_CB_BLOCKING:
464                 ldlm_lock2handle(lock, &lockh);
465                 rc = ldlm_cli_cancel(&lockh);
466                 if (rc != ELDLM_OK)
467                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
468                 break;
469         case LDLM_CB_CANCELING: {
470                 struct inode *inode;
471                 struct ll_inode_info *lli;
472                 struct lov_stripe_md *lsm;
473                 __u32 stripe;
474                 __u64 kms;
475
476                 /* This lock wasn't granted, don't try to evict pages */
477                 if (lock->l_req_mode != lock->l_granted_mode)
478                         RETURN(0);
479
480                 inode = ll_inode_from_lock(lock);
481                 if (inode == NULL)
482                         RETURN(0);
483                 lli = ll_i2info(inode);
484                 if (lli == NULL)
485                         goto iput;
486                 if (lli->lli_smd == NULL)
487                         goto iput;
488                 lsm = lli->lli_smd;
489
490                 stripe = ll_lock_to_stripe_offset(inode, lock);
491                 if (stripe < 0)
492                         goto iput;
493                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
494
495                 /* grabbing the i_sem will wait for write() to complete.  ns
496                  * lock hold times should be very short as ast processing
497                  * requires them and has a short timeout.  so, i_sem before ns
498                  * lock.*/
499                 down(&inode->i_sem);
500                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
501                 kms = ldlm_extent_shift_kms(lock,
502                                             lsm->lsm_oinfo[stripe].loi_kms);
503
504                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
505                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
506                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
507                 lsm->lsm_oinfo[stripe].loi_kms = kms;
508                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
509                 up(&inode->i_sem);
510                 //ll_try_done_writing(inode);
511         iput:
512                 iput(inode);
513                 break;
514         }
515         default:
516                 LBUG();
517         }
518
519         RETURN(0);
520 }
521
522 #if 0
523 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
524 {
525         /* XXX ALLOCATE - 160 bytes */
526         struct inode *inode = ll_inode_from_lock(lock);
527         struct ll_inode_info *lli = ll_i2info(inode);
528         struct lustre_handle lockh = { 0 };
529         struct ost_lvb *lvb;
530         __u32 stripe;
531         ENTRY;
532
533         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
534                      LDLM_FL_BLOCK_CONV)) {
535                 LBUG(); /* not expecting any blocked async locks yet */
536                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
537                            "lock, returning");
538                 ldlm_lock_dump(D_OTHER, lock, 0);
539                 ldlm_reprocess_all(lock->l_resource);
540                 RETURN(0);
541         }
542
543         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
544
545         stripe = ll_lock_to_stripe_offset(inode, lock);
546         if (stripe < 0)
547                 goto iput;
548
549         if (lock->l_lvb_len) {
550                 struct lov_stripe_md *lsm = lli->lli_smd;
551                 __u64 kms;
552                 lvb = lock->l_lvb_data;
553                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
554
555                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
556                 down(&inode->i_sem);
557                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
558                 kms = ldlm_extent_shift_kms(NULL, kms);
559                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
560                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
561                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
562                 lsm->lsm_oinfo[stripe].loi_kms = kms;
563                 up(&inode->i_sem);
564                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
565         }
566
567 iput:
568         iput(inode);
569         wake_up(&lock->l_waitq);
570
571         ldlm_lock2handle(lock, &lockh);
572         ldlm_lock_decref(&lockh, LCK_PR);
573         RETURN(0);
574 }
575 #endif
576
577 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
578 {
579         struct ptlrpc_request *req = reqp;
580         struct inode *inode = ll_inode_from_lock(lock);
581         struct ll_inode_info *lli;
582         struct lov_stripe_md *lsm;
583         struct ost_lvb *lvb;
584         int rc, size = sizeof(*lvb), stripe;
585         ENTRY;
586
587         if (inode == NULL)
588                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
589         lli = ll_i2info(inode);
590         if (lli == NULL)
591                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
592         lsm = lli->lli_smd;
593         if (lsm == NULL)
594                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
595
596         /* First, find out which stripe index this lock corresponds to. */
597         stripe = ll_lock_to_stripe_offset(inode, lock);
598         if (stripe < 0)
599                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
600
601         rc = lustre_pack_reply(req, 1, &size, NULL);
602         if (rc) {
603                 CERROR("lustre_pack_reply: %d\n", rc);
604                 GOTO(iput, rc);
605         }
606
607         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
608         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
609         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
610         lvb->lvb_atime = LTIME_S(inode->i_atime);
611         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
612
613         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
614                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
615                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
616                    lvb->lvb_atime, lvb->lvb_ctime);
617         GOTO(iput, 0);
618  iput:
619         iput(inode);
620
621  out:
622         /* These errors are normal races, so we don't want to fill the console
623          * with messages by calling ptlrpc_error() */
624         if (rc == -ELDLM_NO_LOCK_DATA)
625                 lustre_pack_reply(req, 0, NULL, NULL);
626
627         req->rq_status = rc;
628         return rc;
629 }
630
631 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
632 __u64 lov_merge_blocks(struct lov_stripe_md *lsm);
633 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
634
635 /* NB: lov_merge_size will prefer locally cached writes if they extend the
636  * file (because it prefers KMS over RSS when larger) */
637 int ll_glimpse_size(struct inode *inode)
638 {
639         struct ll_inode_info *lli = ll_i2info(inode);
640         struct ll_sb_info *sbi = ll_i2sbi(inode);
641         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
642         struct lustre_handle lockh = { 0 };
643         int rc, flags = LDLM_FL_HAS_INTENT;
644         ENTRY;
645
646         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
647
648         rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
649                          LCK_PR, &flags, ll_extent_lock_callback,
650                          ldlm_completion_ast, ll_glimpse_callback, inode,
651                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
652         if (rc == -ENOENT)
653                 RETURN(rc);
654         if (rc != 0) {
655                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
656                 RETURN(rc > 0 ? -EIO : rc);
657         }
658
659         inode->i_size = lov_merge_size(lli->lli_smd, 0);
660         inode->i_blocks = lov_merge_blocks(lli->lli_smd);
661         LTIME_S(inode->i_mtime) =
662                 lov_merge_mtime(lli->lli_smd, LTIME_S(inode->i_mtime));
663
664         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
665                inode->i_size, inode->i_blocks);
666
667         obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
668
669         RETURN(rc);
670 }
671
672 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
673                    struct lov_stripe_md *lsm, int mode,
674                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
675                    int ast_flags)
676 {
677         struct ll_sb_info *sbi = ll_i2sbi(inode);
678         int rc;
679         ENTRY;
680
681         LASSERT(lockh->cookie == 0);
682
683         /* XXX phil: can we do this?  won't it screw the file size up? */
684         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
685             (sbi->ll_flags & LL_SBI_NOLCK))
686                 RETURN(0);
687
688         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
689                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
690
691         rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
692                          &ast_flags, ll_extent_lock_callback,
693                          ldlm_completion_ast, ll_glimpse_callback, inode,
694                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
695         if (rc > 0)
696                 rc = -EIO;
697
698         if (policy->l_extent.start == 0 &&
699             policy->l_extent.end == OBD_OBJECT_EOF) {
700                 /* vmtruncate()->ll_truncate() first sets the i_size and then
701                  * the kms under both a DLM lock and the i_sem.  If we don't
702                  * get the i_sem here we can match the DLM lock and reset
703                  * i_size from the kms before the truncating path has updated
704                  * the kms.  generic_file_write can then trust the stale i_size
705                  * when doing appending writes and effectively cancel the
706                  * result of the truncate.  Getting the i_sem after the enqueue
707                  * maintains the DLM -> i_sem acquiry order. */
708                 down(&inode->i_sem);
709                 inode->i_size = lov_merge_size(lsm, 1);
710                 up(&inode->i_sem);
711         }
712
713         if (rc == 0)
714                 LTIME_S(inode->i_mtime) =
715                         lov_merge_mtime(lsm, LTIME_S(inode->i_mtime));
716         RETURN(rc);
717 }
718
719 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
720                      struct lov_stripe_md *lsm, int mode,
721                      struct lustre_handle *lockh)
722 {
723         struct ll_sb_info *sbi = ll_i2sbi(inode);
724         int rc;
725         ENTRY;
726
727         /* XXX phil: can we do this?  won't it screw the file size up? */
728         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
729             (sbi->ll_flags & LL_SBI_NOLCK))
730                 RETURN(0);
731
732         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
733
734         RETURN(rc);
735 }
736
737 static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
738                             loff_t *ppos)
739 {
740         struct ll_file_data *fd = filp->private_data;
741         struct inode *inode = filp->f_dentry->d_inode;
742         struct ll_inode_info *lli = ll_i2info(inode);
743         struct lov_stripe_md *lsm = lli->lli_smd;
744         struct lustre_handle lockh = { 0 };
745         ldlm_policy_data_t policy;
746         int rc;
747         ssize_t retval;
748         __u64 kms;
749         ENTRY;
750         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
751                inode->i_ino, inode->i_generation, inode, count, *ppos);
752
753         /* "If nbyte is 0, read() will return 0 and have no other results."
754          *                      -- Single Unix Spec */
755         if (count == 0)
756                 RETURN(0);
757
758         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
759                             count);
760
761         if (!lsm)
762                 RETURN(0);
763
764         policy.l_extent.start = *ppos;
765         policy.l_extent.end = *ppos + count - 1;
766
767         rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
768         if (rc != 0)
769                 RETURN(rc);
770
771         kms = lov_merge_size(lsm, 1);
772         if (*ppos + count - 1 > kms) {
773                 /* A glimpse is necessary to determine whether we return a short
774                  * read or some zeroes at the end of the buffer */
775                 retval = ll_glimpse_size(inode);
776                 if (retval)
777                         goto out;
778         } else {
779                 inode->i_size = kms;
780         }
781
782         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
783                inode->i_ino, count, *ppos, inode->i_size);
784
785         /* turn off the kernel's read-ahead */
786 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
787         filp->f_ramax = 0;
788 #else
789         filp->f_ra.ra_pages = 0;
790 #endif
791         retval = generic_file_read(filp, buf, count, ppos);
792
793  out:
794         ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
795         RETURN(retval);
796 }
797
798 /*
799  * Write to a file (through the page cache).
800  */
801 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
802                              loff_t *ppos)
803 {
804         struct ll_file_data *fd = file->private_data;
805         struct inode *inode = file->f_dentry->d_inode;
806         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
807         struct lustre_handle lockh = { 0 };
808         ldlm_policy_data_t policy;
809         loff_t maxbytes = ll_file_maxbytes(inode);
810         ssize_t retval;
811         int rc;
812         ENTRY;
813         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
814                inode->i_ino, inode->i_generation, inode, count, *ppos);
815
816         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
817
818         /* POSIX, but surprised the VFS doesn't check this already */
819         if (count == 0)
820                 RETURN(0);
821
822         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
823          * called on the file, don't fail the below assertion (bug 2388). */
824         if (file->f_flags & O_LOV_DELAY_CREATE && lsm == NULL)
825                 RETURN(-EBADF);
826
827         LASSERT(lsm);
828
829         if (file->f_flags & O_APPEND) {
830                 policy.l_extent.start = 0;
831                 policy.l_extent.end = OBD_OBJECT_EOF;
832         } else  {
833                 policy.l_extent.start = *ppos;
834                 policy.l_extent.end = *ppos + count - 1;
835         }
836
837         rc = ll_extent_lock(fd, inode, lsm, LCK_PW, &policy, &lockh, 0);
838         if (rc != 0)
839                 RETURN(rc);
840
841         /* this is ok, g_f_w will overwrite this under i_sem if it races
842          * with a local truncate, it just makes our maxbyte checking easier */
843         if (file->f_flags & O_APPEND)
844                 *ppos = inode->i_size;
845
846         if (*ppos >= maxbytes) {
847                 if (count || *ppos > maxbytes) {
848                         send_sig(SIGXFSZ, current, 0);
849                         GOTO(out, retval = -EFBIG);
850                 }
851         }
852         if (*ppos + count > maxbytes)
853                 count = maxbytes - *ppos;
854
855         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
856                inode->i_ino, count, *ppos);
857
858         /* generic_file_write handles O_APPEND after getting i_sem */
859         retval = generic_file_write(file, buf, count, ppos);
860
861 out:
862         ll_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
863         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
864                             retval > 0 ? retval : 0);
865         RETURN(retval);
866 }
867
868 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
869                                unsigned long arg)
870 {
871         struct ll_inode_info *lli = ll_i2info(inode);
872         struct obd_export *exp = ll_i2obdexp(inode);
873         struct ll_recreate_obj ucreatp;
874         struct obd_trans_info oti = { 0 };
875         struct obdo *oa = NULL;
876         int lsm_size;
877         int rc = 0;
878         struct lov_stripe_md *lsm, *lsm2;
879         ENTRY;
880
881         if (!capable (CAP_SYS_ADMIN))
882                 RETURN(-EPERM);
883
884         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
885                             sizeof(struct ll_recreate_obj));
886         if (rc) {
887                 RETURN(-EFAULT);
888         }
889         oa = obdo_alloc();
890         if (oa == NULL)
891                 RETURN(-ENOMEM);
892
893         down(&lli->lli_open_sem);
894         lsm = lli->lli_smd;
895         if (lsm == NULL)
896                 GOTO(out, rc = -ENOENT);
897         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
898                    (lsm->lsm_stripe_count));
899
900         OBD_ALLOC(lsm2, lsm_size);
901         if (lsm2 == NULL)
902                 GOTO(out, rc = -ENOMEM);
903
904         oa->o_id = ucreatp.lrc_id;
905         oa->o_nlink = ucreatp.lrc_ost_idx;
906         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
907         oa->o_flags |= OBD_FL_RECREATE_OBJS;
908         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
909                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
910
911         oti.oti_objid = NULL;
912         memcpy(lsm2, lsm, lsm_size);
913         rc = obd_create(exp, oa, &lsm2, &oti);
914
915         OBD_FREE(lsm2, lsm_size);
916         GOTO(out, rc);
917 out:
918         up(&lli->lli_open_sem);
919         obdo_free(oa);
920         return rc;
921 }
922
923 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
924                                     int flags, struct lov_user_md *lum,
925                                     int lum_size)
926 {
927         struct ll_inode_info *lli = ll_i2info(inode);
928         struct file *f;
929         struct obd_export *exp = ll_i2obdexp(inode);
930         struct lov_stripe_md *lsm;
931         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
932         struct ptlrpc_request *req = NULL;
933         int rc = 0;
934         struct lustre_md md;
935         ENTRY;
936
937         down(&lli->lli_open_sem);
938         lsm = lli->lli_smd;
939         if (lsm) {
940                 up(&lli->lli_open_sem);
941                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
942                        inode->i_ino);
943                 RETURN(-EEXIST);
944         }
945
946         f = get_empty_filp();
947         if (!f)
948                 GOTO(out, -ENOMEM);
949
950         f->f_dentry = file->f_dentry;
951         f->f_vfsmnt = file->f_vfsmnt;
952
953         rc = ll_intent_file_open(f, lum, lum_size, &oit);
954         if (rc)
955                 GOTO(out, rc);
956         if (it_disposition(&oit, DISP_LOOKUP_NEG))
957                 GOTO(out, -ENOENT);
958         req = oit.d.lustre.it_data;
959         rc = oit.d.lustre.it_status;
960
961         if (rc < 0)
962                 GOTO(out, rc);
963
964         rc = mdc_req2lustre_md(req, 1, exp, &md);
965         if (rc)
966                 GOTO(out, rc);
967         ll_update_inode(f->f_dentry->d_inode, md.body, md.lsm);
968
969         rc = ll_local_open(f, &oit);
970         if (rc)
971                 GOTO(out, rc);
972         ll_intent_release(&oit);
973
974         rc = ll_file_release(f->f_dentry->d_inode, f);
975
976  out:
977         if (f)
978                 put_filp(f);
979         up(&lli->lli_open_sem);
980         if (req != NULL)
981                 ptlrpc_req_finished(req);
982         RETURN(rc);
983 }
984
985 static int ll_lov_setea(struct inode *inode, struct file *file,
986                             unsigned long arg)
987 {
988         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
989         struct lov_user_md  *lump;
990         int lum_size = sizeof(struct lov_user_md) +
991                        sizeof(struct lov_user_ost_data);
992         int rc;
993         ENTRY;
994
995         if (!capable (CAP_SYS_ADMIN))
996                 RETURN(-EPERM);
997
998         OBD_ALLOC(lump, lum_size);
999         if (lump == NULL) {
1000                 RETURN(-ENOMEM);
1001         }
1002         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1003         if (rc) {
1004                 OBD_FREE(lump, lum_size);
1005                 RETURN(-EFAULT);
1006         }
1007
1008         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1009
1010         OBD_FREE(lump, lum_size);
1011         RETURN(rc);
1012 }
1013
1014 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1015                             unsigned long arg)
1016 {
1017         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1018         int rc;
1019         int flags = FMODE_WRITE;
1020         ENTRY;
1021
1022         /* Bug 1152: copy properly when this is no longer true */
1023         LASSERT(sizeof(lum) == sizeof(*lump));
1024         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1025         rc = copy_from_user(&lum, lump, sizeof(lum));
1026         if (rc)
1027                 RETURN(-EFAULT);
1028
1029         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1030         RETURN(rc);
1031 }
1032
1033 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1034 {
1035         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1036
1037         if (!lsm)
1038                 RETURN(-ENODATA);
1039
1040         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
1041                             (void *)arg);
1042 }
1043
1044 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1045                   unsigned long arg)
1046 {
1047         struct ll_file_data *fd = file->private_data;
1048         int flags;
1049         ENTRY;
1050
1051         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1052                inode->i_generation, inode, cmd);
1053
1054         if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
1055                 RETURN(-ENOTTY);
1056
1057         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1058         switch(cmd) {
1059         case LL_IOC_GETFLAGS:
1060                 /* Get the current value of the file flags */
1061                 return put_user(fd->fd_flags, (int *)arg);
1062         case LL_IOC_SETFLAGS:
1063         case LL_IOC_CLRFLAGS:
1064                 /* Set or clear specific file flags */
1065                 /* XXX This probably needs checks to ensure the flags are
1066                  *     not abused, and to handle any flag side effects.
1067                  */
1068                 if (get_user(flags, (int *) arg))
1069                         RETURN(-EFAULT);
1070
1071                 if (cmd == LL_IOC_SETFLAGS)
1072                         fd->fd_flags |= flags;
1073                 else
1074                         fd->fd_flags &= ~flags;
1075                 RETURN(0);
1076         case LL_IOC_LOV_SETSTRIPE:
1077                 RETURN(ll_lov_setstripe(inode, file, arg));
1078         case LL_IOC_LOV_SETEA:
1079                 RETURN(ll_lov_setea(inode, file, arg));
1080         case LL_IOC_LOV_GETSTRIPE:
1081                 RETURN(ll_lov_getstripe(inode, arg));
1082         case LL_IOC_RECREATE_OBJ:
1083                 RETURN(ll_lov_recreate_obj(inode, file, arg));
1084         case EXT3_IOC_GETFLAGS:
1085         case EXT3_IOC_SETFLAGS:
1086                 RETURN( ll_iocontrol(inode, file, cmd, arg) );
1087         case EXT3_IOC_GETVERSION_OLD:
1088         case EXT3_IOC_GETVERSION:
1089                 return put_user(inode->i_generation, (int *) arg);
1090         /* We need to special case any other ioctls we want to handle,
1091          * to send them to the MDS/OST as appropriate and to properly
1092          * network encode the arg field.
1093         case EXT2_IOC_SETVERSION_OLD:
1094         case EXT2_IOC_SETVERSION_NEW:
1095         */
1096         default:
1097                 RETURN( obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
1098                                       (void *)arg) );
1099         }
1100 }
1101
1102 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1103 {
1104         struct inode *inode = file->f_dentry->d_inode;
1105         struct ll_file_data *fd = file->private_data;
1106         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1107         struct lustre_handle lockh = {0};
1108         loff_t retval;
1109         ENTRY;
1110         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu(%s)\n",
1111                inode->i_ino, inode->i_generation, inode,
1112                offset + ((origin == 2) ? inode->i_size :
1113                          (origin == 1) ? file->f_pos : 0),
1114                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR": "SEEK_SET");
1115
1116         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1117         if (origin == 2) { /* SEEK_END */
1118                 ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
1119                 int rc;
1120
1121                 rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,0);
1122                 if (rc != 0)
1123                         RETURN(rc);
1124
1125                 offset += inode->i_size;
1126         } else if (origin == 1) { /* SEEK_CUR */
1127                 offset += file->f_pos;
1128         }
1129
1130         retval = -EINVAL;
1131         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1132                 if (offset != file->f_pos) {
1133                         file->f_pos = offset;
1134 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1135                         file->f_reada = 0;
1136                         file->f_version = ++event;
1137 #endif
1138                 }
1139                 retval = offset;
1140         }
1141
1142         if (origin == 2)
1143                 ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
1144         RETURN(retval);
1145 }
1146
1147 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1148 {
1149         struct inode *inode = dentry->d_inode;
1150         struct ll_inode_info *lli = ll_i2info(inode);
1151         struct lov_stripe_md *lsm = lli->lli_smd;
1152         struct ll_fid fid;
1153         struct ptlrpc_request *req;
1154         int rc, err;
1155         ENTRY;
1156         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1157                inode->i_generation, inode);
1158
1159         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1160
1161         /* fsync's caller has already called _fdata{sync,write}, we want
1162          * that IO to finish before calling the osc and mdc sync methods */
1163         rc = filemap_fdatawait(inode->i_mapping);
1164
1165         /* catch async errors that were recorded back when async writeback
1166          * failed for pages in this mapping. */
1167         err = lli->lli_async_rc;
1168         lli->lli_async_rc = 0;
1169         if (rc == 0)
1170                 rc = err;
1171         if (lsm) {
1172                 err = lov_test_and_clear_async_rc(lsm);
1173                 if (rc == 0)
1174                         rc = err;
1175         }
1176
1177         ll_inode2fid(&fid, inode);
1178         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
1179         if (!rc)
1180                 rc = err;
1181         if (!err)
1182                 ptlrpc_req_finished(req);
1183
1184         if (data && lsm) {
1185                 struct obdo *oa = obdo_alloc();
1186
1187                 if (!oa)
1188                         RETURN(rc ? rc : -ENOMEM);
1189
1190                 oa->o_id = lsm->lsm_object_id;
1191                 oa->o_valid = OBD_MD_FLID;
1192                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1193                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1194
1195                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
1196                                0, OBD_OBJECT_EOF);
1197                 if (!rc)
1198                         rc = err;
1199                 obdo_free(oa);
1200         }
1201
1202         RETURN(rc);
1203 }
1204
1205 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1206 {
1207         struct inode *inode = file->f_dentry->d_inode;
1208         struct ll_sb_info *sbi = ll_i2sbi(inode);
1209         struct obd_device *obddev;
1210         struct ldlm_res_id res_id =
1211                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
1212         struct lustre_handle lockh = {0};
1213         ldlm_policy_data_t flock;
1214         ldlm_mode_t mode = 0;
1215         int flags = 0;
1216         int rc;
1217         ENTRY;
1218
1219         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1220                inode->i_ino, file_lock);
1221
1222         flock.l_flock.pid = file_lock->fl_pid;
1223         flock.l_flock.start = file_lock->fl_start;
1224         flock.l_flock.end = file_lock->fl_end;
1225
1226         switch (file_lock->fl_type) {
1227         case F_RDLCK:
1228                 mode = LCK_PR;
1229                 break;
1230         case F_UNLCK:
1231                 /* An unlock request may or may not have any relation to
1232                  * existing locks so we may not be able to pass a lock handle
1233                  * via a normal ldlm_lock_cancel() request. The request may even
1234                  * unlock a byte range in the middle of an existing lock. In
1235                  * order to process an unlock request we need all of the same
1236                  * information that is given with a normal read or write record
1237                  * lock request. To avoid creating another ldlm unlock (cancel)
1238                  * message we'll treat a LCK_NL flock request as an unlock. */
1239                 mode = LCK_NL;
1240                 break;
1241         case F_WRLCK:
1242                 mode = LCK_PW;
1243                 break;
1244         default:
1245                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1246                 LBUG();
1247         }
1248
1249         switch (cmd) {
1250         case F_SETLKW:
1251 #ifdef F_SETLKW64
1252         case F_SETLKW64:
1253 #endif
1254                 flags = 0;
1255                 break;
1256         case F_SETLK:
1257 #ifdef F_SETLK64
1258         case F_SETLK64:
1259 #endif
1260                 flags = LDLM_FL_BLOCK_NOWAIT;
1261                 break;
1262         case F_GETLK:
1263 #ifdef F_GETLK64
1264         case F_GETLK64:
1265 #endif
1266                 flags = LDLM_FL_TEST_LOCK;
1267                 /* Save the old mode so that if the mode in the lock changes we
1268                  * can decrement the appropriate reader or writer refcount. */
1269                 file_lock->fl_type = mode;
1270                 break;
1271         default:
1272                 CERROR("unknown fcntl lock command: %d\n", cmd);
1273                 LBUG();
1274         }
1275
1276         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
1277                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1278                flags, mode, flock.l_flock.start, flock.l_flock.end);
1279
1280         obddev = sbi->ll_mdc_exp->exp_obd;
1281         rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
1282                               obddev->obd_namespace,
1283                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1284                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1285                               NULL, 0, NULL, &lockh);
1286         RETURN(rc);
1287 }
1288
1289 static int ll_have_md_lock(struct dentry *de)
1290 {
1291         struct ll_sb_info *sbi = ll_s2sbi(de->d_sb);
1292         struct lustre_handle lockh;
1293         struct ldlm_res_id res_id = { .name = {0} };
1294         struct obd_device *obddev;
1295         int flags;
1296         ENTRY;
1297
1298         if (!de->d_inode)
1299                RETURN(0);
1300
1301         obddev = sbi->ll_mdc_exp->exp_obd;
1302         res_id.name[0] = de->d_inode->i_ino;
1303         res_id.name[1] = de->d_inode->i_generation;
1304
1305         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
1306
1307         /* FIXME use LDLM_FL_TEST_LOCK instead */
1308         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
1309         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
1310                             NULL, LCK_PR, &lockh)) {
1311                 ldlm_lock_decref(&lockh, LCK_PR);
1312                 RETURN(1);
1313         }
1314
1315         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
1316                             NULL, LCK_PW, &lockh)) {
1317                 ldlm_lock_decref(&lockh, LCK_PW);
1318                 RETURN(1);
1319         }
1320         RETURN(0);
1321 }
1322
1323 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
1324 {
1325         struct inode *inode = dentry->d_inode;
1326         struct ll_inode_info *lli;
1327         struct lov_stripe_md *lsm;
1328         int rc;
1329         ENTRY;
1330
1331         if (!inode) {
1332                 CERROR("REPORT THIS LINE TO PETER\n");
1333                 RETURN(0);
1334         }
1335         lli = ll_i2info(inode);
1336         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
1337                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
1338 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1339         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1340 #endif
1341
1342         if (!ll_have_md_lock(dentry)) {
1343                 struct ptlrpc_request *req = NULL;
1344                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
1345                 struct ll_fid fid;
1346                 unsigned long valid = 0;
1347                 int ealen = 0;
1348
1349                 if (S_ISREG(inode->i_mode)) {
1350                         ealen = obd_size_diskmd(sbi->ll_osc_exp, NULL);
1351                         valid |= OBD_MD_FLEASIZE;
1352                 }
1353                 ll_inode2fid(&fid, inode);
1354                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
1355                 if (rc) {
1356                         CERROR("failure %d inode %lu\n", rc, inode->i_ino);
1357                         RETURN(-abs(rc));
1358                 }
1359                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, 0, NULL);
1360                 if (rc) {
1361                         ptlrpc_req_finished(req);
1362                         RETURN(rc);
1363                 }
1364                 ptlrpc_req_finished(req);
1365         }
1366
1367         lsm = lli->lli_smd;
1368         if (lsm == NULL) /* object not yet allocated, don't validate size */
1369                 RETURN(0);
1370
1371         /* ll_glimpse_size will prefer locally cached writes if they extend
1372          * the file */
1373         rc = ll_glimpse_size(inode);
1374         RETURN(rc);
1375 }
1376
1377 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1378 int ll_getattr(struct vfsmount *mnt, struct dentry *de,
1379                struct lookup_intent *it, struct kstat *stat)
1380 {
1381         int res = 0;
1382         struct inode *inode = de->d_inode;
1383
1384         res = ll_inode_revalidate_it(de, it);
1385         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
1386
1387         if (res)
1388                 return res;
1389
1390         stat->dev = inode->i_sb->s_dev;
1391         stat->ino = inode->i_ino;
1392         stat->mode = inode->i_mode;
1393         stat->nlink = inode->i_nlink;
1394         stat->uid = inode->i_uid;
1395         stat->gid = inode->i_gid;
1396         stat->rdev = kdev_t_to_nr(inode->i_rdev);
1397         stat->atime = inode->i_atime;
1398         stat->mtime = inode->i_mtime;
1399         stat->ctime = inode->i_ctime;
1400         stat->size = inode->i_size;
1401         stat->blksize = inode->i_blksize;
1402         stat->blocks = inode->i_blocks;
1403         return 0;
1404 }
1405 #endif
1406
1407 struct file_operations ll_file_operations = {
1408         .read           = ll_file_read,
1409         .write          = ll_file_write,
1410         .ioctl          = ll_file_ioctl,
1411         .open           = ll_file_open,
1412         .release        = ll_file_release,
1413         .mmap           = generic_file_mmap,
1414         .llseek         = ll_file_seek,
1415 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
1416         .sendfile       = generic_file_sendfile,
1417 #endif
1418         .fsync          = ll_fsync,
1419         /* .lock           = ll_file_flock */
1420 };
1421
1422 struct inode_operations ll_file_inode_operations = {
1423         .setattr_raw    = ll_setattr_raw,
1424         .setattr        = ll_setattr,
1425         .truncate       = ll_truncate,
1426 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1427         .getattr_it     = ll_getattr,
1428 #else
1429         .revalidate_it  = ll_inode_revalidate_it,
1430 #endif
1431 };
1432