Whamcloud - gitweb
- changes with names on exports and another fields in llite and mds. lov_exp is
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <linux/lustre_dlm.h>
27 #include <linux/lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34 #include <linux/obd_lov.h>
35
36 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
37                 struct file *file)
38 {
39         struct ll_file_data *fd = file->private_data;
40         struct ptlrpc_request *req = NULL;
41         struct obd_client_handle *och = &fd->fd_mds_och;
42         struct obdo *obdo = NULL;
43         int rc;
44         ENTRY;
45
46         /* clear group lock, if present */
47         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
48                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
49                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
50                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
51                                       &fd->fd_cwlockh);
52         }
53
54         obdo = obdo_alloc();
55         if (obdo == NULL)
56                 RETURN(-ENOMEM);
57
58         obdo->o_id = inode->i_ino;
59         obdo->o_valid = OBD_MD_FLID;
60         obdo_from_inode(obdo, inode, (OBD_MD_FLTYPE | OBD_MD_FLMODE |
61                                       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
62                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
63                                       OBD_MD_FLCTIME));
64         if (0 /* ll_is_inode_dirty(inode) */) {
65                 obdo->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
66                 obdo->o_valid |= OBD_MD_FLFLAGS;
67         }
68         obdo->o_mds = id_group(&ll_i2info(inode)->lli_id);
69         rc = md_close(md_exp, obdo, och, &req);
70         obdo_free(obdo);
71
72         if (rc == EAGAIN) {
73                 /* We are the last writer, so the MDS has instructed us to get
74                  * the file size and any write cookies, then close again. */
75                 //ll_queue_done_writing(inode);
76                 rc = 0;
77         } else if (rc) {
78                 CERROR("inode %lu mdc close failed: rc = %d\n",
79                        inode->i_ino, rc);
80         }
81         if (rc == 0) {
82                 rc = ll_objects_destroy(req, file->f_dentry->d_inode, 1);
83                 if (rc)
84                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
85                                inode->i_ino, rc);
86         }
87
88         mdc_clear_open_replay_data(md_exp, och);
89         ptlrpc_req_finished(req);
90         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
91         file->private_data = NULL;
92         OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof(*fd));
93         RETURN(rc);
94 }
95
96 /* While this returns an error code, fput() the caller does not, so we need
97  * to make every effort to clean up all of our state here.  Also, applications
98  * rarely check close errors and even if an error is returned they will not
99  * re-try the close call.
100  */
101 int ll_file_release(struct inode *inode, struct file *file)
102 {
103         struct ll_file_data *fd;
104         struct ll_sb_info *sbi = ll_i2sbi(inode);
105         int rc;
106
107         ENTRY;
108         CDEBUG(D_VFSTRACE, "VFS Op:inode="DLID4"(%p)\n",
109                OLID4(&ll_i2info(inode)->lli_id), inode);
110
111         /* don't do anything for / */
112         if (inode->i_sb->s_root == file->f_dentry)
113                 RETURN(0);
114
115         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
116         fd = (struct ll_file_data *)file->private_data;
117         LASSERT(fd != NULL);
118
119         rc = ll_md_close(sbi->ll_md_exp, inode, file);
120         RETURN(rc);
121 }
122
123 static int ll_intent_file_open(struct file *file, void *lmm,
124                                int lmmsize, struct lookup_intent *itp)
125 {
126         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
127         struct dentry *parent = file->f_dentry->d_parent;
128         const char *name = file->f_dentry->d_name.name;
129         const int len = file->f_dentry->d_name.len;
130         struct lustre_handle lockh;
131         struct mdc_op_data *op_data;
132         int rc;
133
134         if (!parent)
135                 RETURN(-ENOENT);
136
137         OBD_ALLOC(op_data, sizeof(*op_data));
138         if (op_data == NULL)
139                 RETURN(-ENOMEM);
140         ll_prepare_mdc_data(op_data, parent->d_inode, NULL, name, len, O_RDWR);
141
142         rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_PR, op_data,
143                         &lockh, lmm, lmmsize, ldlm_completion_ast,
144                         ll_mdc_blocking_ast, NULL);
145         OBD_FREE(op_data, sizeof(*op_data));
146         if (rc == 0) {
147                 if (itp->d.lustre.it_lock_mode)
148                         memcpy(&itp->d.lustre.it_lock_handle,
149                                &lockh, sizeof(lockh));
150         } else if (rc < 0) {
151                 CERROR("lock enqueue: err: %d\n", rc);
152         }
153         
154         RETURN(rc);
155 }
156
157 int ll_local_open(struct file *file, struct lookup_intent *it)
158 {
159         struct ptlrpc_request *req = it->d.lustre.it_data;
160         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
161         struct obd_export *md_exp = ll_i2mdexp(file->f_dentry->d_inode);
162         struct ll_file_data *fd;
163         struct mds_body *body;
164         ENTRY;
165
166         body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
167         LASSERT (body != NULL);                 /* reply already checked out */
168         LASSERT_REPSWABBED (req, 1);            /* and swabbed down */
169
170         LASSERTF(file->private_data == NULL, "file %*s/%*s ino %lu/%u (%o)\n",
171                  file->f_dentry->d_name.len, file->f_dentry->d_name.name,
172                  file->f_dentry->d_parent->d_name.len,
173                  file->f_dentry->d_parent->d_name.name,
174                  file->f_dentry->d_inode->i_ino,
175                  file->f_dentry->d_inode->i_generation,
176                  file->f_dentry->d_inode->i_mode);
177
178
179         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
180         
181         /* We can't handle this well without reorganizing ll_file_open and
182          * ll_md_close(), so don't even try right now. */
183         LASSERT(fd != NULL);
184
185         memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
186         fd->fd_mds_och.och_magic = OBD_CLIENT_HANDLE_MAGIC;
187         file->private_data = fd;
188         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
189
190         lli->lli_io_epoch = body->io_epoch;
191
192         mdc_set_open_replay_data(md_exp, &fd->fd_mds_och, it->d.lustre.it_data);
193
194         RETURN(0);
195 }
196
197 /* Open a file, and (for the very first open) create objects on the OSTs at
198  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
199  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
200  * lli_open_sem to ensure no other process will create objects, send the
201  * stripe MD to the MDS, or try to destroy the objects if that fails.
202  *
203  * If we already have the stripe MD locally then we don't request it in
204  * mdc_open(), by passing a lmm_size = 0.
205  *
206  * It is up to the application to ensure no other processes open this file
207  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
208  * used.  We might be able to avoid races of that sort by getting lli_open_sem
209  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
210  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
211  */
212 int ll_file_open(struct inode *inode, struct file *file)
213 {
214         struct ll_inode_info *lli = ll_i2info(inode);
215         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
216                                           .it_flags = file->f_flags };
217         struct lov_stripe_md *lsm;
218         struct ptlrpc_request *req;
219         int rc = 0;
220         ENTRY;
221
222         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
223                inode->i_generation, inode, file->f_flags);
224
225         /* don't do anything for / */
226         if (inode->i_sb->s_root == file->f_dentry)
227                 RETURN(0);
228
229         it = file->f_it;
230
231         if (!it || !it->d.lustre.it_disposition) {
232                 it = &oit;
233                 rc = ll_intent_file_open(file, NULL, 0, it);
234                 if (rc)
235                         GOTO(out, rc);
236         }
237
238         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
239         /* mdc_intent_lock() didn't get a request ref if there was an open
240          * error, so don't do cleanup on the request here (bug 3430) */
241         rc = it_open_error(DISP_OPEN_OPEN, it);
242         if (rc)
243                 RETURN(rc);
244
245         rc = ll_local_open(file, it);
246
247         LASSERTF(rc == 0, "rc = %d\n", rc);
248
249         if (!S_ISREG(inode->i_mode))
250                 GOTO(out, rc);
251
252         lsm = lli->lli_smd;
253         if (lsm == NULL) {
254                 if (file->f_flags & O_LOV_DELAY_CREATE ||
255                     !(file->f_mode & FMODE_WRITE)) {
256                         CDEBUG(D_INODE, "object creation was delayed\n");
257                         GOTO(out, rc);
258                 }
259         }
260         file->f_flags &= ~O_LOV_DELAY_CREATE;
261         GOTO(out, rc);
262  out:
263         req = it->d.lustre.it_data;
264         ptlrpc_req_finished(req);
265         if (rc == 0)
266                 ll_open_complete(inode);
267         return rc;
268 }
269
270 /* Fills the obdo with the attributes for the inode defined by lsm */
271 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
272                    struct obdo *oa)
273 {
274         struct ptlrpc_request_set *set;
275         int rc;
276         ENTRY;
277
278         LASSERT(lsm != NULL);
279
280         memset(oa, 0, sizeof *oa);
281         oa->o_id = lsm->lsm_object_id;
282         oa->o_gr = lsm->lsm_object_gr;
283         oa->o_mode = S_IFREG;
284         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
285                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
286                 OBD_MD_FLCTIME | OBD_MD_FLGROUP;
287
288         set = ptlrpc_prep_set();
289         if (set == NULL) {
290                 rc = -ENOMEM;
291         } else {
292                 rc = obd_getattr_async(exp, oa, lsm, set);
293                 if (rc == 0)
294                         rc = ptlrpc_set_wait(set);
295                 ptlrpc_set_destroy(set);
296         }
297         if (rc)
298                 RETURN(rc);
299
300         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
301                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
302         RETURN(0);
303 }
304
305 static inline void ll_remove_suid(struct inode *inode)
306 {
307         unsigned int mode;
308
309         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
310         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
311
312         /* was any of the uid bits set? */
313         mode &= inode->i_mode;
314         if (mode && !capable(CAP_FSETID)) {
315                 inode->i_mode &= ~mode;
316                 // XXX careful here - we cannot change the size
317         }
318 }
319
320 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
321 {
322         struct ll_inode_info *lli = ll_i2info(inode);
323         struct lov_stripe_md *lsm = lli->lli_smd;
324         struct obd_export *exp = ll_i2dtexp(inode);
325         struct {
326                 char name[16];
327                 struct ldlm_lock *lock;
328                 struct lov_stripe_md *lsm;
329         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
330         __u32 stripe, vallen = sizeof(stripe);
331         int rc;
332         ENTRY;
333
334         if (lsm->lsm_stripe_count == 1)
335                 GOTO(check, stripe = 0);
336
337         /* get our offset in the lov */
338         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
339         if (rc != 0) {
340                 CERROR("obd_get_info: rc = %d\n", rc);
341                 RETURN(rc);
342         }
343         LASSERT(stripe < lsm->lsm_stripe_count);
344         EXIT;
345 check:
346         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
347             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[2]){
348                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64
349                            " inode=%lu/%u (%p)\n",
350                            lsm->lsm_oinfo[stripe].loi_id,
351                            lsm->lsm_oinfo[stripe].loi_gr,
352                            inode->i_ino, inode->i_generation, inode);
353                 return -ELDLM_NO_LOCK_DATA;
354         }
355
356         return stripe;
357 }
358
359 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
360  * we get a lock cancellation for each stripe, so we have to map the obd's
361  * region back onto the stripes in the file that it held.
362  *
363  * No one can dirty the extent until we've finished our work and they can
364  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
365  * but other kernel actors could have pages locked.
366  *
367  * Called with the DLM lock held. */
368 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
369                               struct ldlm_lock *lock, __u32 stripe)
370 {
371         ldlm_policy_data_t tmpex;
372         unsigned long start, end, count, skip, i, j;
373         struct page *page;
374         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
375         struct lustre_handle lockh;
376         ENTRY;
377
378         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
379         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
380                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
381                inode->i_size);
382
383         /* our locks are page granular thanks to osc_enqueue, we invalidate the
384          * whole page. */
385         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
386         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
387
388         count = ~0;
389         skip = 0;
390         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
391         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
392         if (lsm->lsm_stripe_count > 1) {
393                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
394                 skip = (lsm->lsm_stripe_count - 1) * count;
395                 start += start/count * skip + stripe * count;
396                 if (end != ~0)
397                         end += end/count * skip + stripe * count;
398         }
399         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
400                 end = ~0;
401
402         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
403         if (i < end)
404                 end = i;
405
406         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
407                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
408                count, skip, end, discard ? " (DISCARDING)" : "");
409         
410         /* walk through the vmas on the inode and tear down mmaped pages that
411          * intersect with the lock.  this stops immediately if there are no
412          * mmap()ed regions of the file.  This is not efficient at all and
413          * should be short lived. We'll associate mmap()ed pages with the lock
414          * and will be able to find them directly */
415         
416         for (i = start; i <= end; i += (j + skip)) {
417                 j = min(count - (i % count), end - i + 1);
418                 LASSERT(inode->i_mapping);
419                 if (ll_teardown_mmaps(inode->i_mapping, i << PAGE_CACHE_SHIFT,
420                                       ((i+j) << PAGE_CACHE_SHIFT) - 1) )
421                         break;
422         }
423
424         /* this is the simplistic implementation of page eviction at
425          * cancelation.  It is careful to get races with other page
426          * lockers handled correctly.  fixes from bug 20 will make it
427          * more efficient by associating locks with pages and with
428          * batching writeback under the lock explicitly. */
429         for (i = start, j = start % count; i <= end;
430              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
431                 if (j == count) {
432                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
433                         i += skip;
434                         j = 0;
435                         if (i > end)
436                                 break;
437                 }
438                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
439                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
440                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
441                          start, i, end);
442
443                 if (!mapping_has_pages(inode->i_mapping)) {
444                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
445                         break;
446                 }
447
448                 cond_resched();
449
450                 page = find_get_page(inode->i_mapping, i);
451                 if (page == NULL)
452                         continue;
453                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
454                                i, tmpex.l_extent.start);
455                 lock_page(page);
456
457                 /* page->mapping to check with racing against teardown */
458                 if (!discard && clear_page_dirty_for_io(page)) {
459                         rc = ll_call_writepage(inode, page);
460                         if (rc != 0)
461                                 CERROR("writepage of page %p failed: %d\n",
462                                        page, rc);
463                         /* either waiting for io to complete or reacquiring
464                          * the lock that the failed writepage released */
465                         lock_page(page);
466                 }
467
468                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
469                 /* check to see if another DLM lock covers this page */
470                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
471                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
472                                       LDLM_FL_TEST_LOCK,
473                                       &lock->l_resource->lr_name, LDLM_EXTENT,
474                                       &tmpex, LCK_PR | LCK_PW, &lockh);
475                 if (rc2 == 0 && page->mapping != NULL) {
476                         // checking again to account for writeback's lock_page()
477                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
478                         ll_ra_accounting(page, inode->i_mapping);
479                         ll_truncate_complete_page(page);
480                 }
481                 unlock_page(page);
482                 page_cache_release(page);
483         }
484         LASSERTF(tmpex.l_extent.start <=
485                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
486                   lock->l_policy_data.l_extent.end + 1),
487                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
488                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
489                  start, i, end);
490         EXIT;
491 }
492
493 static int ll_extent_lock_callback(struct ldlm_lock *lock,
494                                    struct ldlm_lock_desc *new, void *data,
495                                    int flag)
496 {
497         struct lustre_handle lockh = { 0 };
498         int rc;
499         ENTRY;
500
501         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
502                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
503                 LBUG();
504         }
505
506         switch (flag) {
507         case LDLM_CB_BLOCKING:
508                 ldlm_lock2handle(lock, &lockh);
509                 rc = ldlm_cli_cancel(&lockh);
510                 if (rc != ELDLM_OK)
511                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
512                 break;
513         case LDLM_CB_CANCELING: {
514                 struct inode *inode;
515                 struct ll_inode_info *lli;
516                 struct lov_stripe_md *lsm;
517                 __u32 stripe;
518                 __u64 kms;
519
520                 /* This lock wasn't granted, don't try to evict pages */
521                 if (lock->l_req_mode != lock->l_granted_mode)
522                         RETURN(0);
523
524                 inode = ll_inode_from_lock(lock);
525                 if (inode == NULL)
526                         RETURN(0);
527                 lli = ll_i2info(inode);
528                 if (lli == NULL)
529                         goto iput;
530                 if (lli->lli_smd == NULL)
531                         goto iput;
532                 lsm = lli->lli_smd;
533
534                 stripe = ll_lock_to_stripe_offset(inode, lock);
535                 if (stripe < 0)
536                         goto iput;
537                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
538
539                 /* grabbing the i_sem will wait for write() to complete.  ns
540                  * lock hold times should be very short as ast processing
541                  * requires them and has a short timeout.  so, i_sem before ns
542                  * lock.*/
543
544                 down(&inode->i_sem);
545                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
546                 kms = ldlm_extent_shift_kms(lock,
547                                             lsm->lsm_oinfo[stripe].loi_kms);
548                 
549                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
550                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
551                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
552                 lsm->lsm_oinfo[stripe].loi_kms = kms;
553                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
554                 up(&inode->i_sem);
555                 //ll_try_done_writing(inode);
556         iput:
557                 iput(inode);
558                 break;
559         }
560         default:
561                 LBUG();
562         }
563
564         RETURN(0);
565 }
566
567 #if 0
568 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
569 {
570         /* XXX ALLOCATE - 160 bytes */
571         struct inode *inode = ll_inode_from_lock(lock);
572         struct ll_inode_info *lli = ll_i2info(inode);
573         struct lustre_handle lockh = { 0 };
574         struct ost_lvb *lvb;
575         __u32 stripe;
576         ENTRY;
577
578         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
579                      LDLM_FL_BLOCK_CONV)) {
580                 LBUG(); /* not expecting any blocked async locks yet */
581                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
582                            "lock, returning");
583                 ldlm_lock_dump(D_OTHER, lock, 0);
584                 ldlm_reprocess_all(lock->l_resource);
585                 RETURN(0);
586         }
587
588         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
589
590         stripe = ll_lock_to_stripe_offset(inode, lock);
591         if (stripe < 0)
592                 goto iput;
593
594         if (lock->l_lvb_len) {
595                 struct lov_stripe_md *lsm = lli->lli_smd;
596                 __u64 kms;
597                 lvb = lock->l_lvb_data;
598                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
599
600                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
601                 down(&inode->i_sem);
602                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
603                 kms = ldlm_extent_shift_kms(NULL, kms);
604                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
605                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
606                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
607                 lsm->lsm_oinfo[stripe].loi_kms = kms;
608                 up(&inode->i_sem);
609                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
610         }
611
612 iput:
613         iput(inode);
614         wake_up(&lock->l_waitq);
615
616         ldlm_lock2handle(lock, &lockh);
617         ldlm_lock_decref(&lockh, LCK_PR);
618         RETURN(0);
619 }
620 #endif
621
622 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
623 {
624         struct ptlrpc_request *req = reqp;
625         struct inode *inode = ll_inode_from_lock(lock);
626         struct ll_inode_info *lli;
627         struct ost_lvb *lvb;
628         struct lov_stripe_md *lsm;
629         int rc, size = sizeof(*lvb), stripe;
630         ENTRY;
631
632         if (inode == NULL)
633                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
634         lli = ll_i2info(inode);
635         if (lli == NULL)
636                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
637
638         lsm = lli->lli_smd;
639         if (lsm == NULL)
640                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
641
642         /* First, find out which stripe index this lock corresponds to. */
643         stripe = ll_lock_to_stripe_offset(inode, lock);
644         if (stripe < 0)
645                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
646
647         rc = lustre_pack_reply(req, 1, &size, NULL);
648         if (rc) {
649                 CERROR("lustre_pack_reply: %d\n", rc);
650                 GOTO(iput, rc);
651         }
652
653         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
654         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
655
656         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
657                    inode->i_size, stripe, lvb->lvb_size);
658         GOTO(iput, 0);
659  iput:
660         iput(inode);
661
662  out:
663         /* These errors are normal races, so we don't want to fill the console
664          * with messages by calling ptlrpc_error() */
665         if (rc == -ELDLM_NO_LOCK_DATA)
666                 lustre_pack_reply(req, 0, NULL, NULL);
667
668         req->rq_status = rc;
669         return rc;
670 }
671
672 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
673 __u64 lov_merge_blocks(struct lov_stripe_md *lsm);
674 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
675
676 /* NB: lov_merge_size will prefer locally cached writes if they extend the
677  * file (because it prefers KMS over RSS when larger) */
678 int ll_glimpse_size(struct inode *inode)
679 {
680         struct ll_inode_info *lli = ll_i2info(inode);
681         struct ll_sb_info *sbi = ll_i2sbi(inode);
682         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
683         struct lustre_handle lockh = { 0 };
684         int rc, flags = LDLM_FL_HAS_INTENT;
685         ENTRY;
686
687         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
688
689         rc = obd_enqueue(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, &policy,
690                          LCK_PR, &flags, ll_extent_lock_callback,
691                          ldlm_completion_ast, ll_glimpse_callback, inode,
692                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
693         if (rc == -ENOENT)
694                 RETURN(rc);
695
696         if (rc != 0) {
697                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
698                 RETURN(rc > 0 ? -EIO : rc);
699         }
700
701         inode->i_size = lov_merge_size(lli->lli_smd, 0);
702         inode->i_blocks = lov_merge_blocks(lli->lli_smd);
703         //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
704
705         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64", blocks: "LPU64"\n",
706                (__u64)inode->i_size, (__u64)inode->i_blocks);
707         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
708         RETURN(rc);
709 }
710
711 void ll_stime_record(struct ll_sb_info *sbi, struct timeval *start,
712                     struct obd_service_time *stime)
713 {
714         struct timeval stop;
715         do_gettimeofday(&stop);
716                                                                                                                                                                                                      
717         spin_lock(&sbi->ll_lock);
718         lprocfs_stime_record(stime, &stop, start);
719         spin_unlock(&sbi->ll_lock);
720 }
721
722 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
723                    struct lov_stripe_md *lsm, int mode,
724                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
725                    int ast_flags, struct obd_service_time *stime)
726 {
727         struct ll_sb_info *sbi = ll_i2sbi(inode);
728         struct timeval start;
729         int rc;
730         ENTRY;
731
732         LASSERT(lockh->cookie == 0);
733
734         /* XXX phil: can we do this?  won't it screw the file size up? */
735         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
736             (sbi->ll_flags & LL_SBI_NOLCK))
737                 RETURN(0);
738
739         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
740                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
741
742         do_gettimeofday(&start);
743         rc = obd_enqueue(sbi->ll_dt_exp, lsm, LDLM_EXTENT, policy, mode,
744                          &ast_flags, ll_extent_lock_callback,
745                          ldlm_completion_ast, ll_glimpse_callback, inode,
746                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
747         if (rc > 0)
748                 rc = -EIO;
749         
750         ll_stime_record(sbi, &start, stime);
751
752         if (policy->l_extent.start == 0 &&
753             policy->l_extent.end == OBD_OBJECT_EOF) {
754                 /* vmtruncate()->ll_truncate() first sets the i_size and then
755                  * the kms under both a DLM lock and the i_sem.  If we don't
756                  * get the i_sem here we can match the DLM lock and reset
757                  * i_size from the kms before the truncating path has updated
758                  * the kms.  generic_file_write can then trust the stale i_size
759                  * when doing appending writes and effectively cancel the
760                  * result of the truncate.  Getting the i_sem after the enqueue
761                  * maintains the DLM -> i_sem acquiry order. */
762                 down(&inode->i_sem);
763                 inode->i_size = lov_merge_size(lsm, 1);
764                 up(&inode->i_sem);
765         }
766         //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
767
768         RETURN(rc);
769 }
770
771 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
772                      struct lov_stripe_md *lsm, int mode,
773                      struct lustre_handle *lockh)
774 {
775         struct ll_sb_info *sbi = ll_i2sbi(inode);
776         int rc;
777         ENTRY;
778
779         /* XXX phil: can we do this?  won't it screw the file size up? */
780         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
781             (sbi->ll_flags & LL_SBI_NOLCK))
782                 RETURN(0);
783
784         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
785
786         RETURN(rc);
787 }
788
789 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
790                             loff_t *ppos)
791 {
792         struct inode *inode = file->f_dentry->d_inode;
793         struct ll_inode_info *lli = ll_i2info(inode);
794         struct lov_stripe_md *lsm = lli->lli_smd;
795         struct ll_lock_tree tree;
796         struct ll_lock_tree_node *node;
797         int rc;
798         ssize_t retval;
799         __u64 kms;
800         ENTRY;
801         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
802                inode->i_ino, inode->i_generation, inode, count, *ppos);
803
804         /* "If nbyte is 0, read() will return 0 and have no other results."
805          *                      -- Single Unix Spec */
806         if (count == 0)
807                 RETURN(0);
808
809         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
810                             count);
811
812         if (!lsm)
813                 RETURN(0);
814
815         node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
816                                   LCK_PR);
817
818         tree.lt_fd = file->private_data;
819
820         rc = ll_tree_lock(&tree, node, inode, buf, count,
821                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
822         if (rc != 0)
823                 RETURN(rc);
824
825         kms = lov_merge_size(lsm, 1);
826         if (*ppos + count - 1 > kms) {
827                 /* A glimpse is necessary to determine whether we return a short
828                  * read or some zeroes at the end of the buffer */
829                 retval = ll_glimpse_size(inode);
830                 if (retval)
831                         goto out;
832         } else {
833                 inode->i_size = kms;
834         }
835
836         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
837                inode->i_ino, count, *ppos, inode->i_size);
838
839         /* turn off the kernel's read-ahead */
840 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
841         file->f_ramax = 0;
842 #else
843         file->f_ra.ra_pages = 0;
844 #endif
845         retval = generic_file_read(file, buf, count, ppos);
846
847  out:
848         ll_tree_unlock(&tree, inode);
849         RETURN(retval);
850 }
851
852 /*
853  * Write to a file (through the page cache).
854  */
855 static ssize_t ll_file_write(struct file *file, const char *buf,
856                              size_t count, loff_t *ppos)
857 {
858         struct inode *inode = file->f_dentry->d_inode;
859         loff_t maxbytes = ll_file_maxbytes(inode);
860         struct ll_lock_tree tree;
861         struct ll_lock_tree_node *node;
862         ssize_t retval;
863         int rc;
864
865         ENTRY;
866         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
867                inode->i_ino, inode->i_generation, inode, count, *ppos);
868
869         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
870
871         /* POSIX, but surprised the VFS doesn't check this already */
872         if (count == 0)
873                 RETURN(0);
874
875         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
876          * called on the file, don't fail the below assertion (bug 2388). */
877         if (file->f_flags & O_LOV_DELAY_CREATE &&
878             ll_i2info(inode)->lli_smd == NULL)
879                 RETURN(-EBADF);
880
881         LASSERT(ll_i2info(inode)->lli_smd != NULL);
882         
883         if (file->f_flags & O_APPEND)
884                 node = ll_node_from_inode(inode, 0, OBD_OBJECT_EOF, LCK_PW);
885         else
886                 node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
887                                           LCK_PW);
888
889         if (IS_ERR(node))
890                 RETURN(PTR_ERR(node));
891
892         tree.lt_fd = file->private_data;
893
894         rc = ll_tree_lock(&tree, node, inode, buf, count,
895                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
896         if (rc != 0)
897                 RETURN(rc);
898
899         /* this is ok, g_f_w will overwrite this under i_sem if it races
900          * with a local truncate, it just makes our maxbyte checking easier */
901         if (file->f_flags & O_APPEND)
902                 *ppos = inode->i_size;
903
904         if (*ppos >= maxbytes) {
905                 if (count || *ppos > maxbytes) {
906                         send_sig(SIGXFSZ, current, 0);
907                         GOTO(out, retval = -EFBIG);
908                 }
909         }
910         if (*ppos + count > maxbytes)
911                 count = maxbytes - *ppos;
912
913         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
914                inode->i_ino, count, *ppos);
915
916         /* generic_file_write handles O_APPEND after getting i_sem */
917         retval = generic_file_write(file, buf, count, ppos);
918         EXIT;
919 out:
920         ll_tree_unlock(&tree, inode);
921         /* serialize with mmap/munmap/mremap */
922         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
923                             retval > 0 ? retval : 0);
924         return retval;
925 }
926
927 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
928                                unsigned long arg)
929 {
930         struct ll_inode_info *lli = ll_i2info(inode);
931         struct obd_export *exp = ll_i2dtexp(inode);
932         struct ll_recreate_obj ucreatp;
933         struct obd_trans_info oti = { 0 };
934         struct obdo *oa = NULL;
935         int lsm_size;
936         int rc = 0;
937         struct lov_stripe_md *lsm, *lsm2;
938         ENTRY;
939
940         if (!capable (CAP_SYS_ADMIN))
941                 RETURN(-EPERM);
942
943         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
944                             sizeof(struct ll_recreate_obj));
945         if (rc) {
946                 RETURN(-EFAULT);
947         }
948         oa = obdo_alloc();
949         if (oa == NULL) 
950                 RETURN(-ENOMEM);
951
952         down(&lli->lli_open_sem);
953         lsm = lli->lli_smd;
954         if (lsm == NULL)
955                 GOTO(out, rc = -ENOENT);
956         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
957                    (lsm->lsm_stripe_count));
958
959         OBD_ALLOC(lsm2, lsm_size);
960         if (lsm2 == NULL)
961                 GOTO(out, rc = -ENOMEM);
962
963         oa->o_id = ucreatp.lrc_id;
964         oa->o_nlink = ucreatp.lrc_ost_idx;
965         oa->o_gr = ucreatp.lrc_group;
966         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
967         oa->o_flags |= OBD_FL_RECREATE_OBJS;
968         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
969                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
970
971         oti.oti_objid = NULL;
972         memcpy(lsm2, lsm, lsm_size);
973         rc = obd_create(exp, oa, &lsm2, &oti);
974
975         OBD_FREE(lsm2, lsm_size);
976         GOTO(out, rc);
977 out:
978         up(&lli->lli_open_sem);
979         obdo_free(oa);
980         return rc;
981 }
982
983 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
984                                     int flags, struct lov_user_md *lum,
985                                     int lum_size)
986 {
987         struct ll_inode_info *lli = ll_i2info(inode);
988         struct file *f;
989         struct obd_export *exp = ll_i2dtexp(inode);
990         struct lov_stripe_md *lsm;
991         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
992         struct ptlrpc_request *req = NULL;
993         int rc = 0;
994         struct lustre_md md;
995         ENTRY;
996
997         down(&lli->lli_open_sem);
998         lsm = lli->lli_smd;
999         if (lsm) {
1000                 up(&lli->lli_open_sem);
1001                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1002                        inode->i_ino);
1003                 RETURN(-EEXIST);
1004         }
1005
1006         f = get_empty_filp();
1007         if (!f)
1008                 GOTO(out, -ENOMEM);
1009
1010         f->f_dentry = file->f_dentry;
1011         f->f_vfsmnt = file->f_vfsmnt;
1012
1013         rc = ll_intent_file_open(f, lum, lum_size, &oit);
1014         if (rc)
1015                 GOTO(out, rc);
1016         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1017                 GOTO(out, -ENOENT);
1018         req = oit.d.lustre.it_data;
1019         rc = oit.d.lustre.it_status;
1020
1021         if (rc < 0)
1022                 GOTO(out, rc);
1023
1024         rc = mdc_req2lustre_md(ll_i2mdexp(inode), req, 1, exp, &md);
1025         if (rc)
1026                 GOTO(out, rc);
1027         ll_update_inode(f->f_dentry->d_inode, &md);
1028
1029         rc = ll_local_open(f, &oit);
1030         if (rc)
1031                 GOTO(out, rc);
1032         ll_intent_release(&oit);
1033
1034         rc = ll_file_release(f->f_dentry->d_inode, f);
1035         EXIT;
1036  out:
1037         if (f)
1038                 put_filp(f);
1039         up(&lli->lli_open_sem);
1040         if (req != NULL)
1041                 ptlrpc_req_finished(req);
1042         return rc;
1043 }
1044
1045 static int ll_lov_setea(struct inode *inode, struct file *file,
1046                             unsigned long arg)
1047 {
1048         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1049         struct lov_user_md  *lump;
1050         int lum_size = sizeof(struct lov_user_md) +
1051                        sizeof(struct lov_user_ost_data);
1052         int rc;
1053         ENTRY;
1054
1055         if (!capable (CAP_SYS_ADMIN))
1056                 RETURN(-EPERM);
1057
1058         OBD_ALLOC(lump, lum_size);
1059         if (lump == NULL) {
1060                 RETURN(-ENOMEM);
1061         }
1062         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1063         if (rc) {
1064                 OBD_FREE(lump, lum_size);
1065                 RETURN(-EFAULT);
1066         }
1067
1068         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1069
1070         OBD_FREE(lump, lum_size);
1071         RETURN(rc);
1072 }
1073
1074 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1075                             unsigned long arg)
1076 {
1077         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1078         int rc;
1079         int flags = FMODE_WRITE;
1080         ENTRY;
1081
1082         /* Bug 1152: copy properly when this is no longer true */
1083         LASSERT(sizeof(lum) == sizeof(*lump));
1084         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1085         rc = copy_from_user(&lum, lump, sizeof(lum));
1086         if (rc)
1087                 RETURN(-EFAULT);
1088
1089         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1090         if (rc == 0) {
1091                  put_user(0, &lump->lmm_stripe_count);
1092                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1093                                     0, ll_i2info(inode)->lli_smd, lump);
1094         }
1095         RETURN(rc);
1096 }
1097
1098 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1099 {
1100         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1101
1102         if (!lsm)
1103                 RETURN(-ENODATA);
1104
1105         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1106                             (void *)arg);
1107 }
1108
1109 static int ll_get_grouplock(struct inode *inode, struct file *file,
1110                          unsigned long arg)
1111 {
1112         struct ll_file_data *fd = file->private_data;
1113         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1114                                                     .end = OBD_OBJECT_EOF}};
1115         struct lustre_handle lockh = { 0 };
1116         struct ll_inode_info *lli = ll_i2info(inode);
1117         struct lov_stripe_md *lsm = lli->lli_smd;
1118         int flags = 0, rc;
1119         ENTRY;
1120
1121         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1122                 RETURN(-EINVAL);
1123         }
1124
1125         policy.l_extent.gid = arg;
1126         if (file->f_flags & O_NONBLOCK)
1127                 flags = LDLM_FL_BLOCK_NOWAIT;
1128
1129         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags,
1130                             &ll_i2sbi(inode)->ll_grouplock_stime);
1131         if (rc != 0)
1132                 RETURN(rc);
1133
1134         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1135         fd->fd_gid = arg;
1136         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1137
1138         RETURN(0);
1139 }
1140
1141 static int ll_put_grouplock(struct inode *inode, struct file *file,
1142                          unsigned long arg)
1143 {
1144         struct ll_file_data *fd = file->private_data;
1145         struct ll_inode_info *lli = ll_i2info(inode);
1146         struct lov_stripe_md *lsm = lli->lli_smd;
1147         int rc;
1148         ENTRY;
1149
1150         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1151                 /* Ugh, it's already unlocked. */
1152                 RETURN(-EINVAL);
1153         }
1154
1155         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1156                 RETURN(-EINVAL);
1157
1158         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1159
1160         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1161         if (rc)
1162                 RETURN(rc);
1163
1164         fd->fd_gid = 0;
1165         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1166
1167         RETURN(0);
1168 }
1169
1170 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
1171                   unsigned long arg)
1172 {
1173         struct ll_file_data *fd = file->private_data;
1174         struct ll_sb_info *sbi = ll_i2sbi(inode);
1175         int flags;
1176         ENTRY;
1177
1178         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1179                inode->i_generation, inode, cmd);
1180
1181         if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
1182                 RETURN(-ENOTTY);
1183
1184         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
1185         switch(cmd) {
1186         case LL_IOC_GETFLAGS:
1187                 /* Get the current value of the file flags */
1188                 return put_user(fd->fd_flags, (int *)arg);
1189         case LL_IOC_SETFLAGS:
1190         case LL_IOC_CLRFLAGS:
1191                 /* Set or clear specific file flags */
1192                 /* XXX This probably needs checks to ensure the flags are
1193                  *     not abused, and to handle any flag side effects.
1194                  */
1195                 if (get_user(flags, (int *) arg))
1196                         RETURN(-EFAULT);
1197
1198                 if (cmd == LL_IOC_SETFLAGS)
1199                         fd->fd_flags |= flags;
1200                 else
1201                         fd->fd_flags &= ~flags;
1202                 RETURN(0);
1203         case LL_IOC_LOV_SETSTRIPE:
1204                 RETURN(ll_lov_setstripe(inode, file, arg));
1205         case LL_IOC_LOV_SETEA:
1206                 RETURN(ll_lov_setea(inode, file, arg));
1207         case IOC_MDC_SHOWFID: {
1208                 struct lustre_id *idp = (struct lustre_id *)arg;
1209                 struct lustre_id id;
1210                 char *filename;
1211                 int rc;
1212
1213                 filename = getname((const char *)arg);
1214                 if (IS_ERR(filename))
1215                         RETURN(PTR_ERR(filename));
1216
1217                 ll_inode2id(&id, inode);
1218
1219                 rc = ll_get_fid(sbi->ll_md_exp, &id, filename, &id);
1220                 if (rc < 0)
1221                         GOTO(out_filename, rc);
1222
1223                 rc = copy_to_user(idp, &id, sizeof(*idp));
1224                 if (rc)
1225                         GOTO(out_filename, rc = -EFAULT);
1226
1227                 EXIT;
1228         out_filename:
1229                 putname(filename);
1230                 return rc;
1231         }
1232         case LL_IOC_LOV_GETSTRIPE:
1233                 RETURN(ll_lov_getstripe(inode, arg));
1234         case LL_IOC_RECREATE_OBJ:
1235                 RETURN(ll_lov_recreate_obj(inode, file, arg));
1236         case EXT3_IOC_GETFLAGS:
1237         case EXT3_IOC_SETFLAGS:
1238                 RETURN( ll_iocontrol(inode, file, cmd, arg) );
1239         case LL_IOC_GROUP_LOCK:
1240                 RETURN(ll_get_grouplock(inode, file, arg));
1241         case LL_IOC_GROUP_UNLOCK:
1242                 RETURN(ll_put_grouplock(inode, file, arg));
1243         case EXT3_IOC_GETVERSION_OLD:
1244         case EXT3_IOC_GETVERSION:
1245                 return put_user(inode->i_generation, (int *) arg);
1246         /* We need to special case any other ioctls we want to handle,
1247          * to send them to the MDS/OST as appropriate and to properly
1248          * network encode the arg field.
1249         case EXT2_IOC_GETVERSION_OLD:
1250         case EXT2_IOC_GETVERSION_NEW:
1251         case EXT2_IOC_SETVERSION_OLD:
1252         case EXT2_IOC_SETVERSION_NEW:
1253         case EXT3_IOC_SETVERSION_OLD:
1254         case EXT3_IOC_SETVERSION:
1255         */
1256         default:
1257                 RETURN( obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1258                                       (void *)arg) );
1259         }
1260 }
1261
1262 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1263 {
1264         struct inode *inode = file->f_dentry->d_inode;
1265         struct ll_file_data *fd = file->private_data;
1266         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1267         struct lustre_handle lockh = {0};
1268         loff_t retval;
1269         ENTRY;
1270         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),to=%llu\n", inode->i_ino,
1271                inode->i_generation, inode,
1272                offset + ((origin==2) ? inode->i_size : file->f_pos));
1273
1274         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
1275         if (origin == 2) { /* SEEK_END */
1276                 ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
1277                 int nonblock = 0, rc;
1278
1279                 if (file->f_flags & O_NONBLOCK)
1280                         nonblock = LDLM_FL_BLOCK_NOWAIT;
1281
1282                 rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,
1283                                     nonblock, &ll_i2sbi(inode)->ll_seek_stime);
1284                 if (rc != 0)
1285                         RETURN(rc);
1286
1287                 offset += inode->i_size;
1288         } else if (origin == 1) { /* SEEK_CUR */
1289                 offset += file->f_pos;
1290         }
1291
1292         retval = -EINVAL;
1293         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1294                 if (offset != file->f_pos) {
1295                         file->f_pos = offset;
1296 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1297                         file->f_reada = 0;
1298                         file->f_version = ++event;
1299 #endif
1300                 }
1301                 retval = offset;
1302         }
1303
1304         if (origin == 2)
1305                 ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
1306         RETURN(retval);
1307 }
1308
1309 int ll_fsync(struct file *file, struct dentry *dentry, int data)
1310 {
1311         struct inode *inode = dentry->d_inode;
1312         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1313         struct lustre_id id;
1314         struct ptlrpc_request *req;
1315         int rc, err;
1316         ENTRY;
1317         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1318                inode->i_generation, inode);
1319
1320         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
1321
1322         /* fsync's caller has already called _fdata{sync,write}, we want
1323          * that IO to finish before calling the osc and mdc sync methods */
1324         rc = filemap_fdatawait(inode->i_mapping);
1325
1326         ll_inode2id(&id, inode);
1327         err = md_sync(ll_i2sbi(inode)->ll_md_exp, &id, &req);
1328         if (!rc)
1329                 rc = err;
1330         if (!err)
1331                 ptlrpc_req_finished(req);
1332
1333         if (data && lsm) {
1334                 struct obdo *oa = obdo_alloc();
1335
1336                 if (!oa)
1337                         RETURN(rc ? rc : -ENOMEM);
1338
1339                 oa->o_id = lsm->lsm_object_id;
1340                 oa->o_gr = lsm->lsm_object_gr;
1341                 oa->o_valid = OBD_MD_FLID;
1342                 obdo_from_inode(oa, inode, (OBD_MD_FLTYPE | OBD_MD_FLATIME |
1343                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1344                                             OBD_MD_FLGROUP));
1345
1346                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
1347                                0, OBD_OBJECT_EOF);
1348                 if (!rc)
1349                         rc = err;
1350                 obdo_free(oa);
1351         }
1352
1353         RETURN(rc);
1354 }
1355
1356 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
1357 {
1358         struct inode *inode = file->f_dentry->d_inode;
1359         struct ll_inode_info *li = ll_i2info(inode);
1360         struct ll_sb_info *sbi = ll_i2sbi(inode);
1361         struct obd_device *obddev;
1362         struct ldlm_res_id res_id =
1363                 { .name = {id_fid(&li->lli_id), id_group(&li->lli_id), LDLM_FLOCK} };
1364         struct lustre_handle lockh = {0};
1365         ldlm_policy_data_t flock;
1366         ldlm_mode_t mode = 0;
1367         int flags = 0;
1368         int rc;
1369         ENTRY;
1370
1371         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
1372                inode->i_ino, file_lock);
1373
1374         flock.l_flock.pid = file_lock->fl_pid;
1375         flock.l_flock.start = file_lock->fl_start;
1376         flock.l_flock.end = file_lock->fl_end;
1377
1378         switch (file_lock->fl_type) {
1379         case F_RDLCK:
1380                 mode = LCK_PR;
1381                 break;
1382         case F_UNLCK:
1383                 /* An unlock request may or may not have any relation to
1384                  * existing locks so we may not be able to pass a lock handle
1385                  * via a normal ldlm_lock_cancel() request. The request may even
1386                  * unlock a byte range in the middle of an existing lock. In
1387                  * order to process an unlock request we need all of the same
1388                  * information that is given with a normal read or write record
1389                  * lock request. To avoid creating another ldlm unlock (cancel)
1390                  * message we'll treat a LCK_NL flock request as an unlock. */
1391                 mode = LCK_NL;
1392                 break;
1393         case F_WRLCK:
1394                 mode = LCK_PW;
1395                 break;
1396         default:
1397                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1398                 LBUG();
1399         }
1400
1401         switch (cmd) {
1402         case F_SETLKW:
1403 #ifdef F_SETLKW64
1404         case F_SETLKW64:
1405 #endif
1406                 flags = 0;
1407                 break;
1408         case F_SETLK:
1409 #ifdef F_SETLK64
1410         case F_SETLK64:
1411 #endif
1412                 flags = LDLM_FL_BLOCK_NOWAIT;
1413                 break;
1414         case F_GETLK:
1415 #ifdef F_GETLK64
1416         case F_GETLK64:
1417 #endif
1418                 flags = LDLM_FL_TEST_LOCK;
1419                 /* Save the old mode so that if the mode in the lock changes we
1420                  * can decrement the appropriate reader or writer refcount. */
1421                 file_lock->fl_type = mode;
1422                 break;
1423         default:
1424                 CERROR("unknown fcntl lock command: %d\n", cmd);
1425                 LBUG();
1426         }
1427
1428         CDEBUG(D_DLMTRACE, "inode=%lu, pid="LPU64", flags=%#x, mode=%u, "
1429                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
1430                flags, mode, flock.l_flock.start, flock.l_flock.end);
1431
1432         obddev = md_get_real_obd(sbi->ll_md_exp, NULL, 0);
1433         rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
1434                               obddev->obd_namespace,
1435                               res_id, LDLM_FLOCK, &flock, mode, &flags,
1436                               NULL, ldlm_flock_completion_ast, NULL, file_lock,
1437                               NULL, 0, NULL, &lockh);
1438         RETURN(rc);
1439 }
1440
1441 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
1442 {
1443         struct lookup_intent oit = { .it_op = IT_GETATTR };
1444         struct inode *inode = dentry->d_inode;
1445         struct ptlrpc_request *req = NULL;
1446         struct ll_inode_info *lli;
1447         struct lov_stripe_md *lsm;
1448         struct ll_sb_info *sbi;
1449         struct lustre_id id;
1450         int rc;
1451         
1452         ENTRY;
1453
1454         if (!inode) {
1455                 CERROR("REPORT THIS LINE TO PETER\n");
1456                 RETURN(0);
1457         }
1458         
1459         sbi = ll_i2sbi(inode);
1460         
1461         ll_inode2id(&id, inode);
1462         lli = ll_i2info(inode);
1463         LASSERT(id_fid(&id) != 0);
1464
1465         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), name=%s, intent=%s\n",
1466                inode->i_ino, inode->i_generation, inode, dentry->d_name.name,
1467                LL_IT2STR(it));
1468
1469 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
1470         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
1471 #endif
1472
1473         rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
1474                             &oit, 0, &req, ll_mdc_blocking_ast);
1475         if (rc < 0)
1476                 GOTO(out, rc);
1477
1478         rc = revalidate_it_finish(req, 1, &oit, dentry);
1479         if (rc) {
1480                 ll_intent_release(&oit);
1481                 GOTO(out, rc);
1482         }
1483
1484         if (dentry->d_iname[0] != '/') { /* Do not rehash root of the tree
1485                                             dentries */
1486                 spin_lock(&dcache_lock);
1487                 hlist_del_init(&dentry->d_hash);
1488                 __d_rehash(dentry, 0);
1489                 spin_unlock(&dcache_lock);
1490         }
1491         ll_lookup_finish_locks(&oit, dentry);
1492         dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
1493
1494
1495         lsm = lli->lli_smd;
1496         if (lsm == NULL) /* object not yet allocated, don't validate size */
1497                 GOTO(out, rc = 0);
1498
1499         /*
1500          * ll_glimpse_size() will prefer locally cached writes if they extend
1501          * the file.
1502          */
1503         rc = ll_glimpse_size(inode);
1504         EXIT;
1505 out:
1506         if (req)
1507                 ptlrpc_req_finished(req);
1508         return rc;
1509 }
1510
1511 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1512 int ll_getattr(struct vfsmount *mnt, struct dentry *de,
1513                struct lookup_intent *it, struct kstat *stat)
1514 {
1515         int res = 0;
1516         struct inode *inode = de->d_inode;
1517
1518         res = ll_inode_revalidate_it(de, it);
1519         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
1520
1521         if (res)
1522                 return res;
1523
1524         stat->ino = inode->i_ino;
1525         stat->mode = inode->i_mode;
1526         stat->nlink = inode->i_nlink;
1527         stat->uid = inode->i_uid;
1528         stat->gid = inode->i_gid;
1529         stat->atime = inode->i_atime;
1530         stat->mtime = inode->i_mtime;
1531         stat->ctime = inode->i_ctime;
1532         stat->size = inode->i_size;
1533         stat->blksize = inode->i_blksize;
1534         stat->blocks = inode->i_blocks;
1535         stat->rdev = kdev_t_to_nr(inode->i_rdev);
1536         stat->dev = id_group(&ll_i2info(inode)->lli_id);
1537         return 0;
1538 }
1539 #endif
1540
1541 struct file_operations ll_file_operations = {
1542         .read           = ll_file_read,
1543         .write          = ll_file_write,
1544         .ioctl          = ll_file_ioctl,
1545         .open           = ll_file_open,
1546         .release        = ll_file_release,
1547         .mmap           = ll_file_mmap,
1548         .llseek         = ll_file_seek,
1549 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
1550         .sendfile       = generic_file_sendfile,
1551 #endif
1552         .fsync          = ll_fsync,
1553         .lock           = ll_file_flock
1554 };
1555
1556 struct inode_operations ll_file_inode_operations = {
1557         .setattr_raw    = ll_setattr_raw,
1558         .setattr        = ll_setattr,
1559         .truncate       = ll_truncate,
1560 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1561         .getattr_it     = ll_getattr,
1562 #else
1563         .revalidate_it  = ll_inode_revalidate_it,
1564 #endif
1565 };
1566