Whamcloud - gitweb
b=14149
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
/* Prepare @op_data for an MDS close RPC on @inode using open handle @och.
 * Mode and all timestamps are always sent.  Size/blocks are only marked
 * valid when the MDS does not support Size-on-MDS (SOM) or the file is
 * not regular; otherwise ll_epoch_close() decides how the IO epoch is
 * finished (it may set MF_EPOCH_CLOSE etc. in @op_data). */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
                             struct obd_client_handle *och)
{
        ENTRY;

        op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
                                 ATTR_MTIME_SET | ATTR_CTIME_SET;

        /* A read-only open cannot have changed size/blocks on the OSTs. */
        if (!(och->och_flags & FMODE_WRITE))
                goto out;

        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
            !S_ISREG(inode->i_mode))
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
        else
                /* SOM in effect: let the epoch code fill in the close flags. */
                ll_epoch_close(inode, op_data, &och, 0);

out:
        ll_pack_inode2opdata(inode, op_data, &och->och_fh);
        EXIT;
}
85
/* Send the MDS close RPC for open handle @och on @inode.
 *
 * Ownership: @och is consumed here — it is freed below unless the SOM
 * epoch is still open and a DONE_WRITING RPC is owed, in which case the
 * inode is queued for the done-writing thread which will free it later.
 * Returns 0, or a negative errno from md_close()/ll_objects_destroy(). */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                     struct inode *inode,
                                     struct obd_client_handle *och)
{
        struct obd_export *exp = ll_i2mdexp(inode);
        struct md_op_data *op_data;
        struct ptlrpc_request *req = NULL;
        struct obd_device *obd = class_exp2obd(exp);
        int epoch_close = 1;
        int seq_end = 0, rc;
        ENTRY;

        if (obd == NULL) {
                /*
                 * XXX: in case of LMV, is this correct to access
                 * ->exp_handle?
                 */
                CERROR("Invalid MDC connection handle "LPX64"\n",
                       ll_i2mdexp(inode)->exp_handle.h_cookie);
                GOTO(out, rc = 0);
        }

        /*
         * here we check if this is forced umount. If so this is called on
         * canceling "open lock" and we do not call md_close() in this case, as
         * it will not be successful, as import is already deactivated.
         */
        if (obd->obd_force)
                GOTO(out, rc = 0);

        OBD_ALLOC_PTR(op_data);
        if (op_data == NULL)
                GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

        ll_prepare_close(inode, op_data, och);
        /* Remember whether this close finishes the IO epoch; used below to
         * decide if DONE_WRITING must still be sent. */
        epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
        rc = md_close(md_exp, op_data, och->och_mod, &req);
        if (rc != -EAGAIN)
                seq_end = 1;

        if (rc == -EAGAIN) {
                /* This close must have the epoch closed. */
                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
                LASSERT(epoch_close);
                /* MDS has instructed us to obtain Size-on-MDS attribute from
                 * OSTs and send setattr to back to MDS. */
                rc = ll_sizeonmds_update(inode, och->och_mod,
                                         &och->och_fh, op_data->op_ioepoch);
                if (rc) {
                        CERROR("inode %lu mdc Size-on-MDS update failed: "
                               "rc = %d\n", inode->i_ino, rc);
                        /* Non-fatal: the close itself succeeded. */
                        rc = 0;
                }
        } else if (rc) {
                CERROR("inode %lu mdc close failed: rc = %d\n",
                       inode->i_ino, rc);
        }
        ll_finish_md_op_data(op_data);

        if (rc == 0) {
                /* Destroy OST objects for an unlinked-but-open file, if the
                 * close reply says we were the last opener. */
                rc = ll_objects_destroy(req, inode);
                if (rc)
                        CERROR("inode %lu ll_objects destroy: rc = %d\n",
                               inode->i_ino, rc);
        }

        EXIT;
out:

        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
            S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                /* Epoch still open: keep @och alive until DONE_WRITING. */
                ll_queue_done_writing(inode, LLIF_DONE_WRITING);
        } else {
                if (seq_end)
                        ptlrpc_close_replay_seq(req);
                md_clear_open_replay_data(md_exp, och);
                /* Free @och if it is not waiting for DONE_WRITING. */
                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
                OBD_FREE_PTR(och);
        }
        if (req) /* This is close request */
                ptlrpc_req_finished(req);
        return rc;
}
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och=*och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody have freed this och
203                       already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
/* Per-file-descriptor close: drop any group lock, decrement the use count
 * of the matching cached MDS open handle, and — unless an OPEN DLM lock
 * still protects the handle — send the real close via ll_md_real_close().
 * Frees the file's ll_file_data.  Returns 0 or a negative errno. */
int ll_md_close(struct obd_export *md_exp, struct inode *inode,
                struct file *file)
{
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct ll_inode_info *lli = ll_i2info(inode);
        int rc = 0;
        ENTRY;

        /* clear group lock, if present */
        if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
                struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
                fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
                rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
                                      &fd->fd_cwlockh);
        }

        /* Let's see if we have good enough OPEN lock on the file and if
           we can skip talking to MDS */
        if (file->f_dentry->d_inode) { /* Can this ever be false? */
                int lockmode;
                /* TEST_LOCK: only probe for a matching granted lock, do not
                 * take a new reference on it. */
                int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
                struct lustre_handle lockh;
                struct inode *inode = file->f_dentry->d_inode;
                ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};

                /* Lock mode must mirror the one used at open time so the
                 * md_lock_match() below probes the right lock. */
                down(&lli->lli_och_sem);
                if (fd->fd_omode & FMODE_WRITE) {
                        lockmode = LCK_CW;
                        LASSERT(lli->lli_open_fd_write_count);
                        lli->lli_open_fd_write_count--;
                } else if (fd->fd_omode & FMODE_EXEC) {
                        lockmode = LCK_PR;
                        LASSERT(lli->lli_open_fd_exec_count);
                        lli->lli_open_fd_exec_count--;
                } else {
                        lockmode = LCK_CR;
                        LASSERT(lli->lli_open_fd_read_count);
                        lli->lli_open_fd_read_count--;
                }
                up(&lli->lli_och_sem);

                /* No OPEN lock left: the MDS must be told about the close
                 * now; otherwise the cancel of the OPEN lock will do it. */
                if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
                                   LDLM_IBITS, &policy, lockmode,
                                   &lockh)) {
                        rc = ll_md_real_close(file->f_dentry->d_inode,
                                              fd->fd_omode);
                }
        } else {
                CERROR("Releasing a file %p with negative dentry %p. Name %s",
                       file, file->f_dentry, file->f_dentry->d_name.name);
        }

        LUSTRE_FPRIVATE(file) = NULL;
        ll_file_data_put(fd);
        ll_capa_close(inode);

        RETURN(rc);
}
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
/* While this returns an error code, fput() the caller does not, so we need
 * to make every effort to clean up all of our state here.  Also, applications
 * rarely check close errors and even if an error is returned they will not
 * re-try the close call.
 */
int ll_file_release(struct inode *inode, struct file *file)
{
        struct ll_file_data *fd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        int rc;

        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
        /* Remote-client ACL bookkeeping is attached to the root inode only;
         * tear down this process's remote ACL state on root release. */
        if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
            inode == inode->i_sb->s_root->d_inode) {
                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

                LASSERT(fd != NULL);
                if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
                        fd->fd_flags &= ~LL_FILE_RMTACL;
                        rct_del(&sbi->ll_rct, cfs_curproc_pid());
                        et_search_free(&sbi->ll_et, cfs_curproc_pid());
                }
        }
#endif

        ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
        fd = LUSTRE_FPRIVATE(file);
        LASSERT(fd != NULL);

        /* don't do anything for / */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = NULL;
                ll_file_data_put(fd);
                RETURN(0);
        }

        /* Fold any deferred async write error into this close's result
         * path, then clear it so it is reported at most once. */
        if (lsm)
                lov_test_and_clear_async_rc(lsm);
        lli->lli_async_rc = 0;

        rc = ll_md_close(sbi->ll_md_exp, inode, file);
        RETURN(rc);
}
321
/* Enqueue an OPEN intent lock with the MDS for @file.
 * @lmm/@lmmsize: optional striping info to set at open time; when both are
 * zero an OPEN DLM lock is requested so the handle can be cached.
 * On success the inode is refreshed from the reply.  Returns 0 or a
 * negative errno; -ESTALE takes a quiet exit path to avoid log flooding. */
static int ll_intent_file_open(struct file *file, void *lmm,
                               int lmmsize, struct lookup_intent *itp)
{
        struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
        struct dentry *parent = file->f_dentry->d_parent;
        const char *name = file->f_dentry->d_name.name;
        const int len = file->f_dentry->d_name.len;
        struct md_op_data *op_data;
        struct ptlrpc_request *req;
        int rc;

        if (!parent)
                RETURN(-ENOENT);

        /* Usually we come here only for NFSD, and we want open lock.
           But we can also get here with pre 2.6.15 patchless kernels, and in
           that case that lock is also ok */
        /* We can also get here if there was cached open handle in revalidate_it
         * but it disappeared while we were getting from there to ll_file_open.
         * But this means this file was closed and immediatelly opened which
         * makes a good candidate for using OPEN lock */
        /* If lmmsize & lmm are not 0, we are just setting stripe info
         * parameters. No need for the open lock */
        if (!lmm && !lmmsize)
                itp->it_flags |= MDS_OPEN_LOCK;

        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
                                      file->f_dentry->d_inode, name, len,
                                      O_RDWR, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
                            0 /*unused */, &req, ll_md_blocking_ast, 0);
        ll_finish_md_op_data(op_data);
        if (rc == -ESTALE) {
                /* reason for keep own exit path - don`t flood log
                * with messages with -ESTALE errors.
                */
                if (!it_disposition(itp, DISP_OPEN_OPEN) ||
                     it_open_error(DISP_OPEN_OPEN, itp))
                        GOTO(out, rc);
                /* The open itself succeeded despite -ESTALE: drop the now
                 * useless handle before bailing out. */
                ll_release_openhandle(file->f_dentry, itp);
                GOTO(out_stale, rc);
        }

        if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
                GOTO(out, rc);
        }

        /* Attach the inode to the granted lock so blocking ASTs can find it. */
        if (itp->d.lustre.it_lock_mode)
                md_set_lock_data(sbi->ll_md_exp,
                                 &itp->d.lustre.it_lock_handle,
                                 file->f_dentry->d_inode);

        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
out:
        ptlrpc_req_finished(itp->d.lustre.it_data);

out_stale:
        it_clear_disposition(itp, DISP_ENQ_COMPLETE);
        ll_intent_drop_lock(itp);

        RETURN(rc);
}
389
390 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
391                        struct lookup_intent *it, struct obd_client_handle *och)
392 {
393         struct ptlrpc_request *req = it->d.lustre.it_data;
394         struct mdt_body *body;
395
396         LASSERT(och);
397
398         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
399         LASSERT(body != NULL);                      /* reply already checked out */
400
401         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
402         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
403         och->och_fid = lli->lli_fid;
404         och->och_flags = it->it_flags;
405         lli->lli_ioepoch = body->ioepoch;
406
407         return md_set_open_replay_data(md_exp, och, req);
408 }
409
/* Attach per-descriptor state to @file after a successful open.
 * When @och is non-NULL this is a fresh MDS open: fill the handle from the
 * intent reply (registering it for replay) before publishing @fd.
 * On success @fd becomes owned by @file; on error the caller still owns it.
 * Returns 0 or a negative errno from ll_och_fill(). */
