Whamcloud - gitweb
r=adilger,vitaly
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
/* Prepare @op_data for an MDS close RPC on @inode.
 *
 * Mode and timestamps are always sent.  For write opens: if the server
 * does not support Size-on-MDS (SOM), or the file is not a regular file,
 * size/blocks are sent directly; otherwise the I/O epoch is closed and
 * the size will be reported via the DONE_WRITING path instead. */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
                             struct obd_client_handle *och)
{
        ENTRY;

        op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
                                 ATTR_MTIME_SET | ATTR_CTIME_SET;

        /* read-only opens never carry size information */
        if (!(och->och_flags & FMODE_WRITE))
                goto out;

        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
            !S_ISREG(inode->i_mode))
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
        else
                ll_epoch_close(inode, op_data, &och, 0);

out:
        ll_pack_inode2opdata(inode, op_data, &och->och_fh);
        EXIT;
}
85
/* Send the MDS close RPC for open handle @och on @inode and release the
 * handle.
 *
 * With Size-on-MDS the server may answer -EAGAIN, instructing the client
 * to gather the file size from the OSTs and send it back via setattr.
 * If the epoch stays open, @och is kept alive for the later DONE_WRITING
 * exchange; otherwise it is freed here.  Returns 0 or a negative errno. */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                     struct inode *inode,
                                     struct obd_client_handle *och)
{
        struct obd_export *exp = ll_i2mdexp(inode);
        struct md_op_data *op_data;
        struct ptlrpc_request *req = NULL;
        struct obd_device *obd = class_exp2obd(exp);
        int epoch_close = 1;
        int seq_end = 0, rc;
        ENTRY;

        if (obd == NULL) {
                /*
                 * XXX: in case of LMV, is this correct to access
                 * ->exp_handle?
                 */
                CERROR("Invalid MDC connection handle "LPX64"\n",
                       ll_i2mdexp(inode)->exp_handle.h_cookie);
                GOTO(out, rc = 0);
        }

        /*
         * here we check if this is forced umount. If so this is called on
         * canceling "open lock" and we do not call md_close() in this case, as
         * it will not be successful, as import is already deactivated.
         */
        if (obd->obd_force)
                GOTO(out, rc = 0);

        OBD_ALLOC_PTR(op_data);
        if (op_data == NULL)
                GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

        ll_prepare_close(inode, op_data, och);
        /* remember whether ll_prepare_close() closed the epoch */
        epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
        rc = md_close(md_exp, op_data, och->och_mod, &req);
        if (rc != -EAGAIN)
                seq_end = 1;

        if (rc == -EAGAIN) {
                /* This close must have the epoch closed. */
                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
                LASSERT(epoch_close);
                /* MDS has instructed us to obtain Size-on-MDS attribute from
                 * OSTs and send setattr to back to MDS. */
                rc = ll_sizeonmds_update(inode, och->och_mod,
                                         &och->och_fh, op_data->op_ioepoch);
                if (rc) {
                        CERROR("inode %lu mdc Size-on-MDS update failed: "
                               "rc = %d\n", inode->i_ino, rc);
                        rc = 0;
                }
        } else if (rc) {
                CERROR("inode %lu mdc close failed: rc = %d\n",
                       inode->i_ino, rc);
        }
        ll_finish_md_op_data(op_data);

        if (rc == 0) {
                rc = ll_objects_destroy(req, inode);
                if (rc)
                        CERROR("inode %lu ll_objects destroy: rc = %d\n",
                               inode->i_ino, rc);
        }

        EXIT;
out:
        /* If the epoch is still open on a SOM-capable server, keep @och:
         * the queued DONE_WRITING will use and free it.  Otherwise drop
         * replay data and free the handle now. */
        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
            S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                ll_queue_done_writing(inode, LLIF_DONE_WRITING);
        } else {
                if (seq_end)
                        ptlrpc_close_replay_seq(req);
                md_clear_open_replay_data(md_exp, och);
                /* Free @och if it is not waiting for DONE_WRITING. */
                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
                OBD_FREE_PTR(och);
        }
        if (req) /* This is close request */
                ptlrpc_req_finished(req);
        return rc;
}
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och=*och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody have freed this och
203                       already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
/* Per-file-descriptor close: drop any group lock, decrement the use count
 * of the matching MDS open handle, and issue the real MDS close unless a
 * cached OPEN DLM lock lets us skip the RPC.  Frees the ll_file_data
 * attached to @file.  Returns 0 or a negative errno. */
int ll_md_close(struct obd_export *md_exp, struct inode *inode,
                struct file *file)
{
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct ll_inode_info *lli = ll_i2info(inode);
        int rc = 0;
        ENTRY;

        /* clear group lock, if present */
        if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
                struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
                fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
                rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
                                      &fd->fd_cwlockh);
        }

        /* Let's see if we have good enough OPEN lock on the file and if
           we can skip talking to MDS */
        if (file->f_dentry->d_inode) { /* Can this ever be false? */
                int lockmode;
                int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
                struct lustre_handle lockh;
                struct inode *inode = file->f_dentry->d_inode;
                ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};

                /* drop this descriptor's reference on the open handle;
                 * the DLM lock mode mirrors the open mode */
                down(&lli->lli_och_sem);
                if (fd->fd_omode & FMODE_WRITE) {
                        lockmode = LCK_CW;
                        LASSERT(lli->lli_open_fd_write_count);
                        lli->lli_open_fd_write_count--;
                } else if (fd->fd_omode & FMODE_EXEC) {
                        lockmode = LCK_PR;
                        LASSERT(lli->lli_open_fd_exec_count);
                        lli->lli_open_fd_exec_count--;
                } else {
                        lockmode = LCK_CR;
                        LASSERT(lli->lli_open_fd_read_count);
                        lli->lli_open_fd_read_count--;
                }
                up(&lli->lli_och_sem);

                /* no granted OPEN lock cached locally (LDLM_FL_TEST_LOCK
                 * means match-only): must really close the handle on MDS */
                if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
                                   LDLM_IBITS, &policy, lockmode,
                                   &lockh)) {
                        rc = ll_md_real_close(file->f_dentry->d_inode,
                                              fd->fd_omode);
                }
        } else {
                CERROR("Releasing a file %p with negative dentry %p. Name %s",
                       file, file->f_dentry, file->f_dentry->d_name.name);
        }

        LUSTRE_FPRIVATE(file) = NULL;
        ll_file_data_put(fd);
        ll_capa_close(inode);

        RETURN(rc);
}
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this returns an error code, fput() the caller does not, so we need
273  * to make every effort to clean up all of our state here.  Also, applications
274  * rarely check close errors and even if an error is returned they will not
275  * re-try the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289         /* don't do anything for / */
290         if (inode->i_sb->s_root == file->f_dentry)
291                 RETURN(0);
292
293         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
294         fd = LUSTRE_FPRIVATE(file);
295         LASSERT(fd != NULL);
296
297         /* don't do anything for / */
298         if (inode->i_sb->s_root == file->f_dentry) {
299                 LUSTRE_FPRIVATE(file) = NULL;
300                 ll_file_data_put(fd);
301                 RETURN(0);
302         }
303         
304         if (lsm)
305                 lov_test_and_clear_async_rc(lsm);
306         lli->lli_async_rc = 0;
307
308         rc = ll_md_close(sbi->ll_md_exp, inode, file);
309         RETURN(rc);
310 }
311
/* Open @file on the MDS through an intent-based md_intent_lock() enqueue,
 * optionally requesting an OPEN lock.  @lmm/@lmmsize, when non-zero,
 * carry striping parameters being set on the file.  On success the inode
 * is refreshed from the reply.  Returns 0 or a negative errno. */
static int ll_intent_file_open(struct file *file, void *lmm,
                               int lmmsize, struct lookup_intent *itp)
{
        struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
        struct dentry *parent = file->f_dentry->d_parent;
        const char *name = file->f_dentry->d_name.name;
        const int len = file->f_dentry->d_name.len;
        struct md_op_data *op_data;
        struct ptlrpc_request *req;
        int rc;

        if (!parent)
                RETURN(-ENOENT);

        /* Usually we come here only for NFSD, and we want open lock.
           But we can also get here with pre 2.6.15 patchless kernels, and in
           that case that lock is also ok */
        /* We can also get here if there was cached open handle in revalidate_it
         * but it disappeared while we were getting from there to ll_file_open.
         * But this means this file was closed and immediately opened which
         * makes a good candidate for using OPEN lock */
        /* If lmmsize & lmm are not 0, we are just setting stripe info
         * parameters. No need for the open lock */
        if (!lmm && !lmmsize)
                itp->it_flags |= MDS_OPEN_LOCK;

        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
                                      file->f_dentry->d_inode, name, len,
                                      O_RDWR, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
                            0 /*unused */, &req, ll_md_blocking_ast, 0);
        ll_finish_md_op_data(op_data);
        if (rc == -ESTALE) {
                /* reason for keep own exit path - don`t flood log
                * with messages with -ESTALE errors.
                */
                /* close the handle obtained by this enqueue (if any)
                 * before bailing out */
                if (!it_disposition(itp, DISP_OPEN_OPEN) || 
                     it_open_error(DISP_OPEN_OPEN, itp))
                        GOTO(out, rc);
                ll_release_openhandle(file->f_dentry, itp);
                GOTO(out_stale, rc);
        }

        if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
                GOTO(out, rc);
        }

        /* attach the inode to the granted lock so blocking ASTs can find it */
        if (itp->d.lustre.it_lock_mode)
                md_set_lock_data(sbi->ll_md_exp,
                                 &itp->d.lustre.it_lock_handle, 
                                 file->f_dentry->d_inode);

        rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
                           NULL);
out:
        ptlrpc_req_finished(itp->d.lustre.it_data);

