Whamcloud - gitweb
1e26110ad32a810587032ef903a52c6e991acec8
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26
27 #include <linux/lustre_dlm.h>
28 #include <linux/lustre_lite.h>
29 #include <linux/obd_lov.h>      /* for lov_mds_md_size() in lov_setstripe() */
30 #include <linux/random.h>
31
32 int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
33 extern int ll_setattr(struct dentry *de, struct iattr *attr);
34
35 static int ll_mdc_close(struct lustre_handle *mdc_conn, struct inode *inode,
36                         struct file *file)
37 {
38         struct ll_file_data *fd = file->private_data;
39         struct ptlrpc_request *req = NULL;
40         unsigned long flags;
41         struct obd_import *imp;
42         int rc;
43         ENTRY;
44
45         /* Complete the open request and remove it from replay list */
46         rc = mdc_close(&ll_i2sbi(inode)->ll_mdc_conn, inode->i_ino,
47                        inode->i_mode, &fd->fd_mdshandle, &req);
48         if (rc)
49                 CERROR("inode %lu close failed: rc = %d\n", inode->i_ino, rc);
50
51         imp = fd->fd_req->rq_import;
52         LASSERT(imp != NULL);
53         spin_lock_irqsave(&imp->imp_lock, flags);
54
55         DEBUG_REQ(D_HA, fd->fd_req, "matched open req %p", fd->fd_req);
56
57         /* We held on to the request for replay until we saw a close for that
58          * file.  Now that we've closed it, it gets replayed on the basis of
59          * its transno only. */
60         fd->fd_req->rq_flags &= ~PTL_RPC_FL_REPLAY;
61
62         if (fd->fd_req->rq_transno) {
63                 /* This open created a file, so it needs replay as a
64                  * normal transaction now.  Our reference to it now
65                  * effectively owned by the imp_replay_list, and it'll
66                  * be committed just like other transno-having
67                  * requests from here on out. */
68
69                 /* We now retain this close request, so that it is
70                  * replayed if the open is replayed.  We duplicate the
71                  * transno, so that we get freed at the right time,
72                  * and rely on the difference in xid to keep
73                  * everything ordered correctly.
74                  *
75                  * But! If this close was already given a transno
76                  * (because it caused real unlinking of an
77                  * open-unlinked file, f.e.), then we'll be ordered on
78                  * the basis of that and we don't need to do anything
79                  * magical here. */
80                 if (!req->rq_transno) {
81                         req->rq_transno = fd->fd_req->rq_transno;
82                         ptlrpc_retain_replayable_request(req, imp);
83                 }
84                 spin_unlock_irqrestore(&imp->imp_lock, flags);
85
86                 /* Should we free_committed now? we always free before
87                  * replay, so it's probably a wash.  We could check to
88                  * see if the fd_req should already be committed, in
89                  * which case we can avoid the whole retain_replayable
90                  * dance. */
91         } else {
92                 /* No transno means that we can just drop our ref. */
93                 spin_unlock_irqrestore(&imp->imp_lock, flags);
94                 ptlrpc_req_finished(fd->fd_req);
95         }
96
97         /* Do this after the fd_req->rq_transno check, because we don't want
98          * to bounce off zero references. */
99         ptlrpc_req_finished(req);
100         fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
101         file->private_data = NULL;
102         kmem_cache_free(ll_file_data_slab, fd);
103
104         RETURN(-abs(rc));
105 }
106
107 /* While this returns an error code, fput() the caller does not, so we need
108  * to make every effort to clean up all of our state here.  Also, applications
109  * rarely check close errors and even if an error is returned they will not
110  * re-try the close call.
111  */
112 static int ll_file_release(struct inode *inode, struct file *file)
113 {
114         struct ll_file_data *fd;
115         struct obdo oa;
116         struct ll_sb_info *sbi = ll_i2sbi(inode);
117         struct ll_inode_info *lli = ll_i2info(inode);
118         struct lov_stripe_md *lsm = lli->lli_smd;
119         int rc = 0, rc2;
120
121         ENTRY;
122
123         fd = (struct ll_file_data *)file->private_data;
124         if (!fd) /* no process opened the file after an mcreate */
125                 RETURN(rc = 0);
126
127         if (lsm != NULL) {
128                 memset(&oa, 0, sizeof(oa));
129                 oa.o_id = lsm->lsm_object_id;
130                 oa.o_mode = S_IFREG;
131                 oa.o_valid = OBD_MD_FLTYPE | OBD_MD_FLID;
132                 obd_handle2oa(&oa, &fd->fd_osthandle);
133                 rc = obd_close(&sbi->ll_osc_conn, &oa, lsm, NULL);
134                 if (rc)
135                         CERROR("inode %lu object close failed: rc = %d\n",
136                                inode->i_ino, rc);
137         }
138
139         mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
140         rc2 = ll_mdc_close(&sbi->ll_mdc_conn, inode, file);
141         mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
142         if (rc2 && !rc)
143                 rc = rc2;
144
145         if (atomic_dec_and_test(&lli->lli_open_count)) {
146                 CDEBUG(D_INFO, "last close, cancelling unused locks\n");
147                 rc2 = obd_cancel_unused(&sbi->ll_osc_conn, lsm, 0);
148                 if (rc2 && !rc) {
149                         rc = rc2;
150                         CERROR("obd_cancel_unused: %d\n", rc);
151                 }
152         } else
153                 CDEBUG(D_INFO, "not last close, not cancelling unused locks\n");
154
155         RETURN(rc);
156 }
157
158 static int ll_local_open(struct file *file, struct lookup_intent *it)
159 {
160         struct ptlrpc_request *req = it->it_data;
161         struct ll_file_data *fd;
162         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
163         ENTRY;
164
165         LASSERT(!file->private_data);
166
167         fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
168         /* We can't handle this well without reorganizing ll_file_open and
169          * ll_mdc_close, so don't even try right now. */
170         LASSERT(fd != NULL);
171
172         memset(fd, 0, sizeof(*fd));
173
174         memcpy(&fd->fd_mdshandle, &body->handle, sizeof(body->handle));
175         fd->fd_req = it->it_data;
176         file->private_data = fd;
177
178         RETURN(0);
179 }
180
181 static int ll_osc_open(struct lustre_handle *conn, struct inode *inode,
182                        struct file *file, struct lov_stripe_md *lsm)
183 {
184         struct ll_file_data *fd = file->private_data;
185         struct obdo *oa;
186         int rc;
187         ENTRY;
188
189         oa = obdo_alloc();
190         if (!oa)
191                 RETURN(-ENOMEM);
192         oa->o_id = lsm->lsm_object_id;
193         oa->o_mode = S_IFREG;
194         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
195                 OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
196         rc = obd_open(conn, oa, lsm, NULL);
197         if (rc)
198                 GOTO(out, rc);
199
200         file->f_flags &= ~O_LOV_DELAY_CREATE;
201         obdo_to_inode(inode, oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
202                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
203
204         obd_oa2handle(&fd->fd_osthandle, oa);
205
206         atomic_inc(&ll_i2info(inode)->lli_open_count);
207 out:
208         obdo_free(oa);
209         RETURN(rc);
210 }
211
212 /* Caller must hold lli_open_sem to protect lli->lli_smd from changing and
213  * duplicate objects from being created.  We only install lsm to lli_smd if
214  * the mdc open was successful (hence stored stripe MD on MDS), otherwise
215  * other nodes could try to create different objects for the same file.
216  */
217 static int ll_create_obj(struct lustre_handle *conn, struct inode *inode,
218                          struct file *file, struct lov_stripe_md *lsm)
219 {
220         struct ptlrpc_request *req = NULL;
221         struct ll_inode_info *lli = ll_i2info(inode);
222         struct lov_mds_md *lmm = NULL;
223         int lmm_size = 0;
224         struct obdo *oa;
225         int rc, err;
226         ENTRY;
227
228         oa = obdo_alloc();
229         if (!oa)
230                 RETURN(-ENOMEM);
231
232         oa->o_mode = S_IFREG | 0600;
233         oa->o_id = inode->i_ino;
234         /* Keep these 0 for now, because chown/chgrp does not change the
235          * ownership on the OST, and we don't want to allow BA OST NFS
236          * users to access these objects by mistake.
237          */
238         oa->o_uid = 0;
239         oa->o_gid = 0;
240         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE |
241                 OBD_MD_FLUID | OBD_MD_FLGID;
242
243         rc = obd_create(conn, oa, &lsm, NULL);
244         if (rc) {
245                 CERROR("error creating objects for inode %lu: rc = %d\n",
246                        inode->i_ino, rc);
247                 if (rc > 0) {
248                         CERROR("obd_create returned invalid rc %d\n", rc);
249                         rc = -EIO;
250                 }
251                 GOTO(out_oa, rc);
252         }
253
254         LASSERT(lsm && lsm->lsm_object_id);
255         rc = obd_packmd(conn, &lmm, lsm);
256         if (rc < 0)
257                 GOTO(out_destroy, rc);
258
259         lmm_size = rc;
260
261         /* Save the stripe MD with this file on the MDS */
262         rc = mdc_setattr(&ll_i2sbi(inode)->ll_mdc_conn, inode, NULL,
263                          lmm, lmm_size, &req);
264         ptlrpc_req_finished(req);
265
266         obd_free_wiremd(conn, &lmm);
267
268         /* If we couldn't complete mdc_open() and store the stripe MD on the
269          * MDS, we need to destroy the objects now or they will be leaked.
270          */
271         if (rc) {
272                 CERROR("error: storing stripe MD for %lu: rc %d\n",
273                        inode->i_ino, rc);
274                 GOTO(out_destroy, rc);
275         }
276         lli->lli_smd = lsm;
277
278         EXIT;
279 out_oa:
280         obdo_free(oa);
281         return rc;
282
283 out_destroy:
284         obdo_from_inode(oa, inode, OBD_MD_FLTYPE);
285         oa->o_id = lsm->lsm_object_id;
286         oa->o_valid |= OBD_MD_FLID;
287         err = obd_destroy(conn, oa, lsm, NULL);
288         obd_free_memmd(conn, &lsm);
289         if (err)
290                 CERROR("error uncreating inode %lu objects: rc %d\n",
291                        inode->i_ino, err);
292         goto out_oa;
293 }
294
295 /* Open a file, and (for the very first open) create objects on the OSTs at
296  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
297  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
298  * lli_open_sem to ensure no other process will create objects, send the
299  * stripe MD to the MDS, or try to destroy the objects if that fails.
300  *
301  * If we already have the stripe MD locally, we don't request it in
302  * mdc_open() by passing a lmm_size = 0.
303  *
304  * It is up to the application to ensure no other processes open this file
305  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
306  * used.  We might be able to avoid races of that sort by getting lli_open_sem
307  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
308  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
309  */
310 extern int ll_it_open_error(int phase, struct lookup_intent *it);
311
312 static int ll_file_open(struct inode *inode, struct file *file)
313 {
314         struct ll_sb_info *sbi = ll_i2sbi(inode);
315         struct ll_inode_info *lli = ll_i2info(inode);
316         struct lustre_handle *conn = ll_i2obdconn(inode);
317         struct lookup_intent *it;
318         struct lov_stripe_md *lsm;
319         int rc = 0;
320         ENTRY;
321
322         LL_GET_INTENT(file->f_dentry, it);
323         rc = ll_it_open_error(IT_OPEN_OPEN, it);
324         if (rc)
325                 RETURN(rc);
326
327         rc = ll_local_open(file, it);
328         if (rc)
329                 LBUG();
330
331         mdc_set_open_replay_data((struct ll_file_data *)file->private_data);
332
333         lsm = lli->lli_smd;
334         if (lsm == NULL) {
335                 if (file->f_flags & O_LOV_DELAY_CREATE) {
336                         CDEBUG(D_INODE, "delaying object creation\n");
337                         RETURN(0);
338                 }
339                 down(&lli->lli_open_sem);
340                 if (!lli->lli_smd) {
341                         rc = ll_create_obj(conn, inode, file, NULL);
342                         up(&lli->lli_open_sem);
343                         if (rc)
344                                 GOTO(out_close, rc);
345                 } else {
346                         CERROR("warning: stripe already set on ino %lu\n",
347                                inode->i_ino);
348                         up(&lli->lli_open_sem);
349                 }
350                 lsm = lli->lli_smd;
351         }
352
353         rc = ll_osc_open(conn, inode, file, lsm);
354         if (rc)
355                 GOTO(out_close, rc);
356         RETURN(0);
357
358  out_close:
359         ll_mdc_close(&sbi->ll_mdc_conn, inode, file);
360         return rc;
361 }
362
363 int ll_size_lock(struct inode *inode, struct lov_stripe_md *lsm, obd_off start,
364                  int mode, struct lustre_handle *lockh)
365 {
366         struct ll_sb_info *sbi = ll_i2sbi(inode);
367         struct ldlm_extent extent;
368         int rc, flags = 0;
369         ENTRY;
370
371         /* XXX phil: can we do this?  won't it screw the file size up? */
372         if (sbi->ll_flags & LL_SBI_NOLCK)
373                 RETURN(0);
374
375         extent.start = start;
376         extent.end = OBD_OBJECT_EOF;
377
378         rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent,
379                          sizeof(extent), mode, &flags, ll_lock_callback,
380                          inode, sizeof(*inode), lockh);
381         RETURN(rc);
382 }
383
384 int ll_size_unlock(struct inode *inode, struct lov_stripe_md *lsm, int mode,
385                    struct lustre_handle *lockh)
386 {
387         struct ll_sb_info *sbi = ll_i2sbi(inode);
388         int rc;
389         ENTRY;
390
391         /* XXX phil: can we do this?  won't it screw the file size up? */
392         if (sbi->ll_flags & LL_SBI_NOLCK)
393                 RETURN(0);
394
395         rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
396         if (rc != ELDLM_OK) {
397                 CERROR("lock cancel: %d\n", rc);
398                 LBUG();
399         }
400
401         RETURN(rc);
402 }
403
404 /* This function is solely "sampling" the file size, and does not explicit
405  * locking on the size itself (see ll_size_lock() and ll_size_unlock()).
406  *
407  * XXX We need to optimize away the obd_getattr for decent performance here,
408  *     by checking if we already have the size lock and considering our size
409  *     authoritative in that case.  In order to do that either the act of
410  *     getting the size lock includes retrieving the file size, or the client
411  *     keeps an atomic flag in the inode which indicates whether the size
412  *     has been updated (see bug 280).
413  */
414 int ll_file_size(struct inode *inode, struct lov_stripe_md *lsm,
415                  struct lustre_handle *handle)
416 {
417         struct ll_sb_info *sbi = ll_i2sbi(inode);
418         struct obdo oa;
419         int rc;
420         ENTRY;
421
422         LASSERT(lsm);
423         LASSERT(sbi);
424
425         memset(&oa, 0, sizeof oa);
426         oa.o_id = lsm->lsm_object_id;
427         oa.o_mode = S_IFREG;
428         oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
429                 OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
430         obd_handle2oa(&oa, handle);
431         rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
432         if (!rc) {
433                 obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
434                                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
435                 CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lu\n",
436                        lsm->lsm_object_id, inode->i_size, inode->i_size);
437         }
438
439         RETURN(rc);
440 }
441
442 static inline void ll_remove_suid(struct inode *inode)
443 {
444         unsigned int mode;
445
446         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
447         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
448
449         /* was any of the uid bits set? */
450         mode &= inode->i_mode;
451         if (mode && !capable(CAP_FSETID)) {
452                 inode->i_mode &= ~mode;
453                 // XXX careful here - we cannot change the size
454         }
455 }
456
457 static void ll_update_atime(struct inode *inode)
458 {
459 #ifdef USE_ATIME
460         struct iattr attr;
461
462         attr.ia_atime = CURRENT_TIME;
463         attr.ia_valid = ATTR_ATIME;
464
465         if (inode->i_atime == attr.ia_atime) return;
466         if (IS_RDONLY(inode)) return;
467         if (IS_NOATIME(inode)) return;
468
469         /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
470         ll_inode_setattr(inode, &attr, 0);
471 #else
472         /* update atime, but don't explicitly write it out just this change */
473         inode->i_atime = CURRENT_TIME;
474 #endif
475 }
476
477 int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
478                      void *data, int flag)
479 {
480         struct inode *inode = data;
481         struct lustre_handle lockh = { 0, 0 };
482         int rc;
483         ENTRY;
484
485         if (inode == NULL)
486                 LBUG();
487
488         switch (flag) {
489         case LDLM_CB_BLOCKING:
490                 ldlm_lock2handle(lock, &lockh);
491                 rc = ldlm_cli_cancel(&lockh);
492                 if (rc != ELDLM_OK)
493                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
494                 break;
495         case LDLM_CB_CANCELING:
496                 CDEBUG(D_INODE, "invalidating obdo/inode %lu\n", inode->i_ino);
497                 /* FIXME: do something better than throwing away everything */
498                 //down(&inode->i_sem);
499                 ll_invalidate_inode_pages(inode);
500                 //up(&inode->i_sem);
501                 break;
502         default:
503                 LBUG();
504         }
505
506         RETURN(0);
507 }
508
509 static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
510                             loff_t *ppos)
511 {
512         struct ll_file_data *fd = filp->private_data;
513         struct inode *inode = filp->f_dentry->d_inode;
514         struct ll_sb_info *sbi = ll_i2sbi(inode);
515         struct lustre_handle lockh = { 0, 0 };
516         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
517         int flags = 0;
518         ldlm_error_t err;
519         ssize_t retval;
520         ENTRY;
521
522         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
523             !(sbi->ll_flags & LL_SBI_NOLCK)) {
524                 struct ldlm_extent extent;
525                 extent.start = *ppos;
526                 extent.end = *ppos + count;
527                 CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
528                        inode->i_ino, extent.start, extent.end);
529
530                 err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
531                                   &extent, sizeof(extent), LCK_PR, &flags,
532                                   ll_lock_callback, inode, sizeof(*inode),
533                                   &lockh);
534                 if (err != ELDLM_OK) {
535                         CERROR("lock enqueue: err: %d\n", err);
536                         RETURN(err);
537                 }
538         }
539
540         /* If we don't refresh the file size, generic_file_read may not even
541          * call us */
542         retval = ll_file_size(inode, lsm, &fd->fd_osthandle);
543         if (retval < 0) {
544                 CERROR("ll_file_size: "LPSZ"\n", retval);
545                 RETURN(retval);
546         }
547
548         CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
549                inode->i_ino, count, *ppos);
550         retval = generic_file_read(filp, buf, count, ppos);
551
552         if (retval > 0)
553                 ll_update_atime(inode);
554
555         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
556             !(sbi->ll_flags & LL_SBI_NOLCK)) {
557                 err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, &lockh);
558                 if (err != ELDLM_OK) {
559                         CERROR("lock cancel: err: %d\n", err);
560                         retval = err;
561                 }
562         }
563
564         RETURN(retval);
565 }
566
567 /*
568  * Write to a file (through the page cache).
569  */
570 static ssize_t
571 ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
572 {
573         struct ll_file_data *fd = file->private_data;
574         struct inode *inode = file->f_dentry->d_inode;
575         struct ll_sb_info *sbi = ll_i2sbi(inode);
576         struct lustre_handle lockh = { 0, 0 }, eof_lockh = { 0, 0 };
577         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
578         int flags = 0;
579         ldlm_error_t err;
580         ssize_t retval;
581         ENTRY;
582
583         if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
584                 err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockh);
585                 if (err)
586                         RETURN(err);
587
588                 /* Get size here so we know extent to enqueue write lock on. */
589                 retval = ll_file_size(inode, lsm, &fd->fd_osthandle);
590                 if (retval)
591                         GOTO(out_eof, retval);
592
593                 *ppos = inode->i_size;
594         }
595
596         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
597             !(sbi->ll_flags & LL_SBI_NOLCK)) {
598                 struct ldlm_extent extent;
599                 extent.start = *ppos;
600                 extent.end = *ppos + count;
601                 CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
602                        inode->i_ino, extent.start, extent.end);
603
604                 err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
605                                   &extent, sizeof(extent), LCK_PW, &flags,
606                                   ll_lock_callback, inode, sizeof(*inode),
607                                   &lockh);
608                 if (err != ELDLM_OK) {
609                         CERROR("lock enqueue: err: %d\n", err);
610                         GOTO(out_eof, retval = err);
611                 }
612         }
613
614         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
615                inode->i_ino, count, *ppos);
616
617         retval = generic_file_write(file, buf, count, ppos);
618
619         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
620             !(sbi->ll_flags & LL_SBI_NOLCK)) {
621                 err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, &lockh);
622                 if (err != ELDLM_OK)
623                         CERROR("lock cancel: err: %d\n", err);
624         }
625
626         EXIT;
627  out_eof:
628         if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
629                 err = ll_size_unlock(inode, lsm, LCK_PW, &eof_lockh);
630                 if (err)
631                         CERROR("ll_size_unlock: %d\n", err);
632         }
633
634         return retval;
635 }
636
637 static int ll_lov_setstripe(struct inode *inode, struct file *file,
638                             unsigned long arg)
639 {
640         struct ll_inode_info *lli = ll_i2info(inode);
641         struct lustre_handle *conn = ll_i2obdconn(inode);
642         struct lov_stripe_md *lsm;
643         int rc;
644         ENTRY;
645
646         down(&lli->lli_open_sem);
647         lsm = lli->lli_smd;
648         if (lsm) {
649                 up(&lli->lli_open_sem);
650                 CERROR("stripe already set for ino %lu\n", inode->i_ino);
651                 /* If we haven't already done the open, do so now */
652                 if (file->f_flags & O_LOV_DELAY_CREATE) {
653                         int rc2 = ll_osc_open(conn, inode, file, lsm);
654                         if (rc2)
655                                 RETURN(rc2);
656                 }
657
658                 RETURN(-EALREADY);
659         }
660
661         rc = obd_iocontrol(LL_IOC_LOV_SETSTRIPE, conn, 0, &lsm, (void *)arg);
662         if (rc) {
663                 up(&lli->lli_open_sem);
664                 RETURN(rc);
665         }
666         rc = ll_create_obj(conn, inode, file, lsm);
667         up(&lli->lli_open_sem);
668
669         if (rc) {
670                 obd_free_memmd(conn, &lsm);
671                 RETURN(rc);
672         }
673         rc = ll_osc_open(conn, inode, file, lli->lli_smd);
674         RETURN(rc);
675 }
676
677 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
678 {
679         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
680         struct lustre_handle *conn = ll_i2obdconn(inode);
681
682         if (!lsm)
683                 RETURN(-ENODATA);
684
685         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, conn, 0, lsm, (void *)arg);
686 }
687
688 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
689                   unsigned long arg)
690 {
691         struct ll_file_data *fd = file->private_data;
692         struct lustre_handle *conn;
693         int flags;
694
695         switch(cmd) {
696         case TCGETS:
697                 return -ENOTTY;
698         case LL_IOC_GETFLAGS:
699                 /* Get the current value of the file flags */
700                 return put_user(fd->fd_flags, (int *)arg);
701         case LL_IOC_SETFLAGS:
702         case LL_IOC_CLRFLAGS:
703                 /* Set or clear specific file flags */
704                 /* XXX This probably needs checks to ensure the flags are
705                  *     not abused, and to handle any flag side effects.
706                  */
707                 if (get_user(flags, (int *) arg))
708                         return -EFAULT;
709
710                 if (cmd == LL_IOC_SETFLAGS)
711                         fd->fd_flags |= flags;
712                 else
713                         fd->fd_flags &= ~flags;
714                 return 0;
715         case LL_IOC_LOV_SETSTRIPE:
716                 return ll_lov_setstripe(inode, file, arg);
717         case LL_IOC_LOV_GETSTRIPE:
718                 return ll_lov_getstripe(inode, arg);
719
720         /* We need to special case any other ioctls we want to handle,
721          * to send them to the MDS/OST as appropriate and to properly
722          * network encode the arg field.
723         case EXT2_IOC_GETFLAGS:
724         case EXT2_IOC_SETFLAGS:
725         case EXT2_IOC_GETVERSION_OLD:
726         case EXT2_IOC_GETVERSION_NEW:
727         case EXT2_IOC_SETVERSION_OLD:
728         case EXT2_IOC_SETVERSION_NEW:
729         */
730         default:
731                 conn = ll_i2obdconn(inode);
732                 return obd_iocontrol(cmd, conn, 0, NULL, (void *)arg);
733         }
734 }
735
736 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
737 {
738         struct inode *inode = file->f_dentry->d_inode;
739         long long retval;
740         ENTRY;
741
742         switch (origin) {
743         case 2: {
744                 struct ll_inode_info *lli = ll_i2info(inode);
745                 struct ll_file_data *fd = file->private_data;
746
747                 retval = ll_file_size(inode, lli->lli_smd, &fd->fd_osthandle);
748                 if (retval)
749                         RETURN(retval);
750
751                 offset += inode->i_size;
752                 break;
753         }
754         case 1:
755                 offset += file->f_pos;
756         }
757         retval = -EINVAL;
758         if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
759                 if (offset != file->f_pos) {
760                         file->f_pos = offset;
761 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
762                         file->f_reada = 0;
763 #endif
764                         file->f_version = ++event;
765                 }
766                 retval = offset;
767         }
768         RETURN(retval);
769 }
770
771 /* XXX this does not need to do anything for data, it _does_ need to
772    call setattr */
773 int ll_fsync(struct file *file, struct dentry *dentry, int data)
774 {
775         return 0;
776 }
777
778 int ll_inode_revalidate(struct dentry *dentry)
779 {
780         struct inode *inode = dentry->d_inode;
781         struct lov_stripe_md *lsm;
782         ENTRY;
783
784         if (!inode) {
785                 CERROR("REPORT THIS LINE TO PETER\n");
786                 RETURN(0);
787         }
788
789         /* this is very tricky.  it is unsafe to call ll_have_md_lock
790            when we have a referenced lock: because it may cause an RPC
791            below when the lock is marked CB_PENDING.  That RPC may not
792            go out because someone else may be in another RPC waiting for
793            that lock*/
794         if (!(dentry->d_it && dentry->d_it->it_lock_mode) &&
795             !ll_have_md_lock(dentry)) {
796                 struct ptlrpc_request *req = NULL;
797                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
798                 struct mds_body *body;
799                 unsigned long valid = 0;
800                 int datalen = 0, rc;
801
802                 /* Why don't we update all valid MDS fields here, if we're
803                  * doing an RPC anyways?  -phil */
804                 if (S_ISREG(inode->i_mode)) {
805                         datalen = obd_size_wiremd(&sbi->ll_osc_conn, NULL);
806                         valid |= OBD_MD_FLEASIZE;
807                 }
808                 rc = mdc_getattr(&sbi->ll_mdc_conn, inode->i_ino,
809                                  inode->i_mode, valid, datalen, &req);
810                 if (rc) {
811                         CERROR("failure %d inode %lu\n", rc, inode->i_ino);
812                         ptlrpc_req_finished(req);
813                         RETURN(-abs(rc));
814                 }
815
816                 body = lustre_msg_buf(req->rq_repmsg, 0);
817                 if (body->valid & OBD_MD_FLEASIZE)
818                         ll_update_inode(inode, body,
819                                         lustre_msg_buf(req->rq_repmsg, 1));
820                 else
821                         ll_update_inode(inode, body, NULL);
822                 ptlrpc_req_finished(req);
823         }
824
825         lsm = ll_i2info(inode)->lli_smd;
826         if (!lsm)       /* object not yet allocated, don't validate size */
827                 RETURN(0);
828
829         /* XXX this should probably become an unconditional obd_getattr()
830          *     so that we update the blocks count and mtime from the OST too.
831          */
832         RETURN(ll_file_size(inode, lsm, NULL));
833 }
834
835 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
836 static int ll_getattr(struct vfsmount *mnt, struct dentry *de,
837                       struct kstat *stat)
838 {
839         return ll_inode_revalidate(de);
840 }
841 #endif
842
843 struct file_operations ll_file_operations = {
844         read:           ll_file_read,
845         write:          ll_file_write,
846         ioctl:          ll_file_ioctl,
847         open:           ll_file_open,
848         release:        ll_file_release,
849         mmap:           generic_file_mmap,
850         llseek:         ll_file_seek,
851         fsync:          NULL
852 };
853
854 struct inode_operations ll_file_inode_operations = {
855         setattr:    ll_setattr,
856         truncate:   ll_truncate,
857 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
858         getattr: ll_getattr,
859 #else
860         revalidate: ll_inode_revalidate,
861 #endif
862 };
863
864 struct inode_operations ll_special_inode_operations = {
865         setattr:    ll_setattr,
866 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
867         getattr:    ll_getattr,
868 #else
869         revalidate: ll_inode_revalidate,
870 #endif
871 };