int ll_local_open(struct file *file, struct lookup_intent *it,
                  struct ll_file_data *fd, struct obd_client_handle *och)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        ENTRY;

        /* Must not already have private data (see ll_file_open compat path). */
        LASSERT(!LUSTRE_FPRIVATE(file));

        LASSERT(fd != NULL);

        if (och) {
                struct ptlrpc_request *req = it->d.lustre.it_data;
                struct mdt_body *body;
                int rc;

                rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
                if (rc)
                        RETURN(rc);

                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                if ((it->it_flags & FMODE_WRITE) &&
                    (body->valid & OBD_MD_FLSIZE))
                        CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
                               lli->lli_ioepoch, PFID(&lli->lli_fid));
        }

        LUSTRE_FPRIVATE(file) = fd;
        ll_readahead_init(inode, &fd->fd_ras);
        /* Remember the open mode for the matching close in ll_md_close(). */
        fd->fd_omode = it->it_flags;
        RETURN(0);
}
442
/* Open a file, and (for the very first open) create objects on the OSTs at
 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
 * creation or open until ll_lov_setstripe() ioctl is called.  We grab
 * lli_open_sem to ensure no other process will create objects, send the
 * stripe MD to the MDS, or try to destroy the objects if that fails.
 *
 * If we already have the stripe MD locally then we don't request it in
 * md_open(), by passing a lmm_size = 0.
 *
 * It is up to the application to ensure no other processes open this file
 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
 * used.  We might be able to avoid races of that sort by getting lli_open_sem
 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
 *
 * Returns 0 or a negative errno.
 */
int ll_file_open(struct inode *inode, struct file *file)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lookup_intent *it, oit = { .it_op = IT_OPEN,
                                          .it_flags = file->f_flags };
        struct lov_stripe_md *lsm;
        struct ptlrpc_request *req = NULL;
        struct obd_client_handle **och_p;
        __u64 *och_usecount;
        struct ll_file_data *fd;
        int rc = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
               inode->i_generation, inode, file->f_flags);

#ifdef HAVE_VFS_INTENT_PATCHES
        it = file->f_it;
#else
        it = file->private_data; /* XXX: compat macro */
        file->private_data = NULL; /* prevent ll_local_open assertion */
#endif

        fd = ll_file_data_get();
        if (fd == NULL)
                RETURN(-ENOMEM);

        /* don't do anything for / */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = fd;
                RETURN(0);
        }

        /* No usable intent from the VFS: build our own OPEN intent. */
        if (!it || !it->d.lustre.it_disposition) {
                /* Convert f_flags into access mode. We cannot use file->f_mode,
                 * because everything but O_ACCMODE mask was stripped from
                 * there */
                if ((oit.it_flags + 1) & O_ACCMODE)
                        oit.it_flags++;
                if (file->f_flags & O_TRUNC)
                        oit.it_flags |= FMODE_WRITE;

                /* kernel only call f_op->open in dentry_open.  filp_open calls
                 * dentry_open after call to open_namei that checks permissions.
                 * Only nfsd_open call dentry_open directly without checking
                 * permissions and because of that this code below is safe. */
                if (oit.it_flags & FMODE_WRITE)
                        oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

                /* We do not want O_EXCL here, presumably we opened the file
                 * already? XXX - NFS implications? */
                oit.it_flags &= ~O_EXCL;

                it = &oit;
        }

restart:
        /* Let's see if we have file open on MDS already. */
        if (it->it_flags & FMODE_WRITE) {
                och_p = &lli->lli_mds_write_och;
                och_usecount = &lli->lli_open_fd_write_count;
        } else if (it->it_flags & FMODE_EXEC) {
                och_p = &lli->lli_mds_exec_och;
                och_usecount = &lli->lli_open_fd_exec_count;
         } else {
                och_p = &lli->lli_mds_read_och;
                och_usecount = &lli->lli_open_fd_read_count;
        }

        down(&lli->lli_och_sem);
        if (*och_p) { /* Open handle is present */
                if (it_disposition(it, DISP_OPEN_OPEN)) {
                        /* Well, there's extra open request that we do not need,
                           let's close it somehow. This will decref request. */
                        rc = it_open_error(DISP_OPEN_OPEN, it);
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_och_free, rc);
                        }
                        ll_release_openhandle(file->f_dentry, it);
                        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                             LPROC_LL_OPEN);
                }
                (*och_usecount)++;

                rc = ll_local_open(file, it, fd, NULL);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        RETURN(rc);
                }
        } else {
                LASSERT(*och_usecount == 0);
                if (!it->d.lustre.it_disposition) {
                        /* We cannot just request lock handle now, new ELC code
                           means that one of other OPEN locks for this file
                           could be cancelled, and since blocking ast handler
                           would attempt to grab och_sem as well, that would
                           result in a deadlock */
                        up(&lli->lli_och_sem);
                        it->it_flags |= O_CHECK_STALE;
                        rc = ll_intent_file_open(file, NULL, 0, it);
                        it->it_flags &= ~O_CHECK_STALE;
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_openerr, rc);
                        }

                        /* Got some error? Release the request */
                        if (it->d.lustre.it_status < 0) {
                                req = it->d.lustre.it_data;
                                ptlrpc_req_finished(req);
                        }
                        md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
                                         &it->d.lustre.it_lock_handle,
                                         file->f_dentry->d_inode);
                        /* Retry with the freshly-populated intent. */
                        goto restart;
                }
                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
                if (!*och_p) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc = -ENOMEM);
                }
                (*och_usecount)++;
                req = it->d.lustre.it_data;

                /* md_intent_lock() didn't get a request ref if there was an
                 * open error, so don't do cleanup on the request here
                 * (bug 3430) */
                /* XXX (green): Should not we bail out on any error here, not
                 * just open error? */
                rc = it_open_error(DISP_OPEN_OPEN, it);
                if (rc) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }

                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                rc = ll_local_open(file, it, fd, *och_p);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }
        }
        up(&lli->lli_och_sem);

        /* Must do this outside lli_och_sem lock to prevent deadlock where
           different kind of OPEN lock for this same inode gets cancelled
           by ldlm_cancel_lru */
        if (!S_ISREG(inode->i_mode))
                GOTO(out, rc);

        ll_capa_open(inode);

        lsm = lli->lli_smd;
        if (lsm == NULL) {
                /* No striping yet: defer object creation for
                 * O_LOV_DELAY_CREATE or read-only opens. */
                if (file->f_flags & O_LOV_DELAY_CREATE ||
                    !(file->f_mode & FMODE_WRITE)) {
                        CDEBUG(D_INODE, "object creation was delayed\n");
                        GOTO(out, rc);
                }
        }
        file->f_flags &= ~O_LOV_DELAY_CREATE;
        GOTO(out, rc);
out:
        ptlrpc_req_finished(req);
        if (req)
                it_clear_disposition(it, DISP_ENQ_OPEN_REF);
out_och_free:
        if (rc) {
                if (*och_p) {
                        OBD_FREE(*och_p, sizeof (struct obd_client_handle));
                        *och_p = NULL; /* OBD_FREE writes some magic there */
                        (*och_usecount)--;
                }
                up(&lli->lli_och_sem);
out_openerr: ;/* Looks weird, eh? Just wait for statahead code to insert
                a statement here <-- remove this comment after statahead
                landing */
        }

        return rc;
}
642
/* Fills the obdo with the attributes for the inode defined by lsm.
 * Issues an async OST getattr over a temporary ptlrpc request set, waits
 * for completion, then refreshes the in-core inode from the returned
 * size/blocks/timestamps.  Requires lli_smd to be set (asserted).
 * Returns 0 or a negative errno. */
int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
{
        struct ptlrpc_request_set *set;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;

        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(lsm != NULL);

        /* Identify the OST objects and ask for everything we may refresh. */
        oinfo.oi_md = lsm;
        oinfo.oi_oa = obdo;
        oinfo.oi_oa->o_id = lsm->lsm_object_id;
        oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
        oinfo.oi_oa->o_mode = S_IFREG;
        oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
                               OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                               OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
                               OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                               OBD_MD_FLGROUP;
        oinfo.oi_capa = ll_mdscapa_get(inode);

        set = ptlrpc_prep_set();
        if (set == NULL) {
                CERROR("can't allocate ptlrpc set\n");
                rc = -ENOMEM;
        } else {
                rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
                if (rc == 0)
                        rc = ptlrpc_set_wait(set);
                ptlrpc_set_destroy(set);
        }
        /* Drop the capability reference taken above on all paths. */
        capa_put(oinfo.oi_capa);
        if (rc)
                RETURN(rc);

        /* Only these fields may be copied back into the inode. */
        oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
                                 OBD_MD_FLATIME | OBD_MD_FLMTIME |
                                 OBD_MD_FLCTIME | OBD_MD_FLSIZE);

        obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
               lli->lli_smd->lsm_object_id, i_size_read(inode),
               (unsigned long long)inode->i_blocks, ll_inode_blksize(inode));
        RETURN(0);
}
692
/* Clear the setuid/setgid bits on @inode after a write by a process
 * lacking CAP_FSETID, mirroring the VFS remove_suid() behaviour.
 * S_ISGID is only cleared when S_IXGRP is also set — a non-executable
 * setgid bit (mandatory-locking marker) is left alone. */
static inline void ll_remove_suid(struct inode *inode)
{
        unsigned int mode;

        /* build the mask of bits to drop: S_ISGID if S_IXGRP is set,
         * plus S_ISUID unconditionally */
        mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;

        /* were any of those bits actually set on the inode? */
        mode &= inode->i_mode;
        if (mode && !capable(CAP_FSETID)) {
                inode->i_mode &= ~mode;
                // XXX careful here - we cannot change the size
        }
}
707
/* Map an extent DLM @lock back to the stripe index of @inode it covers.
 * Asks the LOV via obd_get_info("lock_to_stripe") unless the file has a
 * single stripe, then sanity-checks that the lock's resource matches the
 * stripe's object.  Returns the stripe index (>= 0), a negative errno,
 * or -ELDLM_NO_LOCK_DATA when the resource does not match. */