out_stale:
        it_clear_disposition(itp, DISP_ENQ_COMPLETE);
        ll_intent_drop_lock(itp);

        RETURN(rc);
}
380
381 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
382                        struct lookup_intent *it, struct obd_client_handle *och)
383 {
384         struct ptlrpc_request *req = it->d.lustre.it_data;
385         struct mdt_body *body;
386
387         LASSERT(och);
388
389         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
390         /* reply already checked out */
391         LASSERT(body != NULL);
392         /* and swabbed in md_enqueue */
393         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
394
395         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
396         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
397         och->och_fid = lli->lli_fid;
398         och->och_flags = it->it_flags;
399         lli->lli_ioepoch = body->ioepoch;
400
401         return md_set_open_replay_data(md_exp, och, req);
402 }
403
404 int ll_local_open(struct file *file, struct lookup_intent *it,
405                   struct ll_file_data *fd, struct obd_client_handle *och)
406 {
407         struct inode *inode = file->f_dentry->d_inode;
408         struct ll_inode_info *lli = ll_i2info(inode);
409         ENTRY;
410
411         LASSERT(!LUSTRE_FPRIVATE(file));
412
413         LASSERT(fd != NULL);
414
415         if (och) {
416                 struct ptlrpc_request *req = it->d.lustre.it_data;
417                 struct mdt_body *body;
418                 int rc;
419
420                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
421                 if (rc)
422                         RETURN(rc);
423
424                 body = lustre_msg_buf(req->rq_repmsg,
425                                       DLM_REPLY_REC_OFF, sizeof(*body));
426
427                 if ((it->it_flags & FMODE_WRITE) &&
428                     (body->valid & OBD_MD_FLSIZE))
429                 {
430                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
431                                lli->lli_ioepoch, PFID(&lli->lli_fid));
432                 }
433         }
434
435         LUSTRE_FPRIVATE(file) = fd;
436         ll_readahead_init(inode, &fd->fd_ras);
437         fd->fd_omode = it->it_flags;
438         RETURN(0);
439 }
440
441 /* Open a file, and (for the very first open) create objects on the OSTs at
442  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
443  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
444  * lli_open_sem to ensure no other process will create objects, send the
445  * stripe MD to the MDS, or try to destroy the objects if that fails.
446  *
447  * If we already have the stripe MD locally then we don't request it in
448  * md_open(), by passing a lmm_size = 0.
449  *
450  * It is up to the application to ensure no other processes open this file
451  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
452  * used.  We might be able to avoid races of that sort by getting lli_open_sem
453  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
454  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
455  */
456 int ll_file_open(struct inode *inode, struct file *file)
457 {
458         struct ll_inode_info *lli = ll_i2info(inode);
459         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
460                                           .it_flags = file->f_flags };
461         struct lov_stripe_md *lsm;
462         struct ptlrpc_request *req = NULL;
463         struct obd_client_handle **och_p;
464         __u64 *och_usecount;
465         struct ll_file_data *fd;
466         int rc = 0;
467         ENTRY;
468
469         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
470                inode->i_generation, inode, file->f_flags);
471
472         /* don't do anything for / */
473         if (inode->i_sb->s_root == file->f_dentry)
474                 RETURN(0);
475
476 #ifdef HAVE_VFS_INTENT_PATCHES
477         it = file->f_it;
478 #else
479         it = file->private_data; /* XXX: compat macro */
480         file->private_data = NULL; /* prevent ll_local_open assertion */
481 #endif
482
483         fd = ll_file_data_get();
484         if (fd == NULL)
485                 RETURN(-ENOMEM);
486
487         /* don't do anything for / */
488         if (inode->i_sb->s_root == file->f_dentry) {
489                 LUSTRE_FPRIVATE(file) = fd;
490                 RETURN(0);
491         }
492
493         if (!it || !it->d.lustre.it_disposition) {
494                 /* Convert f_flags into access mode. We cannot use file->f_mode,
495                  * because everything but O_ACCMODE mask was stripped from
496                  * there */
497                 if ((oit.it_flags + 1) & O_ACCMODE)
498                         oit.it_flags++;
499                 if (file->f_flags & O_TRUNC)
500                         oit.it_flags |= FMODE_WRITE;
501
502                 /* kernel only call f_op->open in dentry_open.  filp_open calls
503                  * dentry_open after call to open_namei that checks permissions.
504                  * Only nfsd_open call dentry_open directly without checking
505                  * permissions and because of that this code below is safe. */
506                 if (oit.it_flags & FMODE_WRITE)
507                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
508
509                 /* We do not want O_EXCL here, presumably we opened the file
510                  * already? XXX - NFS implications? */
511                 oit.it_flags &= ~O_EXCL;
512
513                 it = &oit;
514         }
515
516 restart:
517         /* Let's see if we have file open on MDS already. */
518         if (it->it_flags & FMODE_WRITE) {
519                 och_p = &lli->lli_mds_write_och;
520                 och_usecount = &lli->lli_open_fd_write_count;
521         } else if (it->it_flags & FMODE_EXEC) {
522                 och_p = &lli->lli_mds_exec_och;
523                 och_usecount = &lli->lli_open_fd_exec_count;
524          } else {
525                 och_p = &lli->lli_mds_read_och;
526                 och_usecount = &lli->lli_open_fd_read_count;
527         }
528         
529         down(&lli->lli_och_sem);
530         if (*och_p) { /* Open handle is present */
531                 if (it_disposition(it, DISP_OPEN_OPEN)) {
532                         /* Well, there's extra open request that we do not need,
533                            let's close it somehow. This will decref request. */
534                         rc = it_open_error(DISP_OPEN_OPEN, it);
535                         if (rc) {
536                                 ll_file_data_put(fd);
537                                 GOTO(out_och_free, rc);
538                         }       
539                         ll_release_openhandle(file->f_dentry, it);
540                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
541                                              LPROC_LL_OPEN);
542                 }
543                 (*och_usecount)++;
544
545                 rc = ll_local_open(file, it, fd, NULL);
546                 if (rc) {
547                         up(&lli->lli_och_sem);
548                         ll_file_data_put(fd);
549                         RETURN(rc);
550                 }
551         } else {
552                 LASSERT(*och_usecount == 0);
553                 if (!it->d.lustre.it_disposition) {
554                         /* We cannot just request lock handle now, new ELC code
555                            means that one of other OPEN locks for this file
556                            could be cancelled, and since blocking ast handler
557                            would attempt to grab och_sem as well, that would
558                            result in a deadlock */
559                         up(&lli->lli_och_sem);
560                         it->it_flags |= O_CHECK_STALE;
561                         rc = ll_intent_file_open(file, NULL, 0, it);
562                         it->it_flags &= ~O_CHECK_STALE;
563                         if (rc) {
564                                 ll_file_data_put(fd);
565                                 GOTO(out_openerr, rc);
566                         }
567
568                         /* Got some error? Release the request */
569                         if (it->d.lustre.it_status < 0) {
570                                 req = it->d.lustre.it_data;
571                                 ptlrpc_req_finished(req);
572                         }
573                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
574                                          &it->d.lustre.it_lock_handle,
575                                          file->f_dentry->d_inode);
576                         goto restart;
577                 }
578                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
579                 if (!*och_p) {
580                         ll_file_data_put(fd);
581                         GOTO(out_och_free, rc = -ENOMEM);
582                 }
583                 (*och_usecount)++;
584                 req = it->d.lustre.it_data;
585
586                 /* md_intent_lock() didn't get a request ref if there was an
587                  * open error, so don't do cleanup on the request here
588                  * (bug 3430) */
589                 /* XXX (green): Should not we bail out on any error here, not
590                  * just open error? */
591                 rc = it_open_error(DISP_OPEN_OPEN, it);
592                 if (rc) {
593                         ll_file_data_put(fd);
594                         GOTO(out_och_free, rc);
595                 }
596
597                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
598                 rc = ll_local_open(file, it, fd, *och_p);
599                 if (rc) {
600                         up(&lli->lli_och_sem);
601                         ll_file_data_put(fd);
602                         GOTO(out_och_free, rc);
603                 }
604         }
605         up(&lli->lli_och_sem);
606
607         /* Must do this outside lli_och_sem lock to prevent deadlock where
608            different kind of OPEN lock for this same inode gets cancelled
609            by ldlm_cancel_lru */
610         if (!S_ISREG(inode->i_mode))
611                 GOTO(out, rc);
612
613         ll_capa_open(inode);
614
615         lsm = lli->lli_smd;
616         if (lsm == NULL) {
617                 if (file->f_flags & O_LOV_DELAY_CREATE ||
618                     !(file->f_mode & FMODE_WRITE)) {
619                         CDEBUG(D_INODE, "object creation was delayed\n");
620                         GOTO(out, rc);
621                 }
622         }
623         file->f_flags &= ~O_LOV_DELAY_CREATE;
624         GOTO(out, rc);
625 out:
626         ptlrpc_req_finished(req);
627         if (req)
628                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
629 out_och_free:
630         if (rc) {
631                 if (*och_p) {
632                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
633                         *och_p = NULL; /* OBD_FREE writes some magic there */
634                         (*och_usecount)--;
635                 }
636                 up(&lli->lli_och_sem);
637 out_openerr: ;/* Looks weierd, eh? Just wait for statahead code to insert
638                 a statement here <-- remove this comment after statahead
639                 landing */
640         }
641
642         return rc;
643 }
644
645 /* Fills the obdo with the attributes for the inode defined by lsm */
/* Gather size/blocks/times for @inode's objects from the OSTs via an
 * async getattr request set, then refresh the inode from the merged
 * result.  Requires the file to have striping (@lli->lli_smd != NULL).
 * Returns 0 or a negative errno. */
int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
{
        struct ptlrpc_request_set *set;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;

        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(lsm != NULL);

        /* describe the object(s) and the attributes we want back */
        oinfo.oi_md = lsm;
        oinfo.oi_oa = obdo;
        oinfo.oi_oa->o_id = lsm->lsm_object_id;
        oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
        oinfo.oi_oa->o_mode = S_IFREG;
        oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
                               OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                               OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
                               OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                               OBD_MD_FLGROUP;
        oinfo.oi_capa = ll_mdscapa_get(inode);

        set = ptlrpc_prep_set();
        if (set == NULL) {
                CERROR("can't allocate ptlrpc set\n");
                rc = -ENOMEM;
        } else {
                rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
                if (rc == 0)
                        rc = ptlrpc_set_wait(set);
                ptlrpc_set_destroy(set);
        }
        /* drop the capability reference taken above */
        capa_put(oinfo.oi_capa);
        if (rc)
                RETURN(rc);

        /* only merge attributes the OSTs are authoritative for */
        oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
                                 OBD_MD_FLATIME | OBD_MD_FLMTIME |
                                 OBD_MD_FLCTIME | OBD_MD_FLSIZE);

        obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
               lli->lli_smd->lsm_object_id, i_size_read(inode),
               inode->i_blocks, inode->i_blksize);
        RETURN(0);
}
694
695 static inline void ll_remove_suid(struct inode *inode)
696 {
697         unsigned int mode;
698
699         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
700         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
701
702         /* was any of the uid bits set? */
703         mode &= inode->i_mode;
704         if (mode && !capable(CAP_FSETID)) {
705                 inode->i_mode &= ~mode;
706                 // XXX careful here - we cannot change the size
707         }
708 }
709
/* Map an extent DLM @lock back to the stripe index it covers within
 * @inode's striping.  Returns the stripe number (>= 0) on success,
 * -ELDLM_NO_LOCK_DATA when the lock's resource does not name the
 * expected object, or another negative errno from obd_get_info(). */