static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = ll_i2dtexp(inode);
        struct {
                char name[16];
                struct ldlm_lock *lock;
                struct lov_stripe_md *lsm;
        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
        __u32 stripe, vallen = sizeof(stripe);
        int rc;
        ENTRY;

        /* Single-stripe file: the only possible stripe is 0. */
        if (lsm->lsm_stripe_count == 1)
                GOTO(check, stripe = 0);

        /* get our offset in the lov */
        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
        if (rc != 0) {
                CERROR("obd_get_info: rc = %d\n", rc);
                RETURN(rc);
        }
        LASSERT(stripe < lsm->lsm_stripe_count);

check:
        /* NOTE(review): resource name[0]/name[2] are compared against the
         * stripe object id/group — presumably the resource-name layout for
         * OST extent locks; confirm against the ldlm resource definition. */
        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
                LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
                           lsm->lsm_oinfo[stripe]->loi_id,
                           lsm->lsm_oinfo[stripe]->loi_gr);
                RETURN(-ELDLM_NO_LOCK_DATA);
        }

        RETURN(stripe);
}
744
/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
 * we get a lock cancellation for each stripe, so we have to map the obd's
 * region back onto the stripes in the file that it held.
 *
 * No one can dirty the extent until we've finished our work and they can
 * enqueue another lock.  The DLM protects us from ll_file_read/write here,
 * but other kernel actors could have pages locked.
 *
 * @inode:  file whose cached pages are being dropped
 * @lsm:    striping descriptor used to map object offsets to file offsets
 * @lock:   the extent lock being cancelled; its policy data holds the
 *          object-relative extent
 * @stripe: index within @lsm of the stripe this lock's object backs
 *
 * Called with the DLM lock held. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                              struct ldlm_lock *lock, __u32 stripe)
{
        ldlm_policy_data_t tmpex;
        unsigned long start, end, count, skip, i, j;
        struct page *page;
        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
        struct lustre_handle lockh;
        struct address_space *mapping = inode->i_mapping;

        ENTRY;
        tmpex = lock->l_policy_data;
        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
               i_size_read(inode));

        /* our locks are page granular thanks to osc_enqueue, we invalidate the
         * whole page. */
        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
                           CFS_PAGE_SIZE);
        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);

        /* Map the object-relative page range [start, end] onto file-relative
         * page indices.  For a striped file, each stripe contributes runs of
         * 'count' pages separated by 'skip' pages that belong to the other
         * stripes. */
        count = ~0;
        skip = 0;
        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
        if (lsm->lsm_stripe_count > 1) {
                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
                skip = (lsm->lsm_stripe_count - 1) * count;
                start += start/count * skip + stripe * count;
                if (end != ~0)
                        end += end/count * skip + stripe * count;
        }
        /* the stripe arithmetic above can overflow 'end'; treat a wrapped
         * value as end-of-file */
        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
                end = ~0;

        /* clamp the walk to the last page that the file size can cover */
        i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
            CFS_PAGE_SHIFT : 0;
        if (i < end)
                end = i;

        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
               count, skip, end, discard ? " (DISCARDING)" : "");

        /* walk through the vmas on the inode and tear down mmaped pages that
         * intersect with the lock.  this stops immediately if there are no
         * mmap()ed regions of the file.  This is not efficient at all and
         * should be short lived. We'll associate mmap()ed pages with the lock
         * and will be able to find them directly */
        for (i = start; i <= end; i += (j + skip)) {
                j = min(count - (i % count), end - i + 1);
                LASSERT(j > 0);
                LASSERT(mapping);
                if (ll_teardown_mmaps(mapping,
                                      (__u64)i << CFS_PAGE_SHIFT,
                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                        break;
        }

        /* this is the simplistic implementation of page eviction at
         * cancelation.  It is careful to get races with other page
         * lockers handled correctly.  fixes from bug 20 will make it
         * more efficient by associating locks with pages and with
         * batching writeback under the lock explicitly. */
        for (i = start, j = start % count; i <= end;
             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
                /* 'j' counts pages within the current stripe run; when it
                 * reaches 'count' we jump over the other stripes' pages */
                if (j == count) {
                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
                        i += skip;
                        j = 0;
                        if (i > end)
                                break;
                }
                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                         start, i, end);

                if (!mapping_has_pages(mapping)) {
                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                        break;
                }

                cond_resched();

                page = find_get_page(mapping, i);
                if (page == NULL)
                        continue;
                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
                               i, tmpex.l_extent.start);
                lock_page(page);

                /* page->mapping to check with racing against teardown */
                if (!discard && clear_page_dirty_for_io(page)) {
                        rc = ll_call_writepage(inode, page);
                        /* either waiting for io to complete or reacquiring
                         * the lock that the failed writepage released */
                        lock_page(page);
                        wait_on_page_writeback(page);
                        if (rc != 0) {
                                CERROR("writepage inode %lu(%p) of page %p "
                                       "failed: %d\n", inode->i_ino, inode,
                                       page, rc);
                                /* record the write error on the mapping so a
                                 * later fsync/close can report it */
                                if (rc == -ENOSPC)
                                        set_bit(AS_ENOSPC, &mapping->flags);
                                else
                                        set_bit(AS_EIO, &mapping->flags);
                        }
                }

                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
                /* check to see if another DLM lock covers this page b=2765 */
                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
                                      LDLM_FL_TEST_LOCK,
                                      &lock->l_resource->lr_name, LDLM_EXTENT,
                                      &tmpex, LCK_PR | LCK_PW, &lockh);

                /* only drop the page when no other lock protects it and it is
                 * still attached to this mapping */
                if (rc2 <= 0 && page->mapping != NULL) {
                        struct ll_async_page *llap = llap_cast_private(page);
                        /* checking again to account for writeback's
                         * lock_page() */
                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                        if (llap)
                                ll_ra_accounting(llap, mapping);
                        ll_truncate_complete_page(page);
                }
                unlock_page(page);
                page_cache_release(page);
        }
        LASSERTF(tmpex.l_extent.start <=
                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
                  lock->l_policy_data.l_extent.end + 1),
                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                 start, i, end);
        EXIT;
}
896
/* Blocking/cancellation AST for llite extent (file data) DLM locks.
 *
 * LDLM_CB_BLOCKING: another client requested a conflicting lock, so
 * cancel ours.  LDLM_CB_CANCELING: the lock is going away; flush the
 * cached pages it covered via ll_pgcache_remove_extent() and shrink the
 * known-minimum-size (kms) of the affected stripe accordingly.
 *
 * Always returns 0; cancel failures are only logged. */
static int ll_extent_lock_callback(struct ldlm_lock *lock,
                                   struct ldlm_lock_desc *new, void *data,
                                   int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* small non-NULL values indicate a corrupted cbdata pointer */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                struct inode *inode;
                struct ll_inode_info *lli;
                struct lov_stripe_md *lsm;
                int stripe;
                __u64 kms;

                /* This lock wasn't granted, don't try to evict pages */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                inode = ll_inode_from_lock(lock);
                if (inode == NULL)
                        RETURN(0);
                lli = ll_i2info(inode);
                if (lli == NULL)
                        goto iput;
                if (lli->lli_smd == NULL)
                        goto iput;
                lsm = lli->lli_smd;

                stripe = ll_lock_to_stripe_offset(inode, lock);
                if (stripe < 0)
                        goto iput;

                ll_pgcache_remove_extent(inode, lsm, lock, stripe);

                /* recompute this stripe's kms now that the lock's extent is
                 * no longer covered; both the stripe lock and the resource
                 * lock are held while reading/updating loi_kms */
                lov_stripe_lock(lsm);
                lock_res_and_lock(lock);
                kms = ldlm_extent_shift_kms(lock,
                                            lsm->lsm_oinfo[stripe]->loi_kms);

                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
                lsm->lsm_oinfo[stripe]->loi_kms = kms;
                unlock_res_and_lock(lock);
                lov_stripe_unlock(lsm);
        iput:
                iput(inode);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
965
#if 0
/* NOTE(review): this function is compiled out (#if 0) and kept for
 * reference only; it is not built or called anywhere visible here. */
int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lustre_handle lockh = { 0 };
        struct ost_lvb *lvb;
        int stripe;
        ENTRY;

        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                     LDLM_FL_BLOCK_CONV)) {
                LBUG(); /* not expecting any blocked async locks yet */
                LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
                           "lock, returning");
                ldlm_lock_dump(D_OTHER, lock, 0);
                ldlm_reprocess_all(lock->l_resource);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");

        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                goto iput;

        if (lock->l_lvb_len) {
                struct lov_stripe_md *lsm = lli->lli_smd;
                __u64 kms;
                lvb = lock->l_lvb_data;
                lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;

                lock_res_and_lock(lock);
                ll_inode_size_lock(inode, 1);
                kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                kms = ldlm_extent_shift_kms(NULL, kms);
                if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe].loi_kms, kms);
                lsm->lsm_oinfo[stripe].loi_kms = kms;
                ll_inode_size_unlock(inode, 1);
                unlock_res_and_lock(lock);
        }

iput:
        iput(inode);
        wake_up(&lock->l_waitq);

        ldlm_lock2handle(lock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        RETURN(0);
}
#endif
1020
/* Glimpse AST: another client is asking for this file's size.  Reply with
 * the locally known minimum size (kms) of the stripe this lock covers,
 * plus the cached inode times.  -ELDLM_NO_LOCK_DATA results (no inode,
 * no inode info, no stripe objects, or no matching stripe) are normal
 * races and get a plain reply instead of a console error. */
static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
        struct ptlrpc_request *req = reqp;
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli;
        struct lov_stripe_md *lsm;
        struct ost_lvb *lvb;
        int rc, stripe;
        ENTRY;

        if (inode == NULL)
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
        lli = ll_i2info(inode);
        if (lli == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* First, find out which stripe index this lock corresponds to. */
        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* reserve room for the LVB in the reply and pack it */
        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             sizeof(*lvb));
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc) {
                CERROR("lustre_pack_reply: %d\n", rc);
                GOTO(iput, rc);
        }

        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
        lvb->lvb_mtime = LTIME_S(inode->i_mtime);
        lvb->lvb_atime = LTIME_S(inode->i_atime);
        lvb->lvb_ctime = LTIME_S(inode->i_ctime);

        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                   " atime "LPU64", mtime "LPU64", ctime "LPU64,
                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                   lvb->lvb_atime, lvb->lvb_ctime);
 iput:
        iput(inode);

 out:
        /* These errors are normal races, so we don't want to fill the console
         * with messages by calling ptlrpc_error() */
        if (rc == -ELDLM_NO_LOCK_DATA)
                lustre_pack_reply(req, 1, NULL, NULL);

        req->rq_status = rc;
        return rc;
}
1076
1077 static int ll_merge_lvb(struct inode *inode)
1078 {
1079         struct ll_inode_info *lli = ll_i2info(inode);
1080         struct ll_sb_info *sbi = ll_i2sbi(inode);
1081         struct ost_lvb lvb;
1082         int rc;
1083
1084         ENTRY;
1085
1086         ll_inode_size_lock(inode, 1);
1087         inode_init_lvb(inode, &lvb);
1088         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1089         i_size_write(inode, lvb.lvb_size);
1090         inode->i_blocks = lvb.lvb_blocks;
1091
1092         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1093         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1094         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1095         ll_inode_size_unlock(inode, 1);
1096
1097         RETURN(rc);
1098 }
1099
1100 int ll_local_size(struct inode *inode)
1101 {
1102         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1103         struct ll_inode_info *lli = ll_i2info(inode);
1104         struct ll_sb_info *sbi = ll_i2sbi(inode);
1105         struct lustre_handle lockh = { 0 };
1106         int flags = 0;
1107         int rc;
1108         ENTRY;
1109
1110         if (lli->lli_smd->lsm_stripe_count == 0)
1111                 RETURN(0);
1112
1113         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1114                        &policy, LCK_PR, &flags, inode, &lockh);
1115         if (rc < 0)
1116                 RETURN(rc);
1117         else if (rc == 0)
1118                 RETURN(-ENODATA);
1119
1120         rc = ll_merge_lvb(inode);
1121         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1122         RETURN(rc);
1123 }
1124
1125 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1126                      lstat_t *st)
1127 {
1128         struct lustre_handle lockh = { 0 };
1129         struct ldlm_enqueue_info einfo = { 0 };
1130         struct obd_info oinfo = { { { 0 } } };
1131         struct ost_lvb lvb;
1132         int rc;
1133
1134         ENTRY;
1135
1136         einfo.ei_type = LDLM_EXTENT;
1137         einfo.ei_mode = LCK_PR;
1138         einfo.ei_cb_bl = ll_extent_lock_callback;
1139         einfo.ei_cb_cp = ldlm_completion_ast;
1140         einfo.ei_cb_gl = ll_glimpse_callback;
1141         einfo.ei_cbdata = NULL;
1142
1143         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1144         oinfo.oi_lockh = &lockh;
1145         oinfo.oi_md = lsm;
1146         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1147
1148         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1149         if (rc == -ENOENT)
1150                 RETURN(rc);
1151         if (rc != 0) {
1152                 CERROR("obd_enqueue returned rc %d, "
1153                        "returning -EIO\n", rc);
1154                 RETURN(rc > 0 ? -EIO : rc);
1155         }
1156
1157         lov_stripe_lock(lsm);
1158         memset(&lvb, 0, sizeof(lvb));
1159         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1160         st->st_size = lvb.lvb_size;
1161         st->st_blocks = lvb.lvb_blocks;
1162         st->st_mtime = lvb.lvb_mtime;
1163         st->st_atime = lvb.lvb_atime;
1164         st->st_ctime = lvb.lvb_ctime;
1165         lov_stripe_unlock(lsm);
1166
1167         RETURN(rc);
1168 }
1169
1170 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1171  * file (because it prefers KMS over RSS when larger) */
1172 int ll_glimpse_size(struct inode *inode, int ast_flags)
1173 {
1174         struct ll_inode_info *lli = ll_i2info(inode);
1175         struct ll_sb_info *sbi = ll_i2sbi(inode);
1176         struct lustre_handle lockh = { 0 };
1177         struct ldlm_enqueue_info einfo = { 0 };
1178         struct obd_info oinfo = { { { 0 } } };
1179         int rc;
1180         ENTRY;
1181
1182         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1183                 RETURN(0);
1184
1185         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1186
1187         if (!lli->lli_smd) {
1188                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1189                 RETURN(0);
1190         }
1191
1192         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1193          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1194          *       won't revoke any conflicting DLM locks held. Instead,
1195          *       ll_glimpse_callback() will be called on each client
1196          *       holding a DLM lock against this file, and resulting size
1197          *       will be returned for each stripe. DLM lock on [0, EOF] is
1198          *       acquired only if there were no conflicting locks. */
1199         einfo.ei_type = LDLM_EXTENT;
1200         einfo.ei_mode = LCK_PR;
1201         einfo.ei_cb_bl = ll_extent_lock_callback;
1202         einfo.ei_cb_cp = ldlm_completion_ast;
1203         einfo.ei_cb_gl = ll_glimpse_callback;
1204         einfo.ei_cbdata = inode;
1205
1206         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1207         oinfo.oi_lockh = &lockh;
1208         oinfo.oi_md = lli->lli_smd;
1209         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1210
1211         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1212         if (rc == -ENOENT)
1213                 RETURN(rc);
1214         if (rc != 0) {
1215                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1216                 RETURN(rc > 0 ? -EIO : rc);
1217         }
1218
1219         rc = ll_merge_lvb(inode);
1220
1221         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1222                i_size_read(inode), (unsigned long long)inode->i_blocks);
1223
1224         RETURN(rc);
1225 }
1226
/* Acquire a client extent DLM lock on [policy->l_extent.start, .end] of
 * the file's objects and refresh the inode's size/times from the merged
 * lock value blocks.
 *
 * On success *lockh references the granted lock (release with
 * ll_extent_unlock()) and *policy is updated to the extent actually
 * granted.  Returns 0 on success or a negative errno; positive enqueue
 * results are mapped to -EIO.  Locking is skipped entirely (returns 0,
 * *lockh unused) when the fd has LL_FILE_IGNORE_LOCK or the superblock
 * has LL_SBI_NOLCK set. */
int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                   struct lov_stripe_md *lsm, int mode,
                   ldlm_policy_data_t *policy, struct lustre_handle *lockh,
                   int ast_flags)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ost_lvb lvb;
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(!lustre_handle_is_used(lockh));
        LASSERT(lsm != NULL);

        /* don't drop the mmapped file to LRU */
        if (mapping_mapped(inode->i_mapping))
                ast_flags |= LDLM_FL_NO_LRU;

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               inode->i_ino, policy->l_extent.start, policy->l_extent.end);

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = mode;
        einfo.ei_cb_bl = ll_extent_lock_callback;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy = *policy;
        oinfo.oi_lockh = lockh;
        oinfo.oi_md = lsm;
        oinfo.oi_flags = ast_flags;

        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
        /* report the extent actually granted back to the caller */
        *policy = oinfo.oi_policy;
        if (rc > 0)
                rc = -EIO;

        /* update the inode from the merged LVBs even on failure; the lock
         * ordering here (DLM enqueue first, then size lock) is deliberate,
         * see the comment below */
        ll_inode_size_lock(inode, 1);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);

        if (policy->l_extent.start == 0 &&
            policy->l_extent.end == OBD_OBJECT_EOF) {
                /* vmtruncate()->ll_truncate() first sets the i_size and then
                 * the kms under both a DLM lock and the
                 * ll_inode_size_lock().  If we don't get the
                 * ll_inode_size_lock() here we can match the DLM lock and
                 * reset i_size from the kms before the truncating path has
                 * updated the kms.  generic_file_write can then trust the
                 * stale i_size when doing appending writes and effectively
                 * cancel the result of the truncate.  Getting the
                 * ll_inode_size_lock() after the enqueue maintains the DLM
                 * -> ll_inode_size_lock() acquiring order. */
                i_size_write(inode, lvb.lvb_size);
                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
                       inode->i_ino, i_size_read(inode));
        }

        if (rc == 0) {
                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
                LTIME_S(inode->i_atime) = lvb.lvb_atime;
                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        }
        ll_inode_size_unlock(inode, 1);

        RETURN(rc);
}
1301
1302 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1303                      struct lov_stripe_md *lsm, int mode,
1304                      struct lustre_handle *lockh)
1305 {
1306         struct ll_sb_info *sbi = ll_i2sbi(inode);
1307         int rc;
1308         ENTRY;
1309
1310         /* XXX phil: can we do this?  won't it screw the file size up? */
1311         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1312             (sbi->ll_flags & LL_SBI_NOLCK))
1313                 RETURN(0);
1314
1315         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1316
1317         RETURN(rc);
1318 }
1319
/* Read from a file through the page cache.  Each chunk (bounded by
 * ll_max_rw_chunk and stripe boundaries when set) is covered by a PR
 * extent DLM lock, i_size is reconciled with the cluster-wide known
 * minimum size (kms) under ll_inode_size_lock(), and the actual copy is
 * done by generic_file_read().  Returns the number of bytes read or a
 * negative errno. */
static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                            loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        int rc, ra = 0;
        loff_t end;
        ssize_t retval, chunk, sum = 0;

        __u64 kms;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);
        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);

        if (!lsm) {
                /* Read on file with no objects should return zero-filled
                 * buffers up to file size (we can get non-zero sizes with
                 * mknod + truncate, then opening file for read. This is a
                 * common pattern in NFS case, it seems). Bug 6243 */
                int notzeroed;
                /* Since there are no objects on OSTs, we have nothing to get
                 * lock on and so we are forced to access inode->i_size
                 * unguarded */

                /* Read beyond end of file */
                if (*ppos >= i_size_read(inode))
                        RETURN(0);

                if (count > i_size_read(inode) - *ppos)
                        count = i_size_read(inode) - *ppos;
                /* Make sure to correctly adjust the file pos pointer for
                 * EFAULT case */
                notzeroed = clear_user(buf, count);
                count -= notzeroed;
                *ppos += count;
                if (!count)
                        RETURN(-EFAULT);
                RETURN(count);
        }