static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = ll_i2dtexp(inode);
        /* key understood by the LOV's obd_get_info() handler */
        struct {
                char name[16];
                struct ldlm_lock *lock;
                struct lov_stripe_md *lsm;
        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
        __u32 stripe, vallen = sizeof(stripe);
        int rc;
        ENTRY;

        /* single-stripe file trivially maps to stripe 0 */
        if (lsm->lsm_stripe_count == 1)
                GOTO(check, stripe = 0);

        /* get our offset in the lov */
        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
        if (rc != 0) {
                CERROR("obd_get_info: rc = %d\n", rc);
                RETURN(rc);
        }
        LASSERT(stripe < lsm->lsm_stripe_count);

check:
        /* sanity: the lock's resource must name this stripe's object */
        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
                LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
                           lsm->lsm_oinfo[stripe]->loi_id,
                           lsm->lsm_oinfo[stripe]->loi_gr);
                RETURN(-ELDLM_NO_LOCK_DATA);
        }

        RETURN(stripe);
}
746
/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
 * we get a lock cancellation for each stripe, so we have to map the obd's
 * region back onto the stripes in the file that it held.
 *
 * No one can dirty the extent until we've finished our work and they can
 * enqueue another lock.  The DLM protects us from ll_file_read/write here,
 * but other kernel actors could have pages locked.
 *
 * Called with the DLM lock held. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                              struct ldlm_lock *lock, __u32 stripe)
{
        ldlm_policy_data_t tmpex;
        unsigned long start, end, count, skip, i, j;
        struct page *page;
        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
        struct lustre_handle lockh;
        struct address_space *mapping = inode->i_mapping;

        ENTRY;
        tmpex = lock->l_policy_data;
        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
               i_size_read(inode));

        /* our locks are page granular thanks to osc_enqueue, we invalidate the
         * whole page. */
        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
                           CFS_PAGE_SIZE);
        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);

        /* Map the lock's per-stripe [start, end] page range onto file-logical
         * page indices: this stripe owns "count" consecutive pages out of each
         * stripe width, so each full stripe unit advanced adds "skip" pages
         * belonging to the other stripes.  "end" saturates at ~0. */
        count = ~0;
        skip = 0;
        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
        if (lsm->lsm_stripe_count > 1) {
                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
                skip = (lsm->lsm_stripe_count - 1) * count;
                start += start/count * skip + stripe * count;
                if (end != ~0)
                        end += end/count * skip + stripe * count;
        }
        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
                end = ~0;

        /* Pages beyond EOF cannot be cached; clamp the walk at i_size. */
        i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
            CFS_PAGE_SHIFT : 0;
        if (i < end)
                end = i;

        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
               count, skip, end, discard ? " (DISCARDING)" : "");

        /* walk through the vmas on the inode and tear down mmaped pages that
         * intersect with the lock.  this stops immediately if there are no
         * mmap()ed regions of the file.  This is not efficient at all and
         * should be short lived. We'll associate mmap()ed pages with the lock
         * and will be able to find them directly */
        for (i = start; i <= end; i += (j + skip)) {
                j = min(count - (i % count), end - i + 1);
                LASSERT(j > 0);
                LASSERT(mapping);
                if (ll_teardown_mmaps(mapping,
                                      (__u64)i << CFS_PAGE_SHIFT,
                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                        break;
        }

        /* this is the simplistic implementation of page eviction at
         * cancelation.  It is careful to get races with other page
         * lockers handled correctly.  fixes from bug 20 will make it
         * more efficient by associating locks with pages and with
         * batching writeback under the lock explicitly. */
        for (i = start, j = start % count; i <= end;
             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
                /* "j" counts pages inside the current stripe unit; once it
                 * reaches "count", hop over the other stripes' pages. */
                if (j == count) {
                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
                        i += skip;
                        j = 0;
                        if (i > end)
                                break;
                }
                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                         start, i, end);

                if (!mapping_has_pages(mapping)) {
                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                        break;
                }

                cond_resched();

                page = find_get_page(mapping, i);
                if (page == NULL)
                        continue;
                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
                               i, tmpex.l_extent.start);
                lock_page(page);

                /* page->mapping to check with racing against teardown */
                if (!discard && clear_page_dirty_for_io(page)) {
                        rc = ll_call_writepage(inode, page);
                        /* either waiting for io to complete or reacquiring
                         * the lock that the failed writepage released */
                        lock_page(page);
                        wait_on_page_writeback(page);
                        if (rc != 0) {
                                CERROR("writepage inode %lu(%p) of page %p "
                                       "failed: %d\n", inode->i_ino, inode,
                                       page, rc);
                                /* record the error in the mapping so a later
                                 * fsync()/close() can report it */
                                if (rc == -ENOSPC)
                                        set_bit(AS_ENOSPC, &mapping->flags);
                                else
                                        set_bit(AS_EIO, &mapping->flags);
                        }
                }

                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
                /* check to see if another DLM lock covers this page b=2765 */
                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
                                      LDLM_FL_TEST_LOCK,
                                      &lock->l_resource->lr_name, LDLM_EXTENT,
                                      &tmpex, LCK_PR | LCK_PW, &lockh);

                /* only evict the page if no other granted lock still covers
                 * it and it has not been torn down under us meanwhile */
                if (rc2 <= 0 && page->mapping != NULL) {
                        struct ll_async_page *llap = llap_cast_private(page);
                        /* checking again to account for writeback's
                         * lock_page() */
                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                        if (llap)
                                ll_ra_accounting(llap, mapping);
                        ll_truncate_complete_page(page);
                }
                unlock_page(page);
                page_cache_release(page);
        }
        LASSERTF(tmpex.l_extent.start <=
                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
                  lock->l_policy_data.l_extent.end + 1),
                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                 start, i, end);
        EXIT;
}
898
/* Blocking/cancellation AST for client extent locks.
 *
 * LDLM_CB_BLOCKING: another client requested a conflicting lock, so cancel
 * ours.  LDLM_CB_CANCELING: the lock is going away; flush or discard the
 * covered page cache and shrink the stripe's known minimum size (KMS)
 * accordingly.  Always returns 0. */
static int ll_extent_lock_callback(struct ldlm_lock *lock,
                                   struct ldlm_lock_desc *new, void *data,
                                   int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* sanity: cbdata is expected to be an inode pointer (or NULL), never
         * a small integer-looking value */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                struct inode *inode;
                struct ll_inode_info *lli;
                struct lov_stripe_md *lsm;
                int stripe;
                __u64 kms;

                /* This lock wasn't granted, don't try to evict pages */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                inode = ll_inode_from_lock(lock);
                if (inode == NULL)
                        RETURN(0);
                lli = ll_i2info(inode);
                if (lli == NULL)
                        goto iput;
                if (lli->lli_smd == NULL)
                        goto iput;
                lsm = lli->lli_smd;

                stripe = ll_lock_to_stripe_offset(inode, lock);
                if (stripe < 0)
                        goto iput;

                ll_pgcache_remove_extent(inode, lsm, lock, stripe);

                /* update the stripe's KMS under both the stripe lock and the
                 * resource lock; note the lov_stripe_lock -> lock_res ordering */
                lov_stripe_lock(lsm);
                lock_res_and_lock(lock);
                kms = ldlm_extent_shift_kms(lock,
                                            lsm->lsm_oinfo[stripe]->loi_kms);

                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
                lsm->lsm_oinfo[stripe]->loi_kms = kms;
                unlock_res_and_lock(lock);
                lov_stripe_unlock(lsm);
        iput:
                iput(inode);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
967
#if 0
/* NOTE(review): disabled code, not compiled.  It appears stale relative to
 * the live code above (e.g. it accesses lsm_oinfo[stripe] by value where the
 * current code dereferences a pointer) -- verify before re-enabling. */
int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lustre_handle lockh = { 0 };
        struct ost_lvb *lvb;
        int stripe;
        ENTRY;

        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                     LDLM_FL_BLOCK_CONV)) {
                LBUG(); /* not expecting any blocked async locks yet */
                LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
                           "lock, returning");
                ldlm_lock_dump(D_OTHER, lock, 0);
                ldlm_reprocess_all(lock->l_resource);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");

        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                goto iput;

        if (lock->l_lvb_len) {
                struct lov_stripe_md *lsm = lli->lli_smd;
                __u64 kms;
                lvb = lock->l_lvb_data;
                lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;

                lock_res_and_lock(lock);
                ll_inode_size_lock(inode, 1);
                kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                kms = ldlm_extent_shift_kms(NULL, kms);
                if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe].loi_kms, kms);
                lsm->lsm_oinfo[stripe].loi_kms = kms;
                ll_inode_size_unlock(inode, 1);
                unlock_res_and_lock(lock);
        }

iput:
        iput(inode);
        wake_up(&lock->l_waitq);

        ldlm_lock2handle(lock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        RETURN(0);
}
#endif
1022
/* Glimpse AST: a server asks this client, as a DLM lock holder, for its view
 * of the object's attributes.  Pack the stripe's known minimum size (KMS)
 * and the inode timestamps into the reply without releasing the lock. */
static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
        struct ptlrpc_request *req = reqp;
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli;
        struct lov_stripe_md *lsm;
        struct ost_lvb *lvb;
        int rc, stripe;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
        ENTRY;

        /* no inode/info/stripes attached -> nothing to report */
        if (inode == NULL)
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
        lli = ll_i2info(inode);
        if (lli == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* First, find out which stripe index this lock corresponds to. */
        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        rc = lustre_pack_reply(req, 2, size, NULL);
        if (rc)
                GOTO(iput, rc);

        lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
        lvb->lvb_mtime = LTIME_S(inode->i_mtime);
        lvb->lvb_atime = LTIME_S(inode->i_atime);
        lvb->lvb_ctime = LTIME_S(inode->i_ctime);

        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                   " atime "LPU64", mtime "LPU64", ctime "LPU64,
                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                   lvb->lvb_atime, lvb->lvb_ctime);
 iput:
        iput(inode);

 out:
        /* These errors are normal races, so we don't want to fill the console
         * with messages by calling ptlrpc_error() */
        if (rc == -ELDLM_NO_LOCK_DATA)
                lustre_pack_reply(req, 1, NULL, NULL);

        req->rq_status = rc;
        return rc;
}
1074
1075 static int ll_merge_lvb(struct inode *inode)
1076 {
1077         struct ll_inode_info *lli = ll_i2info(inode);
1078         struct ll_sb_info *sbi = ll_i2sbi(inode);
1079         struct ost_lvb lvb;
1080         int rc;
1081
1082         ENTRY;
1083
1084         ll_inode_size_lock(inode, 1);
1085         inode_init_lvb(inode, &lvb);
1086         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1087         i_size_write(inode, lvb.lvb_size);
1088         inode->i_blocks = lvb.lvb_blocks;
1089
1090         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1091         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1092         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1093         ll_inode_size_unlock(inode, 1);
1094
1095         RETURN(rc);
1096 }
1097
1098 int ll_local_size(struct inode *inode)
1099 {
1100         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1101         struct ll_inode_info *lli = ll_i2info(inode);
1102         struct ll_sb_info *sbi = ll_i2sbi(inode);
1103         struct lustre_handle lockh = { 0 };
1104         int flags = 0;
1105         int rc;
1106         ENTRY;
1107
1108         if (lli->lli_smd->lsm_stripe_count == 0)
1109                 RETURN(0);
1110
1111         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1112                        &policy, LCK_PR, &flags, inode, &lockh);
1113         if (rc < 0)
1114                 RETURN(rc);
1115         else if (rc == 0)
1116                 RETURN(-ENODATA);
1117
1118         rc = ll_merge_lvb(inode);
1119         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1120         RETURN(rc);
1121 }
1122
1123 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1124                      lstat_t *st)
1125 {
1126         struct lustre_handle lockh = { 0 };
1127         struct ldlm_enqueue_info einfo = { 0 };
1128         struct obd_info oinfo = { { { 0 } } };
1129         struct ost_lvb lvb;
1130         int rc;
1131
1132         ENTRY;
1133
1134         einfo.ei_type = LDLM_EXTENT;
1135         einfo.ei_mode = LCK_PR;
1136         einfo.ei_cb_bl = ll_extent_lock_callback;
1137         einfo.ei_cb_cp = ldlm_completion_ast;
1138         einfo.ei_cb_gl = ll_glimpse_callback;
1139         einfo.ei_cbdata = NULL;
1140
1141         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1142         oinfo.oi_lockh = &lockh;
1143         oinfo.oi_md = lsm;
1144         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1145
1146         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1147         if (rc == -ENOENT)
1148                 RETURN(rc);
1149         if (rc != 0) {
1150                 CERROR("obd_enqueue returned rc %d, "
1151                        "returning -EIO\n", rc);
1152                 RETURN(rc > 0 ? -EIO : rc);
1153         }
1154
1155         lov_stripe_lock(lsm);
1156         memset(&lvb, 0, sizeof(lvb));
1157         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1158         st->st_size = lvb.lvb_size;
1159         st->st_blocks = lvb.lvb_blocks;
1160         st->st_mtime = lvb.lvb_mtime;
1161         st->st_atime = lvb.lvb_atime;
1162         st->st_ctime = lvb.lvb_ctime;
1163         lov_stripe_unlock(lsm);
1164
1165         RETURN(rc);
1166 }
1167
1168 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1169  * file (because it prefers KMS over RSS when larger) */
1170 int ll_glimpse_size(struct inode *inode, int ast_flags)
1171 {
1172         struct ll_inode_info *lli = ll_i2info(inode);
1173         struct ll_sb_info *sbi = ll_i2sbi(inode);
1174         struct lustre_handle lockh = { 0 };
1175         struct ldlm_enqueue_info einfo = { 0 };
1176         struct obd_info oinfo = { { { 0 } } };
1177         int rc;
1178         ENTRY;
1179
1180         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1181                 RETURN(0);
1182
1183         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1184
1185         if (!lli->lli_smd) {
1186                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1187                 RETURN(0);
1188         }
1189
1190         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1191          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1192          *       won't revoke any conflicting DLM locks held. Instead,
1193          *       ll_glimpse_callback() will be called on each client
1194          *       holding a DLM lock against this file, and resulting size
1195          *       will be returned for each stripe. DLM lock on [0, EOF] is
1196          *       acquired only if there were no conflicting locks. */
1197         einfo.ei_type = LDLM_EXTENT;
1198         einfo.ei_mode = LCK_PR;
1199         einfo.ei_cb_bl = ll_extent_lock_callback;
1200         einfo.ei_cb_cp = ldlm_completion_ast;
1201         einfo.ei_cb_gl = ll_glimpse_callback;
1202         einfo.ei_cbdata = inode;
1203
1204         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1205         oinfo.oi_lockh = &lockh;
1206         oinfo.oi_md = lli->lli_smd;
1207         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1208
1209         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1210         if (rc == -ENOENT)
1211                 RETURN(rc);
1212         if (rc != 0) {
1213                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1214                 RETURN(rc > 0 ? -EIO : rc);
1215         }
1216
1217         rc = ll_merge_lvb(inode);
1218
1219         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1220                i_size_read(inode), inode->i_blocks);
1221
1222         RETURN(rc);
1223 }
1224
/* Acquire a DLM extent lock of @mode on @policy's extent of @inode.
 *
 * On return the (possibly widened) granted extent is written back to
 * @policy and the handle to @lockh.  The inode's cached size and times
 * are refreshed from the merged per-stripe attributes under the inode
 * size lock; see the comment below for why the ordering matters.
 * Locking is skipped entirely for LL_FILE_IGNORE_LOCK / LL_SBI_NOLCK.
 *
 * \retval 0 on success (positive enqueue results are mapped to -EIO) */