/* chunked-read loop: each iteration locks, reads and unlocks one chunk */
repeat:
        if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
                                (obd_off *)&end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
        } else {
                end = *ppos + count - 1;
        }

        /* take a PR extent lock covering this chunk */
        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
        if (IS_ERR(node)){
                GOTO(out, retval = PTR_ERR(node));
        }

        tree.lt_fd = LUSTRE_FPRIVATE(file);
        rc = ll_tree_lock(&tree, node, buf, count,
                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
        if (rc != 0)
                GOTO(out, retval = rc);

        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (retval) {
                        /* drop the tree lock taken above before bailing out */
                        ll_tree_unlock(&tree);
                        goto out;
                }
        } else {
                /* region is within kms and, hence, within real file size (A).
                 * We need to increase i_size to cover the read region so that
                 * generic_file_read() will do its job, but that doesn't mean
                 * the kms size is _correct_, it is only the _minimum_ size.
                 * If someone does a stat they will get the correct size which
                 * will always be >= the kms value here.  b=11081 */
                if (i_size_read(inode) < kms)
                        i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        chunk = end - *ppos + 1;
        CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, chunk, *ppos, i_size_read(inode));

        /* turn off the kernel's read-ahead */
        file->f_ra.ra_pages = 0;

        /* initialize read-ahead window once per syscall */
        if (ra == 0) {
                ra = 1;
                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
                ll_ra_read_in(file, &bead);
        }

        /* BUG: 5972 */
        file_accessed(file);
        retval = generic_file_read(file, buf, chunk, ppos);
        ll_rw_stats_tally(sbi, current->pid, file, count, 0);

        ll_tree_unlock(&tree);

        /* advance past what was read; keep chunking only on a full chunk */
        if (retval > 0) {
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

 out:
        if (ra != 0)
                ll_ra_read_ex(file, &bead);
        /* report total bytes read if any chunk succeeded, else the error */
        retval = (sum > 0) ? sum : retval;
        RETURN(retval);
}
1481
1482 /*
1483  * Write to a file (through the page cache).
1484  */
1485 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1486                              loff_t *ppos)
1487 {
1488         struct inode *inode = file->f_dentry->d_inode;
1489         struct ll_sb_info *sbi = ll_i2sbi(inode);
1490         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1491         struct ll_lock_tree tree;
1492         struct ll_lock_tree_node *node;
1493         loff_t maxbytes = ll_file_maxbytes(inode);
1494         loff_t lock_start, lock_end, end;
1495         ssize_t retval, chunk, sum = 0;
1496         int rc;
1497         ENTRY;
1498
1499         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1500                inode->i_ino, inode->i_generation, inode, count, *ppos);
1501
1502         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1503
1504         /* POSIX, but surprised the VFS doesn't check this already */
1505         if (count == 0)
1506                 RETURN(0);
1507
1508         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1509          * called on the file, don't fail the below assertion (bug 2388). */
1510         if (file->f_flags & O_LOV_DELAY_CREATE &&
1511             ll_i2info(inode)->lli_smd == NULL)
1512                 RETURN(-EBADF);
1513
1514         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1515
1516         down(&ll_i2info(inode)->lli_write_sem);
1517
1518 repeat:
1519         chunk = 0; /* just to fix gcc's warning */
1520         end = *ppos + count - 1;
1521
1522         if (file->f_flags & O_APPEND) {
1523                 lock_start = 0;
1524                 lock_end = OBD_OBJECT_EOF;
1525         } else if (sbi->ll_max_rw_chunk != 0) {
1526                 /* first, let's know the end of the current stripe */
1527                 end = *ppos;
1528                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1529                                 (obd_off *)&end);
1530
1531                 /* correct, the end is beyond the request */
1532                 if (end > *ppos + count - 1)
1533                         end = *ppos + count - 1;
1534
1535                 /* and chunk shouldn't be too large even if striping is wide */
1536                 if (end - *ppos > sbi->ll_max_rw_chunk)
1537                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1538                 lock_start = *ppos;
1539                 lock_end = end;
1540         } else {
1541                 lock_start = *ppos;
1542                 lock_end = *ppos + count - 1;
1543         }
1544         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1545
1546         if (IS_ERR(node))
1547                 GOTO(out, retval = PTR_ERR(node));
1548
1549         tree.lt_fd = LUSTRE_FPRIVATE(file);
1550         rc = ll_tree_lock(&tree, node, buf, count,
1551                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1552         if (rc != 0)
1553                 GOTO(out, retval = rc);
1554
1555         /* This is ok, g_f_w will overwrite this under i_sem if it races
1556          * with a local truncate, it just makes our maxbyte checking easier.
1557          * The i_size value gets updated in ll_extent_lock() as a consequence
1558          * of the [0,EOF] extent lock we requested above. */
1559         if (file->f_flags & O_APPEND) {
1560                 *ppos = i_size_read(inode);
1561                 end = *ppos + count - 1;
1562         }
1563
1564         if (*ppos >= maxbytes) {
1565                 send_sig(SIGXFSZ, current, 0);
1566                 GOTO(out_unlock, retval = -EFBIG);
1567         }
1568         if (*ppos + count > maxbytes)
1569                 count = maxbytes - *ppos;
1570
1571         /* generic_file_write handles O_APPEND after getting i_mutex */
1572         chunk = end - *ppos + 1;
1573         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1574                inode->i_ino, chunk, *ppos);
1575         retval = generic_file_write(file, buf, chunk, ppos);
1576         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1577
1578 out_unlock:
1579         ll_tree_unlock(&tree);
1580
1581 out:
1582         if (retval > 0) {
1583                 buf += retval;
1584                 count -= retval;
1585                 sum += retval;
1586                 if (retval == chunk && count > 0)
1587                         goto repeat;
1588         }
1589
1590         up(&ll_i2info(inode)->lli_write_sem);
1591
1592         retval = (sum > 0) ? sum : retval;
1593         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1594                            retval > 0 ? retval : 0);
1595         RETURN(retval);
1596 }
1597
1598 /*
1599  * Send file content (through pagecache) somewhere with helper
1600  */
1601 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1602                                 read_actor_t actor, void *target)
1603 {
1604         struct inode *inode = in_file->f_dentry->d_inode;
1605         struct ll_inode_info *lli = ll_i2info(inode);
1606         struct lov_stripe_md *lsm = lli->lli_smd;
1607         struct ll_lock_tree tree;
1608         struct ll_lock_tree_node *node;
1609         struct ost_lvb lvb;
1610         struct ll_ra_read bead;
1611         int rc;
1612         ssize_t retval;
1613         __u64 kms;
1614         ENTRY;
1615         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1616                inode->i_ino, inode->i_generation, inode, count, *ppos);
1617
1618         /* "If nbyte is 0, read() will return 0 and have no other results."
1619          *                      -- Single Unix Spec */
1620         if (count == 0)
1621                 RETURN(0);
1622
1623         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1624         /* turn off the kernel's read-ahead */
1625         in_file->f_ra.ra_pages = 0;
1626
1627         /* File with no objects, nothing to lock */
1628         if (!lsm)
1629                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1630
1631         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1632         if (IS_ERR(node))
1633                 RETURN(PTR_ERR(node));
1634
1635         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1636         rc = ll_tree_lock(&tree, node, NULL, count,
1637                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1638         if (rc != 0)
1639                 RETURN(rc);
1640
1641         ll_inode_size_lock(inode, 1);
1642         /*
1643          * Consistency guarantees: following possibilities exist for the
1644          * relation between region being read and real file size at this
1645          * moment:
1646          *
1647          *  (A): the region is completely inside of the file;
1648          *
1649          *  (B-x): x bytes of region are inside of the file, the rest is
1650          *  outside;
1651          *
1652          *  (C): the region is completely outside of the file.
1653          *
1654          * This classification is stable under DLM lock acquired by
1655          * ll_tree_lock() above, because to change class, other client has to
1656          * take DLM lock conflicting with our lock. Also, any updates to
1657          * ->i_size by other threads on this client are serialized by
1658          * ll_inode_size_lock(). This guarantees that short reads are handled
1659          * correctly in the face of concurrent writes and truncates.
1660          */
1661         inode_init_lvb(inode, &lvb);
1662         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1663         kms = lvb.lvb_size;
1664         if (*ppos + count - 1 > kms) {
1665                 /* A glimpse is necessary to determine whether we return a
1666                  * short read (B) or some zeroes at the end of the buffer (C) */
1667                 ll_inode_size_unlock(inode, 1);
1668                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1669                 if (retval)
1670                         goto out;
1671         } else {
1672                 /* region is within kms and, hence, within real file size (A) */
1673                 i_size_write(inode, kms);
1674                 ll_inode_size_unlock(inode, 1);
1675         }
1676
1677         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1678                inode->i_ino, count, *ppos, i_size_read(inode));
1679
1680         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1681         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1682         ll_ra_read_in(in_file, &bead);
1683         /* BUG: 5972 */
1684         file_accessed(in_file);
1685         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1686         ll_ra_read_ex(in_file, &bead);
1687
1688  out:
1689         ll_tree_unlock(&tree);
1690         RETURN(retval);
1691 }
1692
/*
 * LL_IOC_RECREATE_OBJ handler: re-create an OST object for this file using
 * the object id/group/OST index supplied by userspace.  Root-only.
 *
 * Returns 0 on success or a negative errno.
 */
static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                               unsigned long arg)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_export *exp = ll_i2dtexp(inode);
        struct ll_recreate_obj ucreatp;
        struct obd_trans_info oti = { 0 };
        struct obdo *oa = NULL;
        int lsm_size;
        int rc = 0;
        struct lov_stripe_md *lsm, *lsm2;
        ENTRY;

        if (!capable (CAP_SYS_ADMIN))
                RETURN(-EPERM);

        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
                            sizeof(struct ll_recreate_obj));
        if (rc) {
                RETURN(-EFAULT);
        }
        OBDO_ALLOC(oa);
        if (oa == NULL)
                RETURN(-ENOMEM);

        /* lli_size_sem keeps lli_smd stable while it is copied below */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(out, rc = -ENOENT);
        lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
                   (lsm->lsm_stripe_count));

        OBD_ALLOC(lsm2, lsm_size);
        if (lsm2 == NULL)
                GOTO(out, rc = -ENOMEM);

        /* target object identity as requested by userspace */
        oa->o_id = ucreatp.lrc_id;
        oa->o_gr = ucreatp.lrc_group;
        oa->o_nlink = ucreatp.lrc_ost_idx;
        oa->o_flags |= OBD_FL_RECREATE_OBJS;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        /* obd_create() is handed a private copy of the stripe MD, so the
         * cached lli_smd is never modified */
        memcpy(lsm2, lsm, lsm_size);
        rc = obd_create(exp, oa, &lsm2, &oti);

        OBD_FREE(lsm2, lsm_size);
        GOTO(out, rc);
out:
        up(&lli->lli_size_sem);
        OBDO_FREE(oa);
        return rc;
}
1747
/*
 * Install a striping EA on @inode by re-opening the file with @lum attached
 * to the open intent, then immediately closing the resulting open handle.
 * Fails with -EEXIST if a stripe MD is already cached on the inode.
 *
 * Returns 0 on success or a negative errno.
 */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                             int flags, struct lov_user_md *lum, int lum_size)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm;
        struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
        int rc = 0;
        ENTRY;

        /* hold lli_size_sem across the check and the open so a racing
         * thread cannot instantiate the stripe MD in between */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm) {
                up(&lli->lli_size_sem);
                CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
                       inode->i_ino);
                RETURN(-EEXIST);
        }

        rc = ll_intent_file_open(file, lum, lum_size, &oit);
        if (rc)
                GOTO(out, rc);
        if (it_disposition(&oit, DISP_LOOKUP_NEG))
                GOTO(out_req_free, rc = -ENOENT);
        rc = oit.d.lustre.it_status;
        if (rc < 0)
                GOTO(out_req_free, rc);

        /* the open existed only to carry the EA; close the handle again */
        ll_release_openhandle(file->f_dentry, &oit);

 out:
        up(&lli->lli_size_sem);
        ll_intent_release(&oit);
        RETURN(rc);
out_req_free:
        /* drop the request reference, then take the common exit path */
        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
        goto out;
}
1785
/*
 * Fetch the LOV EA of @filename (looked up relative to @inode) from the MDS
 * and return it in host byte order.
 *
 * On success *lmmp/*lmm_size describe the EA and *request holds the RPC
 * reply it points into; the caller must free *request with
 * ptlrpc_req_finished().  NOTE(review): for LOV_MAGIC_JOIN files a converted
 * lov_user_md_join is OBD_ALLOC()ed and returned via *lmmp instead --
 * presumably the caller frees that separately; confirm against callers.
 */
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
                             struct lov_mds_md **lmmp, int *lmm_size, 
                             struct ptlrpc_request **request)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct mdt_body  *body;
        struct lov_mds_md *lmm = NULL;
        struct ptlrpc_request *req = NULL;
        struct obd_capa *oc;
        int rc, lmmsize;

        rc = ll_get_max_mdsize(sbi, &lmmsize);
        if (rc)
                RETURN(rc);

        /* getattr-by-name with an EA buffer large enough for any stripe MD */
        oc = ll_mdscapa_get(inode);
        rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
                             oc, filename, strlen(filename) + 1,
                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
                             ll_i2suppgid(inode), &req);
        capa_put(oc);
        if (rc < 0) {
                CDEBUG(D_INFO, "md_getattr_name failed "
                       "on %s: rc %d\n", filename, rc);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
        LASSERT(body != NULL); /* checked by mdc_getattr_name */

        /* actual EA size as reported by the server */
        lmmsize = body->eadatasize;

        if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
                        lmmsize == 0) {
                GOTO(out, rc = -ENODATA);
        }

        lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
        LASSERT(lmm != NULL);

        /*
         * This is coming from the MDS, so is probably in
         * little endian.  We convert it to host endian before
         * passing it to userspace.
         */
        if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
                lustre_swab_lov_user_md((struct lov_user_md *)lmm);
                lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
        } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
                lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
        }

        if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
                /* JOIN files carry an extent-array layout; expand it into a
                 * lov_user_md_join with per-stripe extents for userspace */
                struct lov_stripe_md *lsm;
                struct lov_user_md_join *lmj;
                int lmj_size, i, aindex = 0;

                rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
                if (rc < 0)
                        GOTO(out, rc = -ENOMEM);
                rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
                if (rc)
                        GOTO(out_free_memmd, rc);

                lmj_size = sizeof(struct lov_user_md_join) +
                           lsm->lsm_stripe_count *
                           sizeof(struct lov_user_ost_data_join);
                OBD_ALLOC(lmj, lmj_size);
                if (!lmj)
                        GOTO(out_free_memmd, rc = -ENOMEM);

                memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
                for (i = 0; i < lsm->lsm_stripe_count; i++) {
                        /* advance aindex so the current extent covers stripe i */
                        struct lov_extent *lex =
                                &lsm->lsm_array->lai_ext_array[aindex];

                        if (lex->le_loi_idx + lex->le_stripe_count <= i)
                                aindex ++;
                        CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
                                        LPU64" len %d\n", aindex, i,
                                        lex->le_start, (int)lex->le_len);
                        lmj->lmm_objects[i].l_extent_start =
                                lex->le_start;

                        /* le_len == -1 means "extends to EOF" */
                        if ((int)lex->le_len == -1)
                                lmj->lmm_objects[i].l_extent_end = -1;
                        else
                                lmj->lmm_objects[i].l_extent_end =
                                        lex->le_start + lex->le_len;
                        lmj->lmm_objects[i].l_object_id =
                                lsm->lsm_oinfo[i]->loi_id;
                        lmj->lmm_objects[i].l_object_gr =
                                lsm->lsm_oinfo[i]->loi_gr;
                        lmj->lmm_objects[i].l_ost_gen =
                                lsm->lsm_oinfo[i]->loi_ost_gen;
                        lmj->lmm_objects[i].l_ost_idx =
                                lsm->lsm_oinfo[i]->loi_ost_idx;
                }
                /* return the expanded copy instead of the in-reply EA */
                lmm = (struct lov_mds_md *)lmj;
                lmmsize = lmj_size;
out_free_memmd:
                obd_free_memmd(sbi->ll_dt_exp, &lsm);
        }
out:
        *lmmp = lmm;
        *lmm_size = lmmsize;
        *request = req;
        return rc;
}
1895
1896 static int ll_lov_setea(struct inode *inode, struct file *file,
1897                             unsigned long arg)
1898 {
1899         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1900         struct lov_user_md  *lump;
1901         int lum_size = sizeof(struct lov_user_md) +
1902                        sizeof(struct lov_user_ost_data);
1903         int rc;
1904         ENTRY;
1905
1906         if (!capable (CAP_SYS_ADMIN))
1907                 RETURN(-EPERM);
1908
1909         OBD_ALLOC(lump, lum_size);
1910         if (lump == NULL) {
1911                 RETURN(-ENOMEM);
1912         }
1913         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1914         if (rc) {
1915                 OBD_FREE(lump, lum_size);
1916                 RETURN(-EFAULT);
1917         }
1918
1919         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1920
1921         OBD_FREE(lump, lum_size);
1922         RETURN(rc);
1923 }
1924
1925 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1926                             unsigned long arg)
1927 {
1928         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1929         int rc;
1930         int flags = FMODE_WRITE;
1931         ENTRY;
1932
1933         /* Bug 1152: copy properly when this is no longer true */
1934         LASSERT(sizeof(lum) == sizeof(*lump));
1935         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1936         rc = copy_from_user(&lum, lump, sizeof(lum));
1937         if (rc)
1938                 RETURN(-EFAULT);
1939
1940         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1941         if (rc == 0) {
1942                  put_user(0, &lump->lmm_stripe_count);
1943                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1944                                     0, ll_i2info(inode)->lli_smd, lump);
1945         }
1946         RETURN(rc);
1947 }
1948
1949 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1950 {
1951         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1952
1953         if (!lsm)
1954                 RETURN(-ENODATA);
1955
1956         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1957                             (void *)arg);
1958 }
1959
1960 static int ll_get_grouplock(struct inode *inode, struct file *file,
1961                             unsigned long arg)
1962 {
1963         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1964         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1965                                                     .end = OBD_OBJECT_EOF}};
1966         struct lustre_handle lockh = { 0 };
1967         struct ll_inode_info *lli = ll_i2info(inode);
1968         struct lov_stripe_md *lsm = lli->lli_smd;
1969         int flags = 0, rc;
1970         ENTRY;
1971
1972         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1973                 RETURN(-EINVAL);
1974         }
1975
1976         policy.l_extent.gid = arg;
1977         if (file->f_flags & O_NONBLOCK)
1978                 flags = LDLM_FL_BLOCK_NOWAIT;
1979
1980         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1981         if (rc)
1982                 RETURN(rc);
1983
1984         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1985         fd->fd_gid = arg;
1986         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1987
1988         RETURN(0);
1989 }
1990
1991 static int ll_put_grouplock(struct inode *inode, struct file *file,
1992                             unsigned long arg)
1993 {
1994         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1995         struct ll_inode_info *lli = ll_i2info(inode);
1996         struct lov_stripe_md *lsm = lli->lli_smd;
1997         int rc;
1998         ENTRY;
1999
2000         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2001                 /* Ugh, it's already unlocked. */
2002                 RETURN(-EINVAL);
2003         }
2004
2005         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2006                 RETURN(-EINVAL);
2007
2008         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2009
2010         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2011         if (rc)
2012                 RETURN(rc);
2013
2014         fd->fd_gid = 0;
2015         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2016
2017         RETURN(0);
2018 }
2019
2020 static int join_sanity_check(struct inode *head, struct inode *tail)
2021 {
2022         ENTRY;
2023         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2024                 CERROR("server do not support join \n");
2025                 RETURN(-EINVAL);
2026         }
2027         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2028                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2029                        head->i_ino, tail->i_ino);
2030                 RETURN(-EINVAL);
2031         }
2032         if (head->i_ino == tail->i_ino) {
2033                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2034                 RETURN(-EINVAL);
2035         }
2036         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2037                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2038                 RETURN(-EINVAL);
2039         }
2040         RETURN(0);
2041 }
2042
/*
 * Ask the MDS to join the tail file onto @head_inode: re-open via an
 * IT_OPEN intent carrying O_JOIN_FILE, with the head's current size passed
 * to the MDS as intent data through ll_prep_md_op_data().
 *
 * Returns 0 on success or a negative errno.
 */
static int join_file(struct inode *head_inode, struct file *head_filp,
                     struct file *tail_filp)
{
        struct dentry *tail_dentry = tail_filp->f_dentry;
        struct lookup_intent oit = {.it_op = IT_OPEN,
                                   .it_flags = head_filp->f_flags|O_JOIN_FILE};
        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
                ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };

        struct lustre_handle lockh;
        struct md_op_data *op_data;
        int    rc;
        loff_t data;
        ENTRY;

        tail_dentry = tail_filp->f_dentry;

        /* the head's size is handed to the MDS via op_data */
        data = i_size_read(head_inode);
        op_data = ll_prep_md_op_data(NULL, head_inode,
                                     tail_dentry->d_parent->d_inode,
                                     tail_dentry->d_name.name,
                                     tail_dentry->d_name.len, 0,
                                     LUSTRE_OPC_ANY, &data);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
                         op_data, &lockh, NULL, 0, 0);

        ll_finish_md_op_data(op_data);
        if (rc < 0)
                GOTO(out, rc);

        /* the intent status carries the server-side result of the open */
        rc = oit.d.lustre.it_status;

        if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
                ptlrpc_req_finished((struct ptlrpc_request *)
                                    oit.d.lustre.it_data);
                GOTO(out, rc);
        }

        if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
                                           * away */
                ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
                oit.d.lustre.it_lock_mode = 0;
        }
        /* close the open handle created purely for the join intent */
        ll_release_openhandle(head_filp->f_dentry, &oit);
out:
        ll_intent_release(&oit);
        RETURN(rc);
}
2095
2096 static int ll_file_join(struct inode *head, struct file *filp,
2097                         char *filename_tail)
2098 {
2099         struct inode *tail = NULL, *first = NULL, *second = NULL;
2100         struct dentry *tail_dentry;
2101         struct file *tail_filp, *first_filp, *second_filp;
2102         struct ll_lock_tree first_tree, second_tree;
2103         struct ll_lock_tree_node *first_node, *second_node;
2104         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2105         int rc = 0, cleanup_phase = 0;
2106         ENTRY;
2107
2108         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2109                head->i_ino, head->i_generation, head, filename_tail);
2110
2111         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2112         if (IS_ERR(tail_filp)) {
2113                 CERROR("Can not open tail file %s", filename_tail);
2114                 rc = PTR_ERR(tail_filp);
2115                 GOTO(cleanup, rc);
2116         }
2117         tail = igrab(tail_filp->f_dentry->d_inode);
2118
2119         tlli = ll_i2info(tail);
2120         tail_dentry = tail_filp->f_dentry;
2121         LASSERT(tail_dentry);
2122         cleanup_phase = 1;
2123
2124         /*reorder the inode for lock sequence*/
2125         first = head->i_ino > tail->i_ino ? head : tail;
2126         second = head->i_ino > tail->i_ino ? tail : head;
2127         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2128         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2129
2130         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2131                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2132         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2133         if (IS_ERR(first_node)){
2134                 rc = PTR_ERR(first_node);
2135                 GOTO(cleanup, rc);
2136         }
2137         first_tree.lt_fd = first_filp->private_data;
2138         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2139         if (rc != 0)
2140                 GOTO(cleanup, rc);
2141         cleanup_phase = 2;
2142
2143         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2144         if (IS_ERR(second_node)){
2145                 rc = PTR_ERR(second_node);
2146                 GOTO(cleanup, rc);
2147         }
2148         second_tree.lt_fd = second_filp->private_data;
2149         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2150         if (rc != 0)
2151                 GOTO(cleanup, rc);
2152         cleanup_phase = 3;
2153
2154         rc = join_sanity_check(head, tail);
2155         if (rc)
2156                 GOTO(cleanup, rc);
2157
2158         rc = join_file(head, filp, tail_filp);
2159         if (rc)
2160                 GOTO(cleanup, rc);
2161 cleanup:
2162         switch (cleanup_phase) {
2163         case 3:
2164                 ll_tree_unlock(&second_tree);
2165                 obd_cancel_unused(ll_i2dtexp(second),
2166                                   ll_i2info(second)->lli_smd, 0, NULL);
2167         case 2:
2168                 ll_tree_unlock(&first_tree);
2169                 obd_cancel_unused(ll_i2dtexp(first),
2170                                   ll_i2info(first)->lli_smd, 0, NULL);
2171         case 1:
2172                 filp_close(tail_filp, 0);
2173                 if (tail)
2174                         iput(tail);
2175                 if (head && rc == 0) {
2176                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2177                                        &hlli->lli_smd);
2178                         hlli->lli_smd = NULL;
2179                 }
2180         case 0:
2181                 break;
2182         default:
2183                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2184                 LBUG();
2185         }
2186         RETURN(rc);
2187 }
2188
2189 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2190 {
2191         struct inode *inode = dentry->d_inode;
2192         struct obd_client_handle *och;
2193         int rc;
2194         ENTRY;
2195
2196         LASSERT(inode);
2197
2198         /* Root ? Do nothing. */
2199         if (dentry->d_inode->i_sb->s_root == dentry)
2200                 RETURN(0);
2201
2202         /* No open handle to close? Move away */
2203         if (!it_disposition(it, DISP_OPEN_OPEN))
2204                 RETURN(0);
2205
2206         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2207
2208         OBD_ALLOC(och, sizeof(*och));
2209         if (!och)
2210                 GOTO(out, rc = -ENOMEM);
2211
2212         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2213                     ll_i2info(inode), it, och);
2214
2215         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2216                                        inode, och);
2217  out:
2218         /* this one is in place of ll_file_open */
2219         ptlrpc_req_finished(it->d.lustre.it_data);
2220         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2221         RETURN(rc);
2222 }
2223
2224 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2225                   unsigned long arg)
2226 {
2227         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2228         int flags;
2229         ENTRY;
2230
2231         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2232                inode->i_generation, inode, cmd);
2233         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2234
2235         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2236         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2237                 RETURN(-ENOTTY);
2238
2239         switch(cmd) {
2240         case LL_IOC_GETFLAGS:
2241                 /* Get the current value of the file flags */
2242                 return put_user(fd->fd_flags, (int *)arg);
2243         case LL_IOC_SETFLAGS:
2244         case LL_IOC_CLRFLAGS:
2245                 /* Set or clear specific file flags */
2246                 /* XXX This probably needs checks to ensure the flags are
2247                  *     not abused, and to handle any flag side effects.
2248                  */
2249                 if (get_user(flags, (int *) arg))
2250                         RETURN(-EFAULT);
2251
2252                 if (cmd == LL_IOC_SETFLAGS) {
2253                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2254                             !(file->f_flags & O_DIRECT)) {
2255                                 CERROR("%s: unable to disable locking on "
2256                                        "non-O_DIRECT file\n", current->comm);
2257                                 RETURN(-EINVAL);
2258                         }
2259
2260                         fd->fd_flags |= flags;
2261                 } else {
2262                         fd->fd_flags &= ~flags;
2263                 }
2264                 RETURN(0);
2265         case LL_IOC_LOV_SETSTRIPE:
2266                 RETURN(ll_lov_setstripe(inode, file, arg));
2267         case LL_IOC_LOV_SETEA:
2268                 RETURN(ll_lov_setea(inode, file, arg));
2269         case LL_IOC_LOV_GETSTRIPE:
2270                 RETURN(ll_lov_getstripe(inode, arg));
2271         case LL_IOC_RECREATE_OBJ:
2272                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2273         case EXT3_IOC_GETFLAGS:
2274         case EXT3_IOC_SETFLAGS:
2275                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2276         case EXT3_IOC_GETVERSION_OLD:
2277         case EXT3_IOC_GETVERSION:
2278                 RETURN(put_user(inode->i_generation, (int *)arg));
2279         case LL_IOC_JOIN: {
2280                 char *ftail;
2281                 int rc;
2282
2283                 ftail = getname((const char *)arg);
2284                 if (IS_ERR(ftail))
2285                         RETURN(PTR_ERR(ftail));
2286                 rc = ll_file_join(inode, file, ftail);
2287                 putname(ftail);
2288                 RETURN(rc);
2289         }
2290         case LL_IOC_GROUP_LOCK:
2291                 RETURN(ll_get_grouplock(inode, file, arg));
2292         case LL_IOC_GROUP_UNLOCK:
2293                 RETURN(ll_put_grouplock(inode, file, arg));
2294         case IOC_OBD_STATFS:
2295                 RETURN(ll_obd_statfs(inode, (void *)arg));
2296
2297         /* We need to special case any other ioctls we want to handle,
2298          * to send them to the MDS/OST as appropriate and to properly
2299          * network encode the arg field.
2300         case EXT3_IOC_SETVERSION_OLD:
2301         case EXT3_IOC_SETVERSION:
2302         */
2303         case LL_IOC_FLUSHCTX:
2304                 RETURN(ll_flush_ctx(inode));
2305         default: {
2306                 int err;
2307
2308                 if (LLIOC_STOP == 
2309                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2310                         RETURN(err);
2311
2312                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2313                                      (void *)arg));
2314         }
2315         }
2316 }
2317
2318 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2319 {
2320         struct inode *inode = file->f_dentry->d_inode;
2321         struct ll_inode_info *lli = ll_i2info(inode);
2322         struct lov_stripe_md *lsm = lli->lli_smd;
2323         loff_t retval;
2324         ENTRY;
2325         retval = offset + ((origin == 2) ? i_size_read(inode) :
2326                            (origin == 1) ? file->f_pos : 0);
2327         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2328                inode->i_ino, inode->i_generation, inode, retval, retval,
2329                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2330         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2331
2332         if (origin == 2) { /* SEEK_END */
2333                 int nonblock = 0, rc;
2334
2335                 if (file->f_flags & O_NONBLOCK)
2336                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2337
2338                 if (lsm != NULL) {
2339                         rc = ll_glimpse_size(inode, nonblock);
2340                         if (rc != 0)
2341                                 RETURN(rc);
2342                 }
2343
2344                 ll_inode_size_lock(inode, 0);
2345                 offset += i_size_read(inode);
2346                 ll_inode_size_unlock(inode, 0);
2347         } else if (origin == 1) { /* SEEK_CUR */
2348                 offset += file->f_pos;
2349         }
2350
2351         retval = -EINVAL;
2352         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2353                 if (offset != file->f_pos) {
2354                         file->f_pos = offset;
2355 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2356                         file->f_reada = 0;
2357                         file->f_version = ++event;
2358 #endif
2359                 }
2360                 retval = offset;
2361         }
2362         
2363         RETURN(retval);
2364 }
2365
/* ->fsync() for Lustre files.
 *
 * Waits for in-flight page writeback, collects any asynchronous writeback
 * errors recorded on the inode and its stripes, syncs the inode's metadata
 * on the MDS, and — when @data is non-zero and the file has OST objects —
 * syncs the data objects on the OSTs as well.
 *
 * Returns 0 on success or the first error encountered. */