int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                   struct lov_stripe_md *lsm, int mode,
                   ldlm_policy_data_t *policy, struct lustre_handle *lockh,
                   int ast_flags)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ost_lvb lvb;
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(!lustre_handle_is_used(lockh));
        LASSERT(lsm != NULL);

        /* don't drop the mmapped file to LRU */
        if (mapping_mapped(inode->i_mapping))
                ast_flags |= LDLM_FL_NO_LRU;

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               inode->i_ino, policy->l_extent.start, policy->l_extent.end);

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = mode;
        einfo.ei_cb_bl = ll_extent_lock_callback;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy = *policy;
        oinfo.oi_lockh = lockh;
        oinfo.oi_md = lsm;
        oinfo.oi_flags = ast_flags;

        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
        /* report the granted (possibly widened) extent back to the caller */
        *policy = oinfo.oi_policy;
        if (rc > 0)
                rc = -EIO;

        /* merge attributes even on failure so the inode stays consistent */
        ll_inode_size_lock(inode, 1);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);

        if (policy->l_extent.start == 0 &&
            policy->l_extent.end == OBD_OBJECT_EOF) {
                /* vmtruncate()->ll_truncate() first sets the i_size and then
                 * the kms under both a DLM lock and the
                 * ll_inode_size_lock().  If we don't get the
                 * ll_inode_size_lock() here we can match the DLM lock and
                 * reset i_size from the kms before the truncating path has
                 * updated the kms.  generic_file_write can then trust the
                 * stale i_size when doing appending writes and effectively
                 * cancel the result of the truncate.  Getting the
                 * ll_inode_size_lock() after the enqueue maintains the DLM
                 * -> ll_inode_size_lock() acquiring order. */
                i_size_write(inode, lvb.lvb_size);
                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
                       inode->i_ino, i_size_read(inode));
        }

        if (rc == 0) {
                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
                LTIME_S(inode->i_atime) = lvb.lvb_atime;
                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        }
        ll_inode_size_unlock(inode, 1);

        RETURN(rc);
}
1299
1300 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1301                      struct lov_stripe_md *lsm, int mode,
1302                      struct lustre_handle *lockh)
1303 {
1304         struct ll_sb_info *sbi = ll_i2sbi(inode);
1305         int rc;
1306         ENTRY;
1307
1308         /* XXX phil: can we do this?  won't it screw the file size up? */
1309         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1310             (sbi->ll_flags & LL_SBI_NOLCK))
1311                 RETURN(0);
1312
1313         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1314
1315         RETURN(rc);
1316 }
1317
/* Read from a file through the page cache.
 *
 * Takes a PR DLM lock over the requested region (split into per-stripe
 * chunks when ll_max_rw_chunk is set), uses the merged known minimum
 * size (KMS) to decide whether a glimpse is needed to learn the real
 * file size, then delegates the copy to generic_file_read() under the
 * tree lock, repeating until the request is satisfied or a short read
 * occurs. */
static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                            loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        int rc, ra = 0;
        loff_t end;
        ssize_t retval, chunk, sum = 0;

        __u64 kms;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);
        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);

        if (!lsm) {
                /* Read on file with no objects should return zero-filled
                 * buffers up to file size (we can get non-zero sizes with
                 * mknod + truncate, then opening file for read. This is a
                 * common pattern in NFS case, it seems). Bug 6243 */
                int notzeroed;
                /* Since there are no objects on OSTs, we have nothing to get
                 * lock on and so we are forced to access inode->i_size
                 * unguarded */

                /* Read beyond end of file */
                if (*ppos >= i_size_read(inode))
                        RETURN(0);

                if (count > i_size_read(inode) - *ppos)
                        count = i_size_read(inode) - *ppos;
                /* Make sure to correctly adjust the file pos pointer for
                 * EFAULT case */
                notzeroed = clear_user(buf, count);
                count -= notzeroed;
                *ppos += count;
                if (!count)
                        RETURN(-EFAULT);
                RETURN(count);
        }

repeat:
        /* choose this iteration's chunk: either up to the current stripe
         * boundary (capped by ll_max_rw_chunk) or the whole remainder */
        if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
                                (obd_off *)&end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
        } else {
                end = *ppos + count - 1;
        }

        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
        if (IS_ERR(node)){
                GOTO(out, retval = PTR_ERR(node));
        }

        tree.lt_fd = LUSTRE_FPRIVATE(file);
        rc = ll_tree_lock(&tree, node, buf, count,
                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
        if (rc != 0)
                GOTO(out, retval = rc);

        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (retval) {
                        ll_tree_unlock(&tree);
                        goto out;
                }
        } else {
                /* region is within kms and, hence, within real file size (A).
                 * We need to increase i_size to cover the read region so that
                 * generic_file_read() will do its job, but that doesn't mean
                 * the kms size is _correct_, it is only the _minimum_ size.
                 * If someone does a stat they will get the correct size which
                 * will always be >= the kms value here.  b=11081 */
                if (i_size_read(inode) < kms)
                        i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        chunk = end - *ppos + 1;
        CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, chunk, *ppos, i_size_read(inode));

        /* turn off the kernel's read-ahead */
        file->f_ra.ra_pages = 0;

        /* initialize read-ahead window once per syscall */
        if (ra == 0) {
                ra = 1;
                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
                ll_ra_read_in(file, &bead);
        }

        /* BUG: 5972 */
        file_accessed(file);
        retval = generic_file_read(file, buf, chunk, ppos);
        ll_rw_stats_tally(sbi, current->pid, file, count, 0);

        ll_tree_unlock(&tree);

        /* full chunk read with bytes remaining -> lock and read the next
         * chunk; a short read ends the loop */
        if (retval > 0) {
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

 out:
        if (ra != 0)
                ll_ra_read_ex(file, &bead);
        /* return the total transferred so far, or the error/short result */
        retval = (sum > 0) ? sum : retval;
        RETURN(retval);
}
1479
1480 /*
1481  * Write to a file (through the page cache).
1482  */
/* Write @count bytes from the user buffer @buf to the file at *@ppos.
 *
 * The write is serialized against other llite writers by lli_write_sem,
 * split into chunks (one lock/write cycle per iteration of the "repeat"
 * loop), and each chunk is protected by a PW extent lock taken through
 * the lock tree before handing off to generic_file_write().
 *
 * Returns the total number of bytes written across all chunks, or a
 * negative errno if nothing was written. */
static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
                             loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        loff_t maxbytes = ll_file_maxbytes(inode);
        loff_t lock_start, lock_end, end;
        ssize_t retval, chunk, sum = 0;  /* sum accumulates across chunks */
        int rc;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        /* POSIX, but surprised the VFS doesn't check this already */
        if (count == 0)
                RETURN(0);

        /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
         * called on the file, don't fail the below assertion (bug 2388). */
        if (file->f_flags & O_LOV_DELAY_CREATE &&
            ll_i2info(inode)->lli_smd == NULL)
                RETURN(-EBADF);

        LASSERT(ll_i2info(inode)->lli_smd != NULL);

        /* One llite-level writer at a time per inode. */
        down(&ll_i2info(inode)->lli_write_sem);

repeat:
        /* Each pass writes one chunk: lock its extent, write, unlock. */
        chunk = 0; /* just to fix gcc's warning */
        end = *ppos + count - 1;

        if (file->f_flags & O_APPEND) {
                /* Appends need the whole-file lock; the real start offset
                 * is only known once the lock (and thus the size) is held. */
                lock_start = 0;
                lock_end = OBD_OBJECT_EOF;
        } else if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
                                (obd_off *)&end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
                lock_start = *ppos;
                lock_end = end;
        } else {
                /* No chunking configured: lock exactly the requested range. */
                lock_start = *ppos;
                lock_end = *ppos + count - 1;
        }
        node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);

        if (IS_ERR(node))
                GOTO(out, retval = PTR_ERR(node));

        tree.lt_fd = LUSTRE_FPRIVATE(file);
        rc = ll_tree_lock(&tree, node, buf, count,
                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
        if (rc != 0)
                GOTO(out, retval = rc);

        /* This is ok, g_f_w will overwrite this under i_sem if it races
         * with a local truncate, it just makes our maxbyte checking easier.
         * The i_size value gets updated in ll_extent_lock() as a consequence
         * of the [0,EOF] extent lock we requested above. */
        if (file->f_flags & O_APPEND) {
                *ppos = i_size_read(inode);
                end = *ppos + count - 1;
        }

        /* Enforce the per-file size limit, mirroring the VFS rlimit path. */
        if (*ppos >= maxbytes) {
                send_sig(SIGXFSZ, current, 0);
                GOTO(out_unlock, retval = -EFBIG);
        }
        if (*ppos + count > maxbytes)
                count = maxbytes - *ppos;

        /* generic_file_write handles O_APPEND after getting i_mutex */
        chunk = end - *ppos + 1;
        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
               inode->i_ino, chunk, *ppos);
        retval = generic_file_write(file, buf, chunk, ppos);
        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);

out_unlock:
        ll_tree_unlock(&tree);

out:
        /* Account this chunk's progress; if the chunk completed fully and
         * more data remains, loop back for the next chunk's lock. */
        if (retval > 0) {
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

        up(&ll_i2info(inode)->lli_write_sem);

        /* Report total progress if any chunk succeeded, else the error. */
        retval = (sum > 0) ? sum : retval;
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
                           retval > 0 ? retval : 0);
        RETURN(retval);
}
1595
1596 /*
1597  * Send file content (through pagecache) somewhere with helper
1598  */
/* sendfile(2) entry point: push file data from the page cache to @target
 * via @actor, after taking a PR extent lock over the requested range and
 * settling the client's view of the file size (see the consistency
 * comment below).  Returns bytes sent or a negative errno. */
static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                                read_actor_t actor, void *target)
{
        struct inode *inode = in_file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        int rc;
        ssize_t retval;
        __u64 kms;      /* known minimum size from the merged LVBs */
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
        /* turn off the kernel's read-ahead (llite registers its own
         * read-ahead window via the ll_ra_read calls below) */
        in_file->f_ra.ra_pages = 0;

        /* File with no objects, nothing to lock */
        if (!lsm)
                RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));

        node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
        if (IS_ERR(node))
                RETURN(PTR_ERR(node));

        tree.lt_fd = LUSTRE_FPRIVATE(in_file);
        rc = ll_tree_lock(&tree, node, NULL, count,
                          in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
        if (rc != 0)
                RETURN(rc);

        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (retval)
                        goto out;
        } else {
                /* region is within kms and, hence, within real file size (A) */
                i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, count, *ppos, i_size_read(inode));

        /* Register the read-ahead window (in pages) for this transfer. */
        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
        ll_ra_read_in(in_file, &bead);
        /* BUG: 5972 */
        file_accessed(in_file);
        retval = generic_file_sendfile(in_file, ppos, count, actor, target);
        ll_ra_read_ex(in_file, &bead);

 out:
        ll_tree_unlock(&tree);
        RETURN(retval);
}
1690
/* LL_IOC_RECREATE_OBJ: administrator-only ioctl that asks the OST stack
 * to recreate a (lost) object of this file, identified by the id/group/
 * ost index supplied in the userspace struct ll_recreate_obj at @arg.
 * Holds lli_size_sem so the striping descriptor cannot change underneath
 * us while a scratch copy is built and passed to obd_create(). */
static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                               unsigned long arg)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_export *exp = ll_i2dtexp(inode);
        struct ll_recreate_obj ucreatp;
        struct obd_trans_info oti = { 0 };
        struct obdo *oa = NULL;
        int lsm_size;
        int rc = 0;
        struct lov_stripe_md *lsm, *lsm2;
        ENTRY;

        if (!capable (CAP_SYS_ADMIN))
                RETURN(-EPERM);

        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
                            sizeof(struct ll_recreate_obj));
        if (rc) {
                RETURN(-EFAULT);
        }
        OBDO_ALLOC(oa);
        if (oa == NULL)
                RETURN(-ENOMEM);

        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(out, rc = -ENOENT);
        /* Stripe descriptor plus one lov_oinfo per stripe. */
        lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
                   (lsm->lsm_stripe_count));

        OBD_ALLOC(lsm2, lsm_size);
        if (lsm2 == NULL)
                GOTO(out, rc = -ENOMEM);

        /* NOTE(review): o_nlink is apparently reused here to carry the OST
         * index for the recreate request — confirm against the OBD_FL_
         * RECREATE_OBJS handling on the server side. */
        oa->o_id = ucreatp.lrc_id;
        oa->o_gr = ucreatp.lrc_group;
        oa->o_nlink = ucreatp.lrc_ost_idx;
        oa->o_flags |= OBD_FL_RECREATE_OBJS;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        /* Work on a scratch copy; obd_create() may modify the md. */
        memcpy(lsm2, lsm, lsm_size);
        rc = obd_create(exp, oa, &lsm2, &oti);

        OBD_FREE(lsm2, lsm_size);
        GOTO(out, rc);
out:
        up(&lli->lli_size_sem);
        OBDO_FREE(oa);
        return rc;
}
1745
/* Set the striping EA on @inode by re-opening the file with the supplied
 * lov_user_md via an IT_OPEN intent.  Fails with -EEXIST if the file
 * already has a stripe MD.  lli_size_sem is held across the open so a
 * racing setstripe cannot install a layout concurrently.
 *
 * Note the two exit paths: out_req_free drops the intent's request
 * explicitly for the error cases where the open handle was never
 * released, then falls into the common "out" path. */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                             int flags, struct lov_user_md *lum, int lum_size)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm;
        struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
        int rc = 0;
        ENTRY;

        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm) {
                up(&lli->lli_size_sem);
                CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
                       inode->i_ino);
                RETURN(-EEXIST);
        }

        rc = ll_intent_file_open(file, lum, lum_size, &oit);
        if (rc)
                GOTO(out, rc);
        /* Negative dentry from the MDS: nothing was opened. */
        if (it_disposition(&oit, DISP_LOOKUP_NEG))
                GOTO(out_req_free, rc = -ENOENT);
        rc = oit.d.lustre.it_status;
        if (rc < 0)
                GOTO(out_req_free, rc);

        /* Close the MDS open handle we only needed for the setstripe. */
        ll_release_openhandle(file->f_dentry, &oit);

 out:
        up(&lli->lli_size_sem);
        ll_intent_release(&oit);
        RETURN(rc);
out_req_free:
        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
        goto out;
}
1783
1784 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1785                              struct lov_mds_md **lmmp, int *lmm_size, 
1786                              struct ptlrpc_request **request)
1787 {
1788         struct ll_sb_info *sbi = ll_i2sbi(inode);
1789         struct mdt_body  *body;
1790         struct lov_mds_md *lmm = NULL;
1791         struct ptlrpc_request *req = NULL;
1792         struct obd_capa *oc;
1793         int rc, lmmsize;
1794
1795         rc = ll_get_max_mdsize(sbi, &lmmsize);
1796         if (rc)
1797                 RETURN(rc);
1798
1799         oc = ll_mdscapa_get(inode);
1800         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1801                              oc, filename, strlen(filename) + 1,
1802                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1803         capa_put(oc);
1804         if (rc < 0) {
1805                 CDEBUG(D_INFO, "md_getattr_name failed "
1806                        "on %s: rc %d\n", filename, rc);
1807                 GOTO(out, rc);
1808         }
1809
1810         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1811         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1812         /* swabbed by mdc_getattr_name */
1813         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
1814
1815         lmmsize = body->eadatasize;
1816
1817         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1818                         lmmsize == 0) {
1819                 GOTO(out, rc = -ENODATA);
1820         }
1821
1822         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1823         LASSERT(lmm != NULL);
1824         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
1825
1826         /*
1827          * This is coming from the MDS, so is probably in
1828          * little endian.  We convert it to host endian before
1829          * passing it to userspace.
1830          */
1831         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1832                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1833                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1834         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1835                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1836         }
1837
1838         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1839                 struct lov_stripe_md *lsm;
1840                 struct lov_user_md_join *lmj;
1841                 int lmj_size, i, aindex = 0;
1842
1843                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1844                 if (rc < 0)
1845                         GOTO(out, rc = -ENOMEM);
1846                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1847                 if (rc)
1848                         GOTO(out_free_memmd, rc);
1849
1850                 lmj_size = sizeof(struct lov_user_md_join) +
1851                            lsm->lsm_stripe_count *
1852                            sizeof(struct lov_user_ost_data_join);
1853                 OBD_ALLOC(lmj, lmj_size);
1854                 if (!lmj)
1855                         GOTO(out_free_memmd, rc = -ENOMEM);
1856
1857                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1858                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1859                         struct lov_extent *lex =
1860                                 &lsm->lsm_array->lai_ext_array[aindex];
1861
1862                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1863                                 aindex ++;
1864                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1865                                         LPU64" len %d\n", aindex, i,
1866                                         lex->le_start, (int)lex->le_len);
1867                         lmj->lmm_objects[i].l_extent_start =
1868                                 lex->le_start;
1869
1870                         if ((int)lex->le_len == -1)
1871                                 lmj->lmm_objects[i].l_extent_end = -1;
1872                         else
1873                                 lmj->lmm_objects[i].l_extent_end =
1874                                         lex->le_start + lex->le_len;
1875                         lmj->lmm_objects[i].l_object_id =
1876                                 lsm->lsm_oinfo[i]->loi_id;
1877                         lmj->lmm_objects[i].l_object_gr =
1878                                 lsm->lsm_oinfo[i]->loi_gr;
1879                         lmj->lmm_objects[i].l_ost_gen =
1880                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1881                         lmj->lmm_objects[i].l_ost_idx =
1882                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1883                 }
1884                 lmm = (struct lov_mds_md *)lmj;
1885                 lmmsize = lmj_size;
1886 out_free_memmd:
1887                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1888         }
1889 out:
1890         *lmmp = lmm;
1891         *lmm_size = lmmsize;
1892         *request = req;
1893         return rc;
1894 }
1895
1896 static int ll_lov_setea(struct inode *inode, struct file *file,
1897                             unsigned long arg)
1898 {
1899         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1900         struct lov_user_md  *lump;
1901         int lum_size = sizeof(struct lov_user_md) +
1902                        sizeof(struct lov_user_ost_data);
1903         int rc;
1904         ENTRY;
1905
1906         if (!capable (CAP_SYS_ADMIN))
1907                 RETURN(-EPERM);
1908
1909         OBD_ALLOC(lump, lum_size);
1910         if (lump == NULL) {
1911                 RETURN(-ENOMEM);
1912         }
1913         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1914         if (rc) {
1915                 OBD_FREE(lump, lum_size);
1916                 RETURN(-EFAULT);
1917         }
1918
1919         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1920
1921         OBD_FREE(lump, lum_size);
1922         RETURN(rc);
1923 }
1924
1925 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1926                             unsigned long arg)
1927 {
1928         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1929         int rc;
1930         int flags = FMODE_WRITE;
1931         ENTRY;
1932
1933         /* Bug 1152: copy properly when this is no longer true */
1934         LASSERT(sizeof(lum) == sizeof(*lump));
1935         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1936         rc = copy_from_user(&lum, lump, sizeof(lum));
1937         if (rc)
1938                 RETURN(-EFAULT);
1939
1940         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1941         if (rc == 0) {
1942                  put_user(0, &lump->lmm_stripe_count);
1943                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1944                                     0, ll_i2info(inode)->lli_smd, lump);
1945         }
1946         RETURN(rc);
1947 }
1948
1949 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1950 {
1951         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1952
1953         if (!lsm)
1954                 RETURN(-ENODATA);
1955
1956         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1957                             (void *)arg);
1958 }
1959
1960 static int ll_get_grouplock(struct inode *inode, struct file *file,
1961                             unsigned long arg)
1962 {
1963         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1964         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1965                                                     .end = OBD_OBJECT_EOF}};
1966         struct lustre_handle lockh = { 0 };
1967         struct ll_inode_info *lli = ll_i2info(inode);
1968         struct lov_stripe_md *lsm = lli->lli_smd;
1969         int flags = 0, rc;
1970         ENTRY;
1971
1972         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1973                 RETURN(-EINVAL);
1974         }
1975
1976         policy.l_extent.gid = arg;
1977         if (file->f_flags & O_NONBLOCK)
1978                 flags = LDLM_FL_BLOCK_NOWAIT;
1979
1980         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1981         if (rc)
1982                 RETURN(rc);
1983
1984         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1985         fd->fd_gid = arg;
1986         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1987
1988         RETURN(0);
1989 }
1990
1991 static int ll_put_grouplock(struct inode *inode, struct file *file,
1992                             unsigned long arg)
1993 {
1994         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1995         struct ll_inode_info *lli = ll_i2info(inode);
1996         struct lov_stripe_md *lsm = lli->lli_smd;
1997         int rc;
1998         ENTRY;
1999
2000         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2001                 /* Ugh, it's already unlocked. */
2002                 RETURN(-EINVAL);
2003         }
2004
2005         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2006                 RETURN(-EINVAL);
2007
2008         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2009
2010         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2011         if (rc)
2012                 RETURN(rc);
2013
2014         fd->fd_gid = 0;
2015         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2016
2017         RETURN(0);
2018 }
2019
/* Validate that @tail may be joined onto the end of @head: the server
 * must support join, both must be regular files, they must be distinct,
 * and the head's size must be JOIN_FILE_ALIGN-aligned (64K per the
 * message below).  Returns 0 if the join may proceed, -EINVAL otherwise. */