int ll_fsync(struct file *file, struct dentry *dentry, int data)
{
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ptlrpc_request *req;
        struct obd_capa *oc;
        int rc, err;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);

        /* fsync's caller has already called _fdata{sync,write}, we want
         * that IO to finish before calling the osc and mdc sync methods */
        rc = filemap_fdatawait(inode->i_mapping);

        /* catch async errors that were recorded back when async writeback
         * failed for pages in this mapping. */
        err = lli->lli_async_rc;
        lli->lli_async_rc = 0;
        /* rc keeps the FIRST error; later errors are picked up only when
         * everything so far succeeded. */
        if (rc == 0)
                rc = err;
        if (lsm) {
                /* per-stripe async write errors, cleared as they are read */
                err = lov_test_and_clear_async_rc(lsm);
                if (rc == 0)
                        rc = err;
        }

        /* sync metadata on the MDS; a capability (if any) authorizes the op */
        oc = ll_mdscapa_get(inode);
        err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
                      &req);
        capa_put(oc);
        if (!rc)
                rc = err;
        /* req is only valid when md_sync succeeded */
        if (!err)
                ptlrpc_req_finished(req);

        if (data && lsm) {
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (!oa)
                        RETURN(rc ? rc : -ENOMEM);

                /* identify the OST objects and carry over inode attributes */
                oa->o_id = lsm->lsm_object_id;
                oa->o_gr = lsm->lsm_object_gr;
                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                           OBD_MD_FLGROUP);

                /* sync the whole object range [0, EOF] on the OSTs */
                oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
                err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
                               0, OBD_OBJECT_EOF, oc);
                capa_put(oc);
                if (!rc)
                        rc = err;
                OBDO_FREE(oa);
        }

        RETURN(rc);
}
2429
/* ->lock() / ->flock() for Lustre files: translate a VFS posix/BSD lock
 * request into an LDLM flock enqueue on the MDS.
 *
 * fl_type maps onto LDLM modes (F_RDLCK->LCK_PR, F_WRLCK->LCK_PW,
 * F_UNLCK->LCK_NL), and @cmd selects the enqueue flags (blocking wait,
 * non-blocking, or test-only).  On success the lock is also recorded in
 * the local VFS lock lists so the kernel's bookkeeping stays consistent.
 *
 * Returns 0 on success or a negative errno from the enqueue. */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        /* flock resources are named by the fid plus the LDLM_FLOCK tag */
        struct ldlm_res_id res_id =
                { .name = { fid_seq(ll_inode2fid(inode)),
                            fid_oid(ll_inode2fid(inode)),
                            fid_ver(ll_inode2fid(inode)),
                            LDLM_FLOCK} };
        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
                ldlm_flock_completion_ast, NULL, file_lock };
        struct lustre_handle lockh = {0};
        ldlm_policy_data_t flock;
        int flags = 0;
        int rc;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
               inode->i_ino, file_lock);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

        if (file_lock->fl_flags & FL_FLOCK) {
                LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                /* set missing params for flock() calls */
                file_lock->fl_end = OFFSET_MAX;
                file_lock->fl_pid = current->tgid;
        }
        flock.l_flock.pid = file_lock->fl_pid;
        flock.l_flock.start = file_lock->fl_start;
        flock.l_flock.end = file_lock->fl_end;

        switch (file_lock->fl_type) {
        case F_RDLCK:
                einfo.ei_mode = LCK_PR;
                break;
        case F_UNLCK:
                /* An unlock request may or may not have any relation to
                 * existing locks so we may not be able to pass a lock handle
                 * via a normal ldlm_lock_cancel() request. The request may even
                 * unlock a byte range in the middle of an existing lock. In
                 * order to process an unlock request we need all of the same
                 * information that is given with a normal read or write record
                 * lock request. To avoid creating another ldlm unlock (cancel)
                 * message we'll treat a LCK_NL flock request as an unlock. */
                einfo.ei_mode = LCK_NL;
                break;
        case F_WRLCK:
                einfo.ei_mode = LCK_PW;
                break;
        default:
                CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
                LBUG();
        }

        switch (cmd) {
        case F_SETLKW:
#ifdef F_SETLKW64
        case F_SETLKW64:
#endif
                /* blocking wait for the lock */
                flags = 0;
                break;
        case F_SETLK:
#ifdef F_SETLK64
        case F_SETLK64:
#endif
                /* non-blocking: fail rather than wait */
                flags = LDLM_FL_BLOCK_NOWAIT;
                break;
        case F_GETLK:
#ifdef F_GETLK64
        case F_GETLK64:
#endif
                flags = LDLM_FL_TEST_LOCK;
                /* Save the old mode so that if the mode in the lock changes we
                 * can decrement the appropriate reader or writer refcount. */
                file_lock->fl_type = einfo.ei_mode;
                break;
        default:
                CERROR("unknown fcntl lock command: %d\n", cmd);
                LBUG();
        }

        CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
               "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
               flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
        /* mirror a successful grant into the kernel's local lock lists */
        if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
                ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
#ifdef HAVE_F_OP_FLOCK
        if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
            !(flags & LDLM_FL_TEST_LOCK))
                posix_lock_file_wait(file, file_lock);
#endif

        RETURN(rc);
}
2528
2529 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2530 {
2531         ENTRY;
2532
2533         RETURN(-ENOSYS);
2534 }
2535
2536 int ll_have_md_lock(struct inode *inode, __u64 bits)
2537 {
2538         struct lustre_handle lockh;
2539         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2540         struct lu_fid *fid;
2541         int flags;
2542         ENTRY;
2543
2544         if (!inode)
2545                RETURN(0);
2546
2547         fid = &ll_i2info(inode)->lli_fid;
2548         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2549
2550         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2551         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2552                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2553                 RETURN(1);
2554         }
2555         RETURN(0);
2556 }
2557
2558 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2559                             struct lustre_handle *lockh)
2560 {
2561         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2562         struct lu_fid *fid;
2563         ldlm_mode_t rc;
2564         int flags;
2565         ENTRY;
2566
2567         fid = &ll_i2info(inode)->lli_fid;
2568         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2569
2570         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2571         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2572                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2573         RETURN(rc);
2574 }
2575
2576 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2577         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2578                               * and return success */
2579                 inode->i_nlink = 0;
2580                 /* This path cannot be hit for regular files unless in
2581                  * case of obscure races, so no need to to validate
2582                  * size. */
2583                 if (!S_ISREG(inode->i_mode) &&
2584                     !S_ISDIR(inode->i_mode))
2585                         return 0;
2586         }
2587
2588         if (rc) {
2589                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2590                 return -abs(rc);
2591
2592         }
2593
2594         return 0;
2595 }
2596
/* Revalidate @dentry's inode attributes against the MDS.
 *
 * Two paths: when the server supports getattr-by-fid (OBD_CONNECT_ATTRFID)
 * an IT_GETATTR intent lock is taken by fid; otherwise a plain md_getattr
 * RPC is issued, but only when no cached MDS_INODELOCK_UPDATE lock already
 * guarantees fresh attributes.  Finally the file size is glimpsed from the
 * OSTs if objects have been allocated.
 *
 * Returns 0 on success or a negative errno. */