static int join_sanity_check(struct inode *head, struct inode *tail)
{
        ENTRY;
        /* The MDS advertises join support via the LL_SBI_JOIN flag. */
        if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
                CERROR("server do not support join \n");
                RETURN(-EINVAL);
        }
        if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
                CERROR("tail ino %lu and ino head %lu must be regular\n",
                       head->i_ino, tail->i_ino);
                RETURN(-EINVAL);
        }
        if (head->i_ino == tail->i_ino) {
                CERROR("file %lu can not be joined to itself \n", head->i_ino);
                RETURN(-EINVAL);
        }
        if (i_size_read(head) % JOIN_FILE_ALIGN) {
                CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
                RETURN(-EINVAL);
        }
        RETURN(0);
}
2042
/* Perform the MDS side of the join: enqueue an IT_OPEN intent with
 * O_JOIN_FILE on @head_inode, naming the tail file, with the head's
 * current size carried to the MDS through op_data.  Any lock the server
 * granted is dropped immediately, and the open handle obtained by the
 * intent is released before returning. */
static int join_file(struct inode *head_inode, struct file *head_filp,
                     struct file *tail_filp)
{
        struct dentry *tail_dentry = tail_filp->f_dentry;
        struct lookup_intent oit = {.it_op = IT_OPEN,
                                   .it_flags = head_filp->f_flags|O_JOIN_FILE};
        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
                ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };

        struct lustre_handle lockh;
        struct md_op_data *op_data;
        int    rc;
        loff_t data;
        ENTRY;

        /* (redundant: already set by the initializer above) */
        tail_dentry = tail_filp->f_dentry;

        /* Pass the head's size to the MDS via the op_data "data" slot. */
        data = i_size_read(head_inode);
        op_data = ll_prep_md_op_data(NULL, head_inode,
                                     tail_dentry->d_parent->d_inode,
                                     tail_dentry->d_name.name,
                                     tail_dentry->d_name.len, 0,
                                     LUSTRE_OPC_ANY, &data);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit,
                         op_data, &lockh, NULL, 0, 0);

        ll_finish_md_op_data(op_data);
        if (rc < 0)
                GOTO(out, rc);

        rc = oit.d.lustre.it_status;

        /* On an open error the reply request must be dropped here because
         * ll_release_openhandle() below is never reached. */
        if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
                ptlrpc_req_finished((struct ptlrpc_request *)
                                    oit.d.lustre.it_data);
                GOTO(out, rc);
        }

        if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
                                           * away */
                ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
                oit.d.lustre.it_lock_mode = 0;
        }
        ll_release_openhandle(head_filp->f_dentry, &oit);
out:
        ll_intent_release(&oit);
        RETURN(rc);
}
2095
2096 static int ll_file_join(struct inode *head, struct file *filp,
2097                         char *filename_tail)
2098 {
2099         struct inode *tail = NULL, *first = NULL, *second = NULL;
2100         struct dentry *tail_dentry;
2101         struct file *tail_filp, *first_filp, *second_filp;
2102         struct ll_lock_tree first_tree, second_tree;
2103         struct ll_lock_tree_node *first_node, *second_node;
2104         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2105         int rc = 0, cleanup_phase = 0;
2106         ENTRY;
2107
2108         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2109                head->i_ino, head->i_generation, head, filename_tail);
2110
2111         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2112         if (IS_ERR(tail_filp)) {
2113                 CERROR("Can not open tail file %s", filename_tail);
2114                 rc = PTR_ERR(tail_filp);
2115                 GOTO(cleanup, rc);
2116         }
2117         tail = igrab(tail_filp->f_dentry->d_inode);
2118
2119         tlli = ll_i2info(tail);
2120         tail_dentry = tail_filp->f_dentry;
2121         LASSERT(tail_dentry);
2122         cleanup_phase = 1;
2123
2124         /*reorder the inode for lock sequence*/
2125         first = head->i_ino > tail->i_ino ? head : tail;
2126         second = head->i_ino > tail->i_ino ? tail : head;
2127         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2128         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2129
2130         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2131                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2132         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2133         if (IS_ERR(first_node)){
2134                 rc = PTR_ERR(first_node);
2135                 GOTO(cleanup, rc);
2136         }
2137         first_tree.lt_fd = first_filp->private_data;
2138         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2139         if (rc != 0)
2140                 GOTO(cleanup, rc);
2141         cleanup_phase = 2;
2142
2143         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2144         if (IS_ERR(second_node)){
2145                 rc = PTR_ERR(second_node);
2146                 GOTO(cleanup, rc);
2147         }
2148         second_tree.lt_fd = second_filp->private_data;
2149         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2150         if (rc != 0)
2151                 GOTO(cleanup, rc);
2152         cleanup_phase = 3;
2153
2154         rc = join_sanity_check(head, tail);
2155         if (rc)
2156                 GOTO(cleanup, rc);
2157
2158         rc = join_file(head, filp, tail_filp);
2159         if (rc)
2160                 GOTO(cleanup, rc);
2161 cleanup:
2162         switch (cleanup_phase) {
2163         case 3:
2164                 ll_tree_unlock(&second_tree);
2165                 obd_cancel_unused(ll_i2dtexp(second),
2166                                   ll_i2info(second)->lli_smd, 0, NULL);
2167         case 2:
2168                 ll_tree_unlock(&first_tree);
2169                 obd_cancel_unused(ll_i2dtexp(first),
2170                                   ll_i2info(first)->lli_smd, 0, NULL);
2171         case 1:
2172                 filp_close(tail_filp, 0);
2173                 if (tail)
2174                         iput(tail);
2175                 if (head && rc == 0) {
2176                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2177                                        &hlli->lli_smd);
2178                         hlli->lli_smd = NULL;
2179                 }
2180         case 0:
2181                 break;
2182         default:
2183                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2184                 LBUG();
2185         }
2186         RETURN(rc);
2187 }
2188
/* Close the MDS open handle that intent @it obtained for @dentry.  A
 * no-op for the filesystem root or when the intent did not actually
 * open anything.  Always finishes the intent's reply request and clears
 * DISP_ENQ_OPEN_REF, even if allocating the client handle fails. */
int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct obd_client_handle *och;
        int rc;
        ENTRY;

        LASSERT(inode);

        /* Root ? Do nothing. */
        if (dentry->d_inode->i_sb->s_root == dentry)
                RETURN(0);

        /* No open handle to close? Move away */
        if (!it_disposition(it, DISP_OPEN_OPEN))
                RETURN(0);

        LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);

        OBD_ALLOC(och, sizeof(*och));
        if (!och)
                GOTO(out, rc = -ENOMEM);

        /* Fill the handle from the intent's reply, then close it on the
         * MDS.  NOTE(review): och appears to be owned/freed by
         * ll_close_inode_openhandle() — confirm there is no leak here. */
        ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
                    ll_i2info(inode), it, och);

        rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
                                       inode, och);
 out:
        /* this one is in place of ll_file_open */
        ptlrpc_req_finished(it->d.lustre.it_data);
        it_clear_disposition(it, DISP_ENQ_OPEN_REF);
        RETURN(rc);
}
2223
/* Dispatch Lustre-specific (and a few ext3-compatible) ioctls for
 * regular files.  tty ioctls are rejected up front; anything not
 * handled here is forwarded first to registered ioctl handlers via
 * ll_iocontrol_call() and then down the OBD stack. */
int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                  unsigned long arg)
{
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        int flags;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
               inode->i_generation, inode, cmd);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);

        /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
        if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
                RETURN(-ENOTTY);

        switch(cmd) {
        case LL_IOC_GETFLAGS:
                /* Get the current value of the file flags */
                /* NOTE(review): plain return here bypasses the RETURN()
                 * debug-trace macro used elsewhere in this function. */
                return put_user(fd->fd_flags, (int *)arg);
        case LL_IOC_SETFLAGS:
        case LL_IOC_CLRFLAGS:
                /* Set or clear specific file flags */
                /* XXX This probably needs checks to ensure the flags are
                 *     not abused, and to handle any flag side effects.
                 */
                if (get_user(flags, (int *) arg))
                        RETURN(-EFAULT);

                if (cmd == LL_IOC_SETFLAGS) {
                        /* Disabling locking is only permitted for O_DIRECT
                         * i/o, where the application manages coherency. */
                        if ((flags & LL_FILE_IGNORE_LOCK) &&
                            !(file->f_flags & O_DIRECT)) {
                                CERROR("%s: unable to disable locking on "
                                       "non-O_DIRECT file\n", current->comm);
                                RETURN(-EINVAL);
                        }

                        fd->fd_flags |= flags;
                } else {
                        fd->fd_flags &= ~flags;
                }
                RETURN(0);
        case LL_IOC_LOV_SETSTRIPE:
                RETURN(ll_lov_setstripe(inode, file, arg));
        case LL_IOC_LOV_SETEA:
                RETURN(ll_lov_setea(inode, file, arg));
        case LL_IOC_LOV_GETSTRIPE:
                RETURN(ll_lov_getstripe(inode, arg));
        case LL_IOC_RECREATE_OBJ:
                RETURN(ll_lov_recreate_obj(inode, file, arg));
        case EXT3_IOC_GETFLAGS:
        case EXT3_IOC_SETFLAGS:
                RETURN(ll_iocontrol(inode, file, cmd, arg));
        case EXT3_IOC_GETVERSION_OLD:
        case EXT3_IOC_GETVERSION:
                RETURN(put_user(inode->i_generation, (int *)arg));
        case LL_IOC_JOIN: {
                /* Join the file named by the user string at @arg onto the
                 * end of this file. */
                char *ftail;
                int rc;

                ftail = getname((const char *)arg);
                if (IS_ERR(ftail))
                        RETURN(PTR_ERR(ftail));
                rc = ll_file_join(inode, file, ftail);
                putname(ftail);
                RETURN(rc);
        }
        case LL_IOC_GROUP_LOCK:
                RETURN(ll_get_grouplock(inode, file, arg));
        case LL_IOC_GROUP_UNLOCK:
                RETURN(ll_put_grouplock(inode, file, arg));
        case IOC_OBD_STATFS:
                RETURN(ll_obd_statfs(inode, (void *)arg));

        /* We need to special case any other ioctls we want to handle,
         * to send them to the MDS/OST as appropriate and to properly
         * network encode the arg field.
        case EXT3_IOC_SETVERSION_OLD:
        case EXT3_IOC_SETVERSION:
        */
        case LL_IOC_FLUSHCTX:
                RETURN(ll_flush_ctx(inode));
        case LL_IOC_GETFACL: {
                /* Remote-client ACL fetch: the payload is copied here, the
                 * real work happens in ll_ioctl_getfacl(). */
                struct rmtacl_ioctl_data ioc;

                if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
                        RETURN(-EFAULT);

                RETURN(ll_ioctl_getfacl(inode, &ioc));
        }
        case LL_IOC_SETFACL: {
                struct rmtacl_ioctl_data ioc;

                if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
                        RETURN(-EFAULT);

                RETURN(ll_ioctl_setfacl(inode, &ioc));
        }
        default: {
                int err;

                /* Give dynamically registered ioctl handlers first crack;
                 * LLIOC_STOP means one of them consumed the call. */
                if (LLIOC_STOP == 
                    ll_iocontrol_call(inode, file, cmd, arg, &err))
                        RETURN(err);

                /* Otherwise pass it down the OBD stack. */
                RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
                                     (void *)arg));
        }
        }
}
2333
2334 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2335 {
2336         struct inode *inode = file->f_dentry->d_inode;
2337         struct ll_inode_info *lli = ll_i2info(inode);
2338         struct lov_stripe_md *lsm = lli->lli_smd;
2339         loff_t retval;
2340         ENTRY;
2341         retval = offset + ((origin == 2) ? i_size_read(inode) :
2342                            (origin == 1) ? file->f_pos : 0);
2343         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2344                inode->i_ino, inode->i_generation, inode, retval, retval,
2345                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2346         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2347
2348         if (origin == 2) { /* SEEK_END */
2349                 int nonblock = 0, rc;
2350
2351                 if (file->f_flags & O_NONBLOCK)
2352                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2353
2354                 if (lsm != NULL) {
2355                         rc = ll_glimpse_size(inode, nonblock);
2356                         if (rc != 0)
2357                                 RETURN(rc);
2358                 }
2359
2360                 ll_inode_size_lock(inode, 0);
2361                 offset += i_size_read(inode);
2362                 ll_inode_size_unlock(inode, 0);
2363         } else if (origin == 1) { /* SEEK_CUR */
2364                 offset += file->f_pos;
2365         }
2366
2367         retval = -EINVAL;
2368         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2369                 if (offset != file->f_pos) {
2370                         file->f_pos = offset;
2371 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2372                         file->f_reada = 0;
2373                         file->f_version = ++event;
2374 #endif
2375                 }
2376                 retval = offset;
2377         }
2378         
2379         RETURN(retval);
2380 }
2381
/*
 * ->fsync() handler: flush dirty state for @dentry's inode.
 *
 * Order matters: the VFS caller has already started writeback, so first
 * wait for that page I/O, then sync metadata through the MDC and, when
 * @data is set and the file has objects, data through the OSC.
 *
 * Returns 0 or the first negative error seen; later errors never
 * overwrite an earlier one (the "if (!rc) rc = err" pattern throughout).
 */
int ll_fsync(struct file *file, struct dentry *dentry, int data)
{
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ptlrpc_request *req;
        struct obd_capa *oc;
        int rc, err;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);

        /* fsync's caller has already called _fdata{sync,write}, we want
         * that IO to finish before calling the osc and mdc sync methods */
        rc = filemap_fdatawait(inode->i_mapping);

        /* catch async errors that were recorded back when async writeback
         * failed for pages in this mapping. */
        err = lli->lli_async_rc;
        lli->lli_async_rc = 0;  /* consume the recorded error */
        if (rc == 0)
                rc = err;
        if (lsm) {
                /* per-object async write errors recorded in the LOV layer */
                err = lov_test_and_clear_async_rc(lsm);
                if (rc == 0)
                        rc = err;
        }

        /* sync metadata on the MDS */
        oc = ll_mdscapa_get(inode);
        err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
                      &req);
        capa_put(oc);
        if (!rc)
                rc = err;
        /* NOTE(review): reply released only when md_sync succeeded;
         * presumably @req is not set on failure - confirm md_sync contract */
        if (!err)
                ptlrpc_req_finished(req);

        if (data && lsm) {
                struct obdo *oa;
                
                OBDO_ALLOC(oa);
                if (!oa)
                        RETURN(rc ? rc : -ENOMEM);

                /* identify the object(s) to sync on the OSTs */
                oa->o_id = lsm->lsm_object_id;
                oa->o_gr = lsm->lsm_object_gr;
                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                           OBD_MD_FLGROUP);

                /* sync the whole file range [0, EOF) on the data devices */
                oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
                err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
                               0, OBD_OBJECT_EOF, oc);
                capa_put(oc);
                if (!rc)
                        rc = err;
                OBDO_FREE(oa);
        }

        RETURN(rc);
}
2445
/*
 * Implement fcntl() byte-range locks and flock() locks by enqueueing an
 * LDLM_FLOCK lock on the MDS.
 *
 * @cmd is the fcntl command (F_SETLK/F_SETLKW/F_GETLK and their 64-bit
 * variants); @file_lock describes range, type and owner.
 * Returns 0 on success or a negative errno from the enqueue.
 */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        /* flock locks use the inode's fid as resource, tagged LDLM_FLOCK */
        struct ldlm_res_id res_id =
                { .name = { fid_seq(ll_inode2fid(inode)),
                            fid_oid(ll_inode2fid(inode)),
                            fid_ver(ll_inode2fid(inode)),
                            LDLM_FLOCK} };
        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
                ldlm_flock_completion_ast, NULL, file_lock };
        struct lustre_handle lockh = {0};
        ldlm_policy_data_t flock;
        int flags = 0;
        int rc;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
               inode->i_ino, file_lock);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

        if (file_lock->fl_flags & FL_FLOCK) {
                LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                /* set missing params for flock() calls */
                file_lock->fl_end = OFFSET_MAX;
                file_lock->fl_pid = current->tgid;
        }
        flock.l_flock.pid = file_lock->fl_pid;
        flock.l_flock.start = file_lock->fl_start;
        flock.l_flock.end = file_lock->fl_end;

        /* map the VFS lock type onto an LDLM lock mode */
        switch (file_lock->fl_type) {
        case F_RDLCK:
                einfo.ei_mode = LCK_PR;
                break;
        case F_UNLCK:
                /* An unlock request may or may not have any relation to
                 * existing locks so we may not be able to pass a lock handle
                 * via a normal ldlm_lock_cancel() request. The request may even
                 * unlock a byte range in the middle of an existing lock. In
                 * order to process an unlock request we need all of the same
                 * information that is given with a normal read or write record
                 * lock request. To avoid creating another ldlm unlock (cancel)
                 * message we'll treat a LCK_NL flock request as an unlock. */
                einfo.ei_mode = LCK_NL;
                break;
        case F_WRLCK:
                einfo.ei_mode = LCK_PW;
                break;
        default:
                CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
                LBUG();
        }

        /* map the fcntl command onto enqueue flags */
        switch (cmd) {
        case F_SETLKW:
#ifdef F_SETLKW64
        case F_SETLKW64:
#endif
                flags = 0;      /* blocking enqueue */
                break;
        case F_SETLK:
#ifdef F_SETLK64
        case F_SETLK64:
#endif
                flags = LDLM_FL_BLOCK_NOWAIT;
                break;
        case F_GETLK:
#ifdef F_GETLK64
        case F_GETLK64:
#endif
                flags = LDLM_FL_TEST_LOCK;
                /* Save the old mode so that if the mode in the lock changes we
                 * can decrement the appropriate reader or writer refcount. */
                file_lock->fl_type = einfo.ei_mode;
                break;
        default:
                CERROR("unknown fcntl lock command: %d\n", cmd);
                LBUG();
        }

        CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
               "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
               flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
        /* on success, mirror the lock into the local VFS lock lists
         * (NOTE(review): presumably for local bookkeeping - confirm) */
        if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
                ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
#ifdef HAVE_F_OP_FLOCK
        if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
            !(flags & LDLM_FL_TEST_LOCK))
                posix_lock_file_wait(file, file_lock);
#endif

        RETURN(rc);
}
2544
/*
 * ->flock()/->lock() handler installed for -o noflock mounts: refuse
 * all file locking requests with -ENOSYS.
 */
int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
        ENTRY;

        RETURN(-ENOSYS);
}
2551
2552 int ll_have_md_lock(struct inode *inode, __u64 bits)
2553 {
2554         struct lustre_handle lockh;
2555         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2556         struct lu_fid *fid;
2557         int flags;
2558         ENTRY;
2559
2560         if (!inode)
2561                RETURN(0);
2562
2563         fid = &ll_i2info(inode)->lli_fid;
2564         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2565
2566         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2567         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2568                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2569                 RETURN(1);
2570         }
2571         RETURN(0);
2572 }
2573
2574 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2575                             struct lustre_handle *lockh)
2576 {
2577         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2578         struct lu_fid *fid;
2579         ldlm_mode_t rc;
2580         int flags;
2581         ENTRY;
2582
2583         fid = &ll_i2info(inode)->lli_fid;
2584         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2585
2586         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2587         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2588                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2589         RETURN(rc);
2590 }
2591
2592 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2593         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2594                               * and return success */
2595                 inode->i_nlink = 0;
2596                 /* This path cannot be hit for regular files unless in
2597                  * case of obscure races, so no need to to validate
2598                  * size. */
2599                 if (!S_ISREG(inode->i_mode) &&
2600                     !S_ISDIR(inode->i_mode))
2601                         return 0;
2602         }
2603
2604         if (rc) {
2605                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2606                 return -abs(rc);
2607
2608         }
2609
2610         return 0;
2611 }
2612
/*
 * Revalidate the attributes cached on @dentry's inode against the MDS.
 *
 * Two paths:
 *  - when the MDS supports getattr-by-fid (OBD_CONNECT_ATTRFID), take an
 *    IT_GETATTR intent lock by fid, with no name-based lookup;
 *  - otherwise issue a plain md_getattr(), unless we already hold an
 *    UPDATE inode-bits lock, in which case the cache is trusted as-is.
 * Finally the file size is glimpsed from the OSTs when objects exist.
 *
 * @it is accepted for interface symmetry; it is not used in this body.
 * Returns 0 on success or a negative errno.
 */