int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct ptlrpc_request *req = NULL;
        struct ll_sb_info *sbi;
        struct obd_export *exp;
        int rc;
        ENTRY;

        if (!inode) {
                CERROR("REPORT THIS LINE TO PETER\n");
                RETURN(0);
        }
        sbi = ll_i2sbi(inode);

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
               inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

        exp = ll_i2mdexp(inode);

        if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
                struct lookup_intent oit = { .it_op = IT_GETATTR };
                struct md_op_data *op_data;

                /* Call getattr by fid, so do not provide name at all. */
                op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
                                             dentry->d_inode, NULL, 0, 0,
                                             LUSTRE_OPC_ANY, NULL);
                if (IS_ERR(op_data))
                        RETURN(PTR_ERR(op_data));

                /* O_CHECK_STALE makes the MDS verify the inode still exists */
                oit.it_flags |= O_CHECK_STALE;
                rc = md_intent_lock(exp, op_data, NULL, 0,
                                    /* we are not interested in name
                                       based lookup */
                                    &oit, 0, &req,
                                    ll_md_blocking_ast, 0);
                ll_finish_md_op_data(op_data);
                oit.it_flags &= ~O_CHECK_STALE;
                if (rc < 0) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        GOTO (out, rc);
                }

                rc = ll_revalidate_it_finish(req, &oit, dentry);
                if (rc != 0) {
                        ll_intent_release(&oit);
                        GOTO(out, rc);
                }

                /* Unlinked? Unhash dentry, so it is not picked up later by
                   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
                   here to preserve get_cwd functionality on 2.6.
                   Bug 10503 */
                if (!dentry->d_inode->i_nlink) {
                        spin_lock(&dcache_lock);
                        ll_drop_dentry(dentry);
                        spin_unlock(&dcache_lock);
                }

                ll_lookup_finish_locks(&oit, dentry);
        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) {
                /* NOTE: shadows the outer sbi with the same value */
                struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                obd_valid valid = OBD_MD_FLGETATTR;
                struct obd_capa *oc;
                int ealen = 0;

                /* regular files may carry striping EAs; size the reply for
                 * the largest possible one */
                if (S_ISREG(inode->i_mode)) {
                        rc = ll_get_max_mdsize(sbi, &ealen);
                        if (rc)
                                RETURN(rc);
                        valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
                }
                /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
                 * capa for this inode. Because we only keep capas of dirs
                 * fresh. */
                oc = ll_mdscapa_get(inode);
                rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
                                ealen, &req);
                capa_put(oc);
                if (rc) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        RETURN(rc);
                }

                rc = ll_prep_inode(&inode, req, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        /* if object not yet allocated, don't validate size */
        if (ll_i2info(inode)->lli_smd == NULL)
                GOTO(out, rc = 0);

        /* ll_glimpse_size will prefer locally cached writes if they extend
         * the file */
        rc = ll_glimpse_size(inode, 0);
        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}
2699
2700 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2701                   struct lookup_intent *it, struct kstat *stat)
2702 {
2703         struct inode *inode = de->d_inode;
2704         int res = 0;
2705
2706         res = ll_inode_revalidate_it(de, it);
2707         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2708
2709         if (res)
2710                 return res;
2711
2712         stat->dev = inode->i_sb->s_dev;
2713         stat->ino = inode->i_ino;
2714         stat->mode = inode->i_mode;
2715         stat->nlink = inode->i_nlink;
2716         stat->uid = inode->i_uid;
2717         stat->gid = inode->i_gid;
2718         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2719         stat->atime = inode->i_atime;
2720         stat->mtime = inode->i_mtime;
2721         stat->ctime = inode->i_ctime;
2722 #ifdef HAVE_INODE_BLKSIZE
2723         stat->blksize = inode->i_blksize;
2724 #else
2725         stat->blksize = 1 << inode->i_blkbits;
2726 #endif
2727
2728         ll_inode_size_lock(inode, 0);
2729         stat->size = i_size_read(inode);
2730         stat->blocks = inode->i_blocks;
2731         ll_inode_size_unlock(inode, 0);
2732
2733         return 0;
2734 }
2735 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2736 {
2737         struct lookup_intent it = { .it_op = IT_GETATTR };
2738
2739         return ll_getattr_it(mnt, de, &it, stat);
2740 }
2741
2742 static
2743 int lustre_check_acl(struct inode *inode, int mask)
2744 {
2745 #ifdef CONFIG_FS_POSIX_ACL
2746         struct ll_inode_info *lli = ll_i2info(inode);
2747         struct posix_acl *acl;
2748         int rc;
2749         ENTRY;
2750
2751         spin_lock(&lli->lli_lock);
2752         acl = posix_acl_dup(lli->lli_posix_acl);
2753         spin_unlock(&lli->lli_lock);
2754
2755         if (!acl)
2756                 RETURN(-EAGAIN);
2757
2758         rc = posix_acl_permission(inode, acl, mask);
2759         posix_acl_release(acl);
2760
2761         RETURN(rc);
2762 #else
2763         return -EAGAIN;
2764 #endif
2765 }
2766
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
/* ->permission() for 2.6.10+: delegate to generic_permission() with
 * lustre_check_acl() as the ACL callback; remote-client mounts instead
 * consult the remote permission cache. */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);
        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
        return generic_permission(inode, mask, lustre_check_acl);
}
#else
/* ->permission() for older kernels: an open-coded equivalent of
 * generic_permission() with the ACL hook spliced in.  The odd
 * "else if (1) { ... } else { check_groups: ... }" shape exists so the
 * ACL path can fall back into the group check via goto while keeping
 * the group branch out of normal control flow — do not "simplify" it. */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        int mode = inode->i_mode;
        int rc;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);

        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);

        /* write access to a read-only fs / immutable file is refused for
         * the object types that live on disk */
        if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
            (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
                return -EROFS;
        if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
                return -EACCES;
        if (current->fsuid == inode->i_uid) {
                /* owner: use the "user" permission bits */
                mode >>= 6;
        } else if (1) {
                /* quick reject: if even "other" bits don't cover the mask,
                 * skip the (potentially expensive) ACL evaluation */
                if (((mode >> 3) & mask & S_IRWXO) != mask)
                        goto check_groups;
                rc = lustre_check_acl(inode, mask);
                if (rc == -EAGAIN)
                        goto check_groups;
                if (rc == -EACCES)
                        goto check_capabilities;
                return rc;
        } else {
check_groups:
                if (in_group_p(inode->i_gid))
                        mode >>= 3;
        }
        if ((mode & mask & S_IRWXO) == mask)
                return 0;

check_capabilities:
        /* CAP_DAC_OVERRIDE bypasses rwx checks except executing a file
         * with no execute bits set anywhere */
        if (!(mask & MAY_EXEC) ||
            (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
                if (capable(CAP_DAC_OVERRIDE))
                        return 0;

        /* CAP_DAC_READ_SEARCH allows reads and directory search */
        if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
            (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
                return 0;

        return -EACCES;
}
#endif
2829
/* Default file operations (-o localflock gives only locally consistent
 * flock locks; no .flock/.lock methods means the kernel's local ones run) */
struct file_operations ll_file_operations = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
};
2842
/* File operations for -o flock mounts: cluster-wide consistent locking
 * via ll_file_flock for both flock() and fcntl() locks */
struct file_operations ll_file_operations_flock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_flock,
#endif
        .lock           = ll_file_flock
};
2858
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_noflock,
#endif
        .lock           = ll_file_noflock
};
2875
/* Inode operations for regular Lustre files */
struct inode_operations ll_file_inode_operations = {
#ifdef HAVE_VFS_INTENT_PATCHES
        .setattr_raw    = ll_setattr_raw,
#endif
        .setattr        = ll_setattr,
        .truncate       = ll_truncate,
        .getattr        = ll_getattr,
        .permission     = ll_inode_permission,
        .setxattr       = ll_setxattr,
        .getxattr       = ll_getxattr,
        .listxattr      = ll_listxattr,
        .removexattr    = ll_removexattr,
};
2889
/* dynamic ioctl number support routines */
static struct llioc_ctl_data {
        struct rw_semaphore ioc_sem;   /* protects ioc_head */
        struct list_head    ioc_head;  /* registered struct llioc_data entries */
} llioc = {
        __RWSEM_INITIALIZER(llioc.ioc_sem),
        CFS_LIST_HEAD_INIT(llioc.ioc_head)
};
2898
2899
/* One dynamically registered ioctl handler: a callback plus the list of
 * ioctl numbers it services, stored inline after the struct. */
struct llioc_data {
        struct list_head        iocd_list;   /* chained on llioc.ioc_head */
        unsigned int            iocd_size;   /* total allocation size, for OBD_FREE */
        llioc_callback_t        iocd_cb;     /* handler for the commands below */
        unsigned int            iocd_count;  /* number of entries in iocd_cmd[] */
        unsigned int            iocd_cmd[0]; /* trailing array of ioctl numbers */
};
2907
2908 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2909 {
2910         unsigned int size;
2911         struct llioc_data *in_data = NULL;
2912         ENTRY;
2913
2914         if (cb == NULL || cmd == NULL ||
2915             count > LLIOC_MAX_CMD || count < 0)
2916                 RETURN(NULL);
2917
2918         size = sizeof(*in_data) + count * sizeof(unsigned int);
2919         OBD_ALLOC(in_data, size);
2920         if (in_data == NULL)
2921                 RETURN(NULL);
2922
2923         memset(in_data, 0, sizeof(*in_data));
2924         in_data->iocd_size = size;
2925         in_data->iocd_cb = cb;
2926         in_data->iocd_count = count;
2927         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2928
2929         down_write(&llioc.ioc_sem);
2930         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2931         up_write(&llioc.ioc_sem);
2932
2933         RETURN(in_data);
2934 }
2935
2936 void ll_iocontrol_unregister(void *magic)
2937 {
2938         struct llioc_data *tmp;
2939
2940         if (magic == NULL)
2941                 return;
2942
2943         down_write(&llioc.ioc_sem);
2944         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2945                 if (tmp == magic) {
2946                         unsigned int size = tmp->iocd_size;
2947
2948                         list_del(&tmp->iocd_list);
2949                         up_write(&llioc.ioc_sem);
2950
2951                         OBD_FREE(tmp, size);
2952                         return;
2953                 }
2954         }
2955         up_write(&llioc.ioc_sem);
2956
2957         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2958 }
2959
2960 EXPORT_SYMBOL(ll_iocontrol_register);
2961 EXPORT_SYMBOL(ll_iocontrol_unregister);
2962
2963 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2964                         unsigned int cmd, unsigned long arg, int *rcp)
2965 {
2966         enum llioc_iter ret = LLIOC_CONT;
2967         struct llioc_data *data;
2968         int rc = -EINVAL, i;
2969
2970         down_read(&llioc.ioc_sem);
2971         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2972                 for (i = 0; i < data->iocd_count; i++) {
2973                         if (cmd != data->iocd_cmd[i]) 
2974                                 continue;
2975
2976                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2977                         break;
2978                 }
2979
2980                 if (ret == LLIOC_STOP)
2981                         break;
2982         }
2983         up_read(&llioc.ioc_sem);
2984
2985         if (rcp)
2986                 *rcp = rc;
2987         return ret;
2988 }