int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct ptlrpc_request *req = NULL;
        struct ll_sb_info *sbi;
        struct obd_export *exp;
        int rc;
        ENTRY;

        if (!inode) {
                CERROR("REPORT THIS LINE TO PETER\n");
                RETURN(0);
        }
        sbi = ll_i2sbi(inode);

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
               inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

        exp = ll_i2mdexp(inode);

        if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
                struct lookup_intent oit = { .it_op = IT_GETATTR };
                struct md_op_data *op_data;

                /* Call getattr by fid, so do not provide name at all. */
                op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
                                             dentry->d_inode, NULL, 0, 0,
                                             LUSTRE_OPC_ANY, NULL);
                if (IS_ERR(op_data))
                        RETURN(PTR_ERR(op_data));

                oit.it_flags |= O_CHECK_STALE;
                rc = md_intent_lock(exp, op_data, NULL, 0,
                                    /* we are not interested in name
                                       based lookup */
                                    &oit, 0, &req,
                                    ll_md_blocking_ast, 0);
                ll_finish_md_op_data(op_data);
                oit.it_flags &= ~O_CHECK_STALE;
                if (rc < 0) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        GOTO (out, rc);
                }

                rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
                if (rc != 0) {
                        ll_intent_release(&oit);
                        GOTO(out, rc);
                }

                /* Unlinked? Unhash dentry, so it is not picked up later by
                   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
                   here to preserve get_cwd functionality on 2.6.
                   Bug 10503 */
                if (!dentry->d_inode->i_nlink) {
                        spin_lock(&dcache_lock);
                        ll_drop_dentry(dentry);
                        spin_unlock(&dcache_lock);
                }

                ll_lookup_finish_locks(&oit, dentry);
        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) {
                /* shadows the outer sbi on purpose; same value */
                struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                obd_valid valid = OBD_MD_FLGETATTR;
                struct obd_capa *oc;
                int ealen = 0;

                /* for regular files also fetch striping EA, sized to the
                 * largest EA the MDS may return */
                if (S_ISREG(inode->i_mode)) {
                        rc = ll_get_max_mdsize(sbi, &ealen);
                        if (rc)
                                RETURN(rc);
                        valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
                }
                /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
                 * capa for this inode. Because we only keep capas of dirs
                 * fresh. */
                oc = ll_mdscapa_get(inode);
                rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
                                ealen, &req);
                capa_put(oc);
                if (rc) {
                        /* NOTE(review): returns without ptlrpc_req_finished();
                         * presumably @req stays NULL when md_getattr fails -
                         * confirm */
                        rc = ll_inode_revalidate_fini(inode, rc);
                        RETURN(rc);
                }

                rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
                                   NULL);
                if (rc)
                        GOTO(out, rc);
        }

        /* if object not yet allocated, don't validate size */
        if (ll_i2info(inode)->lli_smd == NULL)
                GOTO(out, rc = 0);

        /* ll_glimpse_size will prefer locally cached writes if they extend
         * the file */
        rc = ll_glimpse_size(inode, 0);
        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}
2716
2717 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2718                   struct lookup_intent *it, struct kstat *stat)
2719 {
2720         struct inode *inode = de->d_inode;
2721         int res = 0;
2722
2723         res = ll_inode_revalidate_it(de, it);
2724         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2725
2726         if (res)
2727                 return res;
2728
2729         stat->dev = inode->i_sb->s_dev;
2730         stat->ino = inode->i_ino;
2731         stat->mode = inode->i_mode;
2732         stat->nlink = inode->i_nlink;
2733         stat->uid = inode->i_uid;
2734         stat->gid = inode->i_gid;
2735         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2736         stat->atime = inode->i_atime;
2737         stat->mtime = inode->i_mtime;
2738         stat->ctime = inode->i_ctime;
2739 #ifdef HAVE_INODE_BLKSIZE
2740         stat->blksize = inode->i_blksize;
2741 #else
2742         stat->blksize = 1 << inode->i_blkbits;
2743 #endif
2744
2745         ll_inode_size_lock(inode, 0);
2746         stat->size = i_size_read(inode);
2747         stat->blocks = inode->i_blocks;
2748         ll_inode_size_unlock(inode, 0);
2749
2750         return 0;
2751 }
2752 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2753 {
2754         struct lookup_intent it = { .it_op = IT_GETATTR };
2755
2756         return ll_getattr_it(mnt, de, &it, stat);
2757 }
2758
2759 static
2760 int lustre_check_acl(struct inode *inode, int mask)
2761 {
2762 #ifdef CONFIG_FS_POSIX_ACL
2763         struct ll_inode_info *lli = ll_i2info(inode);
2764         struct posix_acl *acl;
2765         int rc;
2766         ENTRY;
2767
2768         spin_lock(&lli->lli_lock);
2769         acl = posix_acl_dup(lli->lli_posix_acl);
2770         spin_unlock(&lli->lli_lock);
2771
2772         if (!acl)
2773                 RETURN(-EAGAIN);
2774
2775         rc = posix_acl_permission(inode, acl, mask);
2776         posix_acl_release(acl);
2777
2778         RETURN(rc);
2779 #else
2780         return -EAGAIN;
2781 #endif
2782 }
2783
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
/*
 * ->permission() for 2.6.10+ kernels: remote clients ask the server,
 * everyone else goes through generic_permission() with lustre_check_acl()
 * as the ACL callback.
 */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);
        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
        return generic_permission(inode, mask, lustre_check_acl);
}
#else
/*
 * ->permission() for older kernels: open-coded UNIX permission check with
 * ACL support, mirroring the kernel's generic_permission().
 *
 * Structure note: the "else if (1)" arm is deliberate - it lets the ACL
 * path "goto check_groups" into the otherwise-unreachable else arm.
 * Do not "simplify" this control flow.
 */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        int mode = inode->i_mode;
        int rc;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);

        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);

        /* deny writes to read-only filesystems and immutable inodes */
        if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
            (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
                return -EROFS;
        if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
                return -EACCES;
        if (current->fsuid == inode->i_uid) {
                mode >>= 6;     /* use the "owner" permission bits */
        } else if (1) {
                /* if the group-class bits can't grant @mask, an ACL can't
                 * either (presumably: mirrors kernel permission() logic -
                 * confirm), so skip straight to the group check */
                if (((mode >> 3) & mask & S_IRWXO) != mask)
                        goto check_groups;
                rc = lustre_check_acl(inode, mask);
                if (rc == -EAGAIN)
                        goto check_groups;      /* no ACL cached */
                if (rc == -EACCES)
                        goto check_capabilities;
                return rc;
        } else {
check_groups:
                if (in_group_p(inode->i_gid))
                        mode >>= 3;     /* use the "group" permission bits */
        }
        if ((mode & mask & S_IRWXO) == mask)
                return 0;

check_capabilities:
        /* CAP_DAC_OVERRIDE bypasses everything except exec of a file with
         * no exec bits at all */
        if (!(mask & MAY_EXEC) ||
            (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
                if (capable(CAP_DAC_OVERRIDE))
                        return 0;

        /* CAP_DAC_READ_SEARCH allows reads and directory searches */
        if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
            (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
                return 0;

        return -EACCES;
}
#endif
2846
/* -o localflock - only provides locally consistent flock locks */
/* default file methods: no ->flock/->lock entries, so locking stays in
 * the local VFS */
struct file_operations ll_file_operations = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
};
2859
/* file methods with cluster-coherent locking: ->flock (when the kernel
 * provides the hook) and ->lock both route through ll_file_flock() */
struct file_operations ll_file_operations_flock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_flock,
#endif
        .lock           = ll_file_flock
};
2875
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_noflock,
#endif
        .lock           = ll_file_noflock
};
2892
/* inode methods shared by all regular-file mount variants above */
struct inode_operations ll_file_inode_operations = {
#ifdef HAVE_VFS_INTENT_PATCHES
        .setattr_raw    = ll_setattr_raw,
#endif
        .setattr        = ll_setattr,
        .truncate       = ll_truncate,
        .getattr        = ll_getattr,
        .permission     = ll_inode_permission,
        .setxattr       = ll_setxattr,
        .getxattr       = ll_getxattr,
        .listxattr      = ll_listxattr,
        .removexattr    = ll_removexattr,
};
2906
/* dynamic ioctl number support routines */
static struct llioc_ctl_data {
        struct rw_semaphore ioc_sem;    /* protects ioc_head */
        struct list_head    ioc_head;   /* list of struct llioc_data */
} llioc = {
        __RWSEM_INITIALIZER(llioc.ioc_sem),
        CFS_LIST_HEAD_INIT(llioc.ioc_head)
};
2915
2916
/* one registered dynamic-ioctl handler and the commands it accepts */
struct llioc_data {
        struct list_head        iocd_list;      /* link in llioc.ioc_head */
        unsigned int            iocd_size;      /* total allocation size */
        llioc_callback_t        iocd_cb;        /* handler callback */
        unsigned int            iocd_count;     /* entries in iocd_cmd[] */
        unsigned int            iocd_cmd[0];    /* cmds; old-style flexible array */
};
2924
2925 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2926 {
2927         unsigned int size;
2928         struct llioc_data *in_data = NULL;
2929         ENTRY;
2930
2931         if (cb == NULL || cmd == NULL ||
2932             count > LLIOC_MAX_CMD || count < 0)
2933                 RETURN(NULL);
2934
2935         size = sizeof(*in_data) + count * sizeof(unsigned int);
2936         OBD_ALLOC(in_data, size);
2937         if (in_data == NULL)
2938                 RETURN(NULL);
2939
2940         memset(in_data, 0, sizeof(*in_data));
2941         in_data->iocd_size = size;
2942         in_data->iocd_cb = cb;
2943         in_data->iocd_count = count;
2944         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2945
2946         down_write(&llioc.ioc_sem);
2947         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2948         up_write(&llioc.ioc_sem);
2949
2950         RETURN(in_data);
2951 }
2952
2953 void ll_iocontrol_unregister(void *magic)
2954 {
2955         struct llioc_data *tmp;
2956
2957         if (magic == NULL)
2958                 return;
2959
2960         down_write(&llioc.ioc_sem);
2961         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2962                 if (tmp == magic) {
2963                         unsigned int size = tmp->iocd_size;
2964
2965                         list_del(&tmp->iocd_list);
2966                         up_write(&llioc.ioc_sem);
2967
2968                         OBD_FREE(tmp, size);
2969                         return;
2970                 }
2971         }
2972         up_write(&llioc.ioc_sem);
2973
2974         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2975 }
2976
/* export the dynamic ioctl registration API to other modules */
EXPORT_SYMBOL(ll_iocontrol_register);
EXPORT_SYMBOL(ll_iocontrol_unregister);
2979
2980 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2981                         unsigned int cmd, unsigned long arg, int *rcp)
2982 {
2983         enum llioc_iter ret = LLIOC_CONT;
2984         struct llioc_data *data;
2985         int rc = -EINVAL, i;
2986
2987         down_read(&llioc.ioc_sem);
2988         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2989                 for (i = 0; i < data->iocd_count; i++) {
2990                         if (cmd != data->iocd_cmd[i]) 
2991                                 continue;
2992
2993                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2994                         break;
2995                 }
2996
2997                 if (ret == LLIOC_STOP)
2998                         break;
2999         }
3000         up_read(&llioc.ioc_sem);
3001
3002         if (rcp)
3003                 *rcp = rc;
3004         return ret;
3005 }