Fixes some scalability problems and accesses to uninitialized memory
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
85
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in the case of LMV, is it correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * Here we check whether this is a forced umount. If so, we are called
110          * on cancellation of the "open lock" and we do not call md_close(),
111          * as it would not succeed: the import is already deactivated.
112          */
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* MDS has instructed us to obtain Size-on-MDS attribute from
131                  * OSTs and send a setattr back to the MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154       
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och = *och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody may have freed this
203                       och already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have a good enough OPEN lock on the file and if
228            we can skip talking to the MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing file %p with negative dentry %p, name %s\n",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* Although this returns an error code, the caller (fput()) ignores it, so we
273  * must make every effort to clean up all of our state here.  Also, applications
274  * rarely check close errors and even if an error is returned they will not
275  * re-try the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
293         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
294         fd = LUSTRE_FPRIVATE(file);
295         LASSERT(fd != NULL);
296
297         /* don't do anything for / */
298         if (inode->i_sb->s_root == file->f_dentry) {
299                 LUSTRE_FPRIVATE(file) = NULL;
300                 ll_file_data_put(fd);
301                 RETURN(0);
302         }
303         
304         if (lsm)
305                 lov_test_and_clear_async_rc(lsm);
306         lli->lli_async_rc = 0;
307
308         rc = ll_md_close(sbi->ll_md_exp, inode, file);
309         RETURN(rc);
310 }
311
312 static int ll_intent_file_open(struct file *file, void *lmm,
313                                int lmmsize, struct lookup_intent *itp)
314 {
315         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
316         struct dentry *parent = file->f_dentry->d_parent;
317         const char *name = file->f_dentry->d_name.name;
318         const int len = file->f_dentry->d_name.len;
319         struct md_op_data *op_data;
320         struct ptlrpc_request *req;
321         int rc;
322
323         if (!parent)
324                 RETURN(-ENOENT);
325
326         /* Usually we come here only for NFSD, and we want the open lock.
327            But we can also get here with pre-2.6.15 patchless kernels, and in
328            that case that lock is also OK */
329         /* We can also get here if there was a cached open handle in revalidate_it
330          * but it disappeared while we were getting from there to ll_file_open.
331          * But this means the file was closed and immediately opened, which
332          * makes it a good candidate for using the OPEN lock */
333         /* If lmmsize & lmm are not 0, we are just setting stripe info
334          * parameters. No need for the open lock */
335         if (!lmm && !lmmsize)
336                 itp->it_flags |= MDS_OPEN_LOCK;
337
338         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
339                                       file->f_dentry->d_inode, name, len,
340                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
341         if (IS_ERR(op_data))
342                 RETURN(PTR_ERR(op_data));
343
344         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
345                             0 /*unused */, &req, ll_md_blocking_ast, 0);
346         ll_finish_md_op_data(op_data);
347         if (rc == -ESTALE) {
348                 /* The reason for keeping our own exit path is to avoid
349                  * flooding the log with -ESTALE error messages.
350                  */
351                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
352                      it_open_error(DISP_OPEN_OPEN, itp))
353                         GOTO(out, rc);
354                 ll_release_openhandle(file->f_dentry, itp);
355                 GOTO(out_stale, rc);
356         }
357
358         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
359                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
360                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
361                 GOTO(out, rc);
362         }
363
364         if (itp->d.lustre.it_lock_mode)
365                 md_set_lock_data(sbi->ll_md_exp,
366                                  &itp->d.lustre.it_lock_handle, 
367                                  file->f_dentry->d_inode);
368
369         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
370                            NULL);
371 out:
372         ptlrpc_req_finished(itp->d.lustre.it_data);
373
374 out_stale:
375         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
376         ll_intent_drop_lock(itp);
377
378         RETURN(rc);
379 }
380
381 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
382                        struct lookup_intent *it, struct obd_client_handle *och)
383 {
384         struct ptlrpc_request *req = it->d.lustre.it_data;
385         struct mdt_body *body;
386
387         LASSERT(och);
388
389         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
390         /* reply already checked out */
391         LASSERT(body != NULL);
392         /* and swabbed in md_enqueue */
393         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
394
395         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
396         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
397         och->och_fid = lli->lli_fid;
398         och->och_flags = it->it_flags;
399         lli->lli_ioepoch = body->ioepoch;
400
401         return md_set_open_replay_data(md_exp, och, req);
402 }
403
404 int ll_local_open(struct file *file, struct lookup_intent *it,
405                   struct ll_file_data *fd, struct obd_client_handle *och)
406 {
407         struct inode *inode = file->f_dentry->d_inode;
408         struct ll_inode_info *lli = ll_i2info(inode);
409         ENTRY;
410
411         LASSERT(!LUSTRE_FPRIVATE(file));
412
413         LASSERT(fd != NULL);
414
415         if (och) {
416                 struct ptlrpc_request *req = it->d.lustre.it_data;
417                 struct mdt_body *body;
418                 int rc;
419
420                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
421                 if (rc)
422                         RETURN(rc);
423
424                 body = lustre_msg_buf(req->rq_repmsg,
425                                       DLM_REPLY_REC_OFF, sizeof(*body));
426
427                 if ((it->it_flags & FMODE_WRITE) &&
428                     (body->valid & OBD_MD_FLSIZE))
429                 {
430                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
431                                lli->lli_ioepoch, PFID(&lli->lli_fid));
432                 }
433         }
434
435         LUSTRE_FPRIVATE(file) = fd;
436         ll_readahead_init(inode, &fd->fd_ras);
437         fd->fd_omode = it->it_flags;
438         RETURN(0);
439 }
440
441 /* Open a file, and (for the very first open) create objects on the OSTs at
442  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
443  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
444  * lli_open_sem to ensure no other process will create objects, send the
445  * stripe MD to the MDS, or try to destroy the objects if that fails.
446  *
447  * If we already have the stripe MD locally then we don't request it in
448  * md_open(), by passing a lmm_size = 0.
449  *
450  * It is up to the application to ensure no other processes open this file
451  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
452  * used.  We might be able to avoid races of that sort by getting lli_open_sem
453  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
454  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
455  */
456 int ll_file_open(struct inode *inode, struct file *file)
457 {
458         struct ll_inode_info *lli = ll_i2info(inode);
459         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
460                                           .it_flags = file->f_flags };
461         struct lov_stripe_md *lsm;
462         struct ptlrpc_request *req = NULL;
463         struct obd_client_handle **och_p;
464         __u64 *och_usecount;
465         struct ll_file_data *fd;
466         int rc = 0;
467         ENTRY;
468
469         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
470                inode->i_generation, inode, file->f_flags);
471
476 #ifdef HAVE_VFS_INTENT_PATCHES
477         it = file->f_it;
478 #else
479         it = file->private_data; /* XXX: compat macro */
480         file->private_data = NULL; /* prevent ll_local_open assertion */
481 #endif
482
483         fd = ll_file_data_get();
484         if (fd == NULL)
485                 RETURN(-ENOMEM);
486
487         /* don't do anything for / */
488         if (inode->i_sb->s_root == file->f_dentry) {
489                 LUSTRE_FPRIVATE(file) = fd;
490                 RETURN(0);
491         }
492
493         if (!it || !it->d.lustre.it_disposition) {
494                 /* Convert f_flags into access mode. We cannot use file->f_mode,
495                  * because everything but O_ACCMODE mask was stripped from
496                  * there */
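                /* The conditional increment below maps the O_ACCMODE values
                 * onto FMODE_ bits: O_RDONLY (0) -> FMODE_READ (1),
                 * O_WRONLY (1) -> FMODE_WRITE (2),
                 * O_RDWR (2) -> FMODE_READ | FMODE_WRITE (3). */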
497                 if ((oit.it_flags + 1) & O_ACCMODE)
498                         oit.it_flags++;
499                 if (file->f_flags & O_TRUNC)
500                         oit.it_flags |= FMODE_WRITE;
501
502                 /* The kernel only calls f_op->open in dentry_open.  filp_open
503                  * calls dentry_open after a call to open_namei that checks
504                  * permissions.  Only nfsd_open calls dentry_open directly
505                  * without checking permissions, so the code below is safe. */
506                 if (oit.it_flags & FMODE_WRITE)
507                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
508
509                 /* We do not want O_EXCL here, presumably we opened the file
510                  * already? XXX - NFS implications? */
511                 oit.it_flags &= ~O_EXCL;
512
513                 it = &oit;
514         }
515
516         /* Let's see if we have file open on MDS already. */
517         if (it->it_flags & FMODE_WRITE) {
518                 och_p = &lli->lli_mds_write_och;
519                 och_usecount = &lli->lli_open_fd_write_count;
520         } else if (it->it_flags & FMODE_EXEC) {
521                 och_p = &lli->lli_mds_exec_och;
522                 och_usecount = &lli->lli_open_fd_exec_count;
523         } else {
524                 och_p = &lli->lli_mds_read_och;
525                 och_usecount = &lli->lli_open_fd_read_count;
526         }
527         
528         down(&lli->lli_och_sem);
529         if (*och_p) { /* Open handle is present */
530                 if (it_disposition(it, DISP_OPEN_OPEN)) {
531                         /* Well, there's an extra open request that we do not need;
532                            let's close it somehow. This will decref the request. */
533                         rc = it_open_error(DISP_OPEN_OPEN, it);
534                         if (rc) {
535                                 ll_file_data_put(fd);
536                                 GOTO(out_och_free, rc);
537                         }       
538                         ll_release_openhandle(file->f_dentry, it);
539                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
540                                              LPROC_LL_OPEN);
541                 }
542                 (*och_usecount)++;
543
544                 rc = ll_local_open(file, it, fd, NULL);
545                 if (rc) {
546                         up(&lli->lli_och_sem);
547                         ll_file_data_put(fd);
548                         RETURN(rc);
549                 }
550         } else {
551                 LASSERT(*och_usecount == 0);
552                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
553                 if (!*och_p) {
554                         ll_file_data_put(fd);
555                         GOTO(out_och_free, rc = -ENOMEM);
556                 }
557                 (*och_usecount)++;
558                 if (!it->d.lustre.it_disposition) {
559                         it->it_flags |= O_CHECK_STALE;
560                         rc = ll_intent_file_open(file, NULL, 0, it);
561                         it->it_flags &= ~O_CHECK_STALE;
562                         if (rc) {
563                                 ll_file_data_put(fd);
564                                 GOTO(out_och_free, rc);
565                         }
566
567                         /* Got some error? Release the request */
568                         if (it->d.lustre.it_status < 0) {
569                                 req = it->d.lustre.it_data;
570                                 ptlrpc_req_finished(req);
571                         }
572                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
573                                          &it->d.lustre.it_lock_handle,
574                                          file->f_dentry->d_inode);
575                 }
576                 req = it->d.lustre.it_data;
577
578                 /* md_intent_lock() didn't get a request ref if there was an
579                  * open error, so don't do cleanup on the request here
580                  * (bug 3430) */
581                 /* XXX (green): Should not we bail out on any error here, not
582                  * just open error? */
583                 rc = it_open_error(DISP_OPEN_OPEN, it);
584                 if (rc) {
585                         ll_file_data_put(fd);
586                         GOTO(out_och_free, rc);
587                 }
588
589                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
590                 rc = ll_local_open(file, it, fd, *och_p);
591                 if (rc) {
592                         up(&lli->lli_och_sem);
593                         ll_file_data_put(fd);
594                         GOTO(out_och_free, rc);
595                 }
596         }
597         up(&lli->lli_och_sem);
598
599         /* Must do this outside the lli_och_sem lock to prevent a deadlock
600            where a different kind of OPEN lock for this same inode gets
601            cancelled by ldlm_cancel_lru */
602         if (!S_ISREG(inode->i_mode))
603                 GOTO(out, rc);
604
605         ll_capa_open(inode);
606
607         lsm = lli->lli_smd;
608         if (lsm == NULL) {
609                 if (file->f_flags & O_LOV_DELAY_CREATE ||
610                     !(file->f_mode & FMODE_WRITE)) {
611                         CDEBUG(D_INODE, "object creation was delayed\n");
612                         GOTO(out, rc);
613                 }
614         }
615         file->f_flags &= ~O_LOV_DELAY_CREATE;
616         GOTO(out, rc);
617 out:
618         ptlrpc_req_finished(req);
619         if (req)
620                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
621 out_och_free:
622         if (rc) {
623                 if (*och_p) {
624                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
625                         *och_p = NULL; /* OBD_FREE writes some magic there */
626                         (*och_usecount)--;
627                 }
628                 up(&lli->lli_och_sem);
629         }
630
631         return rc;
632 }
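/*
 * A minimal, illustrative userspace sketch of the O_LOV_DELAY_CREATE flow
 * described in the comment above ll_file_open(): the open creates no OST
 * objects, and striping is chosen with the LL_IOC_LOV_SETSTRIPE ioctl before
 * the first write.  It assumes the userspace definitions (O_LOV_DELAY_CREATE,
 * LL_IOC_LOV_SETSTRIPE, struct lov_user_md) from <lustre/lustre_user.h> and is
 * kept under #if 0, as it is not part of this kernel module.
 */
#if 0
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <lustre/lustre_user.h>

static int open_with_delayed_stripes(const char *path)
{
        struct lov_user_md lum = { 0 };
        int fd;

        /* Open (and possibly create) the file without creating OST objects. */
        fd = open(path, O_CREAT | O_RDWR | O_LOV_DELAY_CREATE, 0644);
        if (fd < 0)
                return -errno;

        /* Pick the striping before the first write triggers object creation. */
        lum.lmm_magic = LOV_USER_MAGIC;
        lum.lmm_stripe_size = 1 << 20;          /* 1 MiB stripes */
        lum.lmm_stripe_count = 4;               /* spread over four OSTs */
        lum.lmm_stripe_offset = (__u16)-1;      /* let the MDS choose OSTs */

        if (ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum) < 0) {
                int rc = -errno;
                close(fd);
                return rc;
        }
        return fd;
}
#endif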
633
634 /* Fills the obdo with the attributes for the inode defined by lsm */
635 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
636 {
637         struct ptlrpc_request_set *set;
638         struct ll_inode_info *lli = ll_i2info(inode);
639         struct lov_stripe_md *lsm = lli->lli_smd;
640
641         struct obd_info oinfo = { { { 0 } } };
642         int rc;
643         ENTRY;
644
645         LASSERT(lsm != NULL);
646
647         oinfo.oi_md = lsm;
648         oinfo.oi_oa = obdo;
649         oinfo.oi_oa->o_id = lsm->lsm_object_id;
650         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
651         oinfo.oi_oa->o_mode = S_IFREG;
652         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
653                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
654                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
655                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
656                                OBD_MD_FLGROUP;
657         oinfo.oi_capa = ll_mdscapa_get(inode);
658
659         set = ptlrpc_prep_set();
660         if (set == NULL) {
661                 CERROR("can't allocate ptlrpc set\n");
662                 rc = -ENOMEM;
663         } else {
664                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
665                 if (rc == 0)
666                         rc = ptlrpc_set_wait(set);
667                 ptlrpc_set_destroy(set);
668         }
669         capa_put(oinfo.oi_capa);
670         if (rc)
671                 RETURN(rc);
672
673         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
674                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
675                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
676
677         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
678         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
679                lli->lli_smd->lsm_object_id, i_size_read(inode),
680                inode->i_blocks, inode->i_blksize);
681         RETURN(0);
682 }
683
684 static inline void ll_remove_suid(struct inode *inode)
685 {
686         unsigned int mode;
687
688         /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
689         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
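        /*
         * S_ISGID/S_IXGRP scales the S_IXGRP bit up to the S_ISGID bit position,
         * so S_ISGID ends up in @mode only when group-execute is set.  For
         * example, i_mode = 06775 gives mode = S_ISUID|S_ISGID = 06000; both
         * bits survive the mask below, so a task without CAP_FSETID clears
         * them, leaving i_mode = 0775.
         */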
690
691         /* were any of the setuid/setgid bits actually set in i_mode? */
692         mode &= inode->i_mode;
693         if (mode && !capable(CAP_FSETID)) {
694                 inode->i_mode &= ~mode;
695                 // XXX careful here - we cannot change the size
696         }
697 }
698
699 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
700 {
701         struct ll_inode_info *lli = ll_i2info(inode);
702         struct lov_stripe_md *lsm = lli->lli_smd;
703         struct obd_export *exp = ll_i2dtexp(inode);
704         struct {
705                 char name[16];
706                 struct ldlm_lock *lock;
707                 struct lov_stripe_md *lsm;
708         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
709         __u32 stripe, vallen = sizeof(stripe);
710         int rc;
711         ENTRY;
712
713         if (lsm->lsm_stripe_count == 1)
714                 GOTO(check, stripe = 0);
715
716         /* get our offset in the lov */
717         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
718         if (rc != 0) {
719                 CERROR("obd_get_info: rc = %d\n", rc);
720                 RETURN(rc);
721         }
722         LASSERT(stripe < lsm->lsm_stripe_count);
723
724 check:
725         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
726             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
727                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
728                            lsm->lsm_oinfo[stripe]->loi_id,
729                            lsm->lsm_oinfo[stripe]->loi_gr);
730                 RETURN(-ELDLM_NO_LOCK_DATA);
731         }
732
733         RETURN(stripe);
734 }
735
736 /* Flush the page cache for an extent as it is canceled.  When we're on an LOV,
737  * we get a lock cancellation for each stripe, so we have to map the obd's
738  * region back onto the stripes in the file that it held.
739  *
740  * No one can dirty the extent until we've finished our work and they can
741  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
742  * but other kernel actors could have pages locked.
743  *
744  * Called with the DLM lock held. */
745 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
746                               struct ldlm_lock *lock, __u32 stripe)
747 {
748         ldlm_policy_data_t tmpex;
749         unsigned long start, end, count, skip, i, j;
750         struct page *page;
751         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
752         struct lustre_handle lockh;
753         struct address_space *mapping = inode->i_mapping;
754
755         ENTRY;
756         tmpex = lock->l_policy_data;
757         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
758                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
759                i_size_read(inode));
760
761         /* our locks are page granular thanks to osc_enqueue, so we invalidate
762          * whole pages. */
763         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
764             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
765                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
766                            CFS_PAGE_SIZE);
767         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
768         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
769
770         count = ~0;
771         skip = 0;
772         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
773         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
774         if (lsm->lsm_stripe_count > 1) {
775                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
776                 skip = (lsm->lsm_stripe_count - 1) * count;
777                 start += start/count * skip + stripe * count;
778                 if (end != ~0)
779                         end += end/count * skip + stripe * count;
780         }
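        /*
         * Worked example of the index conversion above (illustrative numbers):
         * with 4 KiB pages, a 1 MiB stripe size and 4 stripes, count = 256
         * pages per stripe chunk and skip = 3 * 256 = 768 pages owned by the
         * other stripes between two of our chunks.  For stripe 1, OST-object
         * page index 300 falls in that object's second chunk, so
         * start = 300 + (300 / 256) * 768 + 1 * 256 = 1324, its page index
         * within the file.
         */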
781         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
782                 end = ~0;
783
784         i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
785             CFS_PAGE_SHIFT : 0;
786         if (i < end)
787                 end = i;
788
789         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
790                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
791                count, skip, end, discard ? " (DISCARDING)" : "");
792
793         /* walk through the vmas on the inode and tear down mmap()ed pages that
794          * intersect with the lock.  this stops immediately if there are no
795          * mmap()ed regions of the file.  This is not efficient at all and
796          * should be short lived. We'll associate mmap()ed pages with the lock
797          * and will be able to find them directly */
798         for (i = start; i <= end; i += (j + skip)) {
799                 j = min(count - (i % count), end - i + 1);
800                 LASSERT(j > 0);
801                 LASSERT(mapping);
802                 if (ll_teardown_mmaps(mapping,
803                                       (__u64)i << CFS_PAGE_SHIFT,
804                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
805                         break;
806         }
807
808         /* This is a simplistic implementation of page eviction at
809          * cancellation.  It is careful to handle races with other page
810          * lockers correctly.  Fixes from bug 20 will make it more
811          * efficient by associating locks with pages and by batching
812          * writeback under the lock explicitly. */
813         for (i = start, j = start % count; i <= end;
814              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
815                 if (j == count) {
816                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
817                         i += skip;
818                         j = 0;
819                         if (i > end)
820                                 break;
821                 }
822                 LASSERTF(tmpex.l_extent.start < lock->l_policy_data.l_extent.end,
823                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
824                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
825                          start, i, end);
826
827                 if (!mapping_has_pages(mapping)) {
828                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
829                         break;
830                 }
831
832                 cond_resched();
833
834                 page = find_get_page(mapping, i);
835                 if (page == NULL)
836                         continue;
837                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
838                                i, tmpex.l_extent.start);
839                 lock_page(page);
840
841                 /* page->mapping is checked below to guard against racing with teardown */
842                 if (!discard && clear_page_dirty_for_io(page)) {
843                         rc = ll_call_writepage(inode, page);
844                         /* either waiting for io to complete or reacquiring
845                          * the lock that the failed writepage released */
846                         lock_page(page);
847                         wait_on_page_writeback(page);
848                         if (rc != 0) {
849                                 CERROR("writepage inode %lu(%p) of page %p "
850                                        "failed: %d\n", inode->i_ino, inode,
851                                        page, rc);
852                                 if (rc == -ENOSPC)
853                                         set_bit(AS_ENOSPC, &mapping->flags);
854                                 else
855                                         set_bit(AS_EIO, &mapping->flags);
856                         }
857                 }
858
859                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
860                 /* check to see if another DLM lock covers this page b=2765 */
861                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
862                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
863                                       LDLM_FL_TEST_LOCK,
864                                       &lock->l_resource->lr_name, LDLM_EXTENT,
865                                       &tmpex, LCK_PR | LCK_PW, &lockh);
866
867                 if (rc2 <= 0 && page->mapping != NULL) {
868                         struct ll_async_page *llap = llap_cast_private(page);
869                         /* checking again to account for writeback's
870                          * lock_page() */
871                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
872                         if (llap)
873                                 ll_ra_accounting(llap, mapping);
874                         ll_truncate_complete_page(page);
875                 }
876                 unlock_page(page);
877                 page_cache_release(page);
878         }
879         LASSERTF(tmpex.l_extent.start <=
880                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
881                   lock->l_policy_data.l_extent.end + 1),
882                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
883                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
884                  start, i, end);
885         EXIT;
886 }
887
888 static int ll_extent_lock_callback(struct ldlm_lock *lock,
889                                    struct ldlm_lock_desc *new, void *data,
890                                    int flag)
891 {
892         struct lustre_handle lockh = { 0 };
893         int rc;
894         ENTRY;
895
896         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
897                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
898                 LBUG();
899         }
900
901         switch (flag) {
902         case LDLM_CB_BLOCKING:
903                 ldlm_lock2handle(lock, &lockh);
904                 rc = ldlm_cli_cancel(&lockh);
905                 if (rc != ELDLM_OK)
906                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
907                 break;
908         case LDLM_CB_CANCELING: {
909                 struct inode *inode;
910                 struct ll_inode_info *lli;
911                 struct lov_stripe_md *lsm;
912                 int stripe;
913                 __u64 kms;
914
915                 /* This lock wasn't granted, don't try to evict pages */
916                 if (lock->l_req_mode != lock->l_granted_mode)
917                         RETURN(0);
918
919                 inode = ll_inode_from_lock(lock);
920                 if (inode == NULL)
921                         RETURN(0);
922                 lli = ll_i2info(inode);
923                 if (lli == NULL)
924                         goto iput;
925                 if (lli->lli_smd == NULL)
926                         goto iput;
927                 lsm = lli->lli_smd;
928
929                 stripe = ll_lock_to_stripe_offset(inode, lock);
930                 if (stripe < 0)
931                         goto iput;
932
933                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
934
935                 lov_stripe_lock(lsm);
936                 lock_res_and_lock(lock);
937                 kms = ldlm_extent_shift_kms(lock,
938                                             lsm->lsm_oinfo[stripe]->loi_kms);
939
940                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
941                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
942                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
943                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
944                 unlock_res_and_lock(lock);
945                 lov_stripe_unlock(lsm);
946         iput:
947                 iput(inode);
948                 break;
949         }
950         default:
951                 LBUG();
952         }
953
954         RETURN(0);
955 }
956
957 #if 0
958 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
959 {
960         /* XXX ALLOCATE - 160 bytes */
961         struct inode *inode = ll_inode_from_lock(lock);
962         struct ll_inode_info *lli = ll_i2info(inode);
963         struct lustre_handle lockh = { 0 };
964         struct ost_lvb *lvb;
965         int stripe;
966         ENTRY;
967
968         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
969                      LDLM_FL_BLOCK_CONV)) {
970                 LBUG(); /* not expecting any blocked async locks yet */
971                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
972                            "lock, returning");
973                 ldlm_lock_dump(D_OTHER, lock, 0);
974                 ldlm_reprocess_all(lock->l_resource);
975                 RETURN(0);
976         }
977
978         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
979
980         stripe = ll_lock_to_stripe_offset(inode, lock);
981         if (stripe < 0)
982                 goto iput;
983
984         if (lock->l_lvb_len) {
985                 struct lov_stripe_md *lsm = lli->lli_smd;
986                 __u64 kms;
987                 lvb = lock->l_lvb_data;
988                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
989
990                 lock_res_and_lock(lock);
991                 ll_inode_size_lock(inode, 1);
992                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
993                 kms = ldlm_extent_shift_kms(NULL, kms);
994                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
995                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
996                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
997                 lsm->lsm_oinfo[stripe].loi_kms = kms;
998                 ll_inode_size_unlock(inode, 1);
999                 unlock_res_and_lock(lock);
1000         }
1001
1002 iput:
1003         iput(inode);
1004         wake_up(&lock->l_waitq);
1005
1006         ldlm_lock2handle(lock, &lockh);
1007         ldlm_lock_decref(&lockh, LCK_PR);
1008         RETURN(0);
1009 }
1010 #endif
1011
1012 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1013 {
1014         struct ptlrpc_request *req = reqp;
1015         struct inode *inode = ll_inode_from_lock(lock);
1016         struct ll_inode_info *lli;
1017         struct lov_stripe_md *lsm;
1018         struct ost_lvb *lvb;
1019         int rc, stripe;
1020         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1021         ENTRY;
1022
1023         if (inode == NULL)
1024                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1025         lli = ll_i2info(inode);
1026         if (lli == NULL)
1027                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1028         lsm = lli->lli_smd;
1029         if (lsm == NULL)
1030                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1031
1032         /* First, find out which stripe index this lock corresponds to. */
1033         stripe = ll_lock_to_stripe_offset(inode, lock);
1034         if (stripe < 0)
1035                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1036
1037         rc = lustre_pack_reply(req, 2, size, NULL);
1038         if (rc)
1039                 GOTO(iput, rc);
1040
1041         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1042         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1043         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1044         lvb->lvb_atime = LTIME_S(inode->i_atime);
1045         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1046
1047         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1048                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1049                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1050                    lvb->lvb_atime, lvb->lvb_ctime);
1051  iput:
1052         iput(inode);
1053
1054  out:
1055         /* These errors are normal races, so we don't want to fill the console
1056          * with messages by calling ptlrpc_error() */
1057         if (rc == -ELDLM_NO_LOCK_DATA)
1058                 lustre_pack_reply(req, 1, NULL, NULL);
1059
1060         req->rq_status = rc;
1061         return rc;
1062 }
1063
1064 static void ll_merge_lvb(struct inode *inode)
1065 {
1066         struct ll_inode_info *lli = ll_i2info(inode);
1067         struct ll_sb_info *sbi = ll_i2sbi(inode);
1068         struct ost_lvb lvb;
1069         ENTRY;
1070
1071         ll_inode_size_lock(inode, 1);
1072         inode_init_lvb(inode, &lvb);
1073         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1074         i_size_write(inode, lvb.lvb_size);
1075         inode->i_blocks = lvb.lvb_blocks;
1076         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1077         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1078         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1079         ll_inode_size_unlock(inode, 1);
1080         EXIT;
1081 }
1082
1083 int ll_local_size(struct inode *inode)
1084 {
1085         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1086         struct ll_inode_info *lli = ll_i2info(inode);
1087         struct ll_sb_info *sbi = ll_i2sbi(inode);
1088         struct lustre_handle lockh = { 0 };
1089         int flags = 0;
1090         int rc;
1091         ENTRY;
1092
1093         if (lli->lli_smd->lsm_stripe_count == 0)
1094                 RETURN(0);
1095
1096         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1097                        &policy, LCK_PR, &flags, inode, &lockh);
1098         if (rc < 0)
1099                 RETURN(rc);
1100         else if (rc == 0)
1101                 RETURN(-ENODATA);
1102
1103         ll_merge_lvb(inode);
1104         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1105         RETURN(0);
1106 }
1107
1108 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1109                      lstat_t *st)
1110 {
1111         struct lustre_handle lockh = { 0 };
1112         struct ldlm_enqueue_info einfo = { 0 };
1113         struct obd_info oinfo = { { { 0 } } };
1114         struct ost_lvb lvb;
1115         int rc;
1116
1117         ENTRY;
1118
1119         einfo.ei_type = LDLM_EXTENT;
1120         einfo.ei_mode = LCK_PR;
1121         einfo.ei_cb_bl = ll_extent_lock_callback;
1122         einfo.ei_cb_cp = ldlm_completion_ast;
1123         einfo.ei_cb_gl = ll_glimpse_callback;
1124         einfo.ei_cbdata = NULL;
1125
1126         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1127         oinfo.oi_lockh = &lockh;
1128         oinfo.oi_md = lsm;
1129         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1130
1131         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1132         if (rc == -ENOENT)
1133                 RETURN(rc);
1134         if (rc != 0) {
1135                 CERROR("obd_enqueue returned rc %d, "
1136                        "returning -EIO\n", rc);
1137                 RETURN(rc > 0 ? -EIO : rc);
1138         }
1139
1140         lov_stripe_lock(lsm);
1141         memset(&lvb, 0, sizeof(lvb));
1142         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1143         st->st_size = lvb.lvb_size;
1144         st->st_blocks = lvb.lvb_blocks;
1145         st->st_mtime = lvb.lvb_mtime;
1146         st->st_atime = lvb.lvb_atime;
1147         st->st_ctime = lvb.lvb_ctime;
1148         lov_stripe_unlock(lsm);
1149
1150         RETURN(rc);
1151 }
1152
1153 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1154  * file (because it prefers KMS over RSS when larger) */
1155 int ll_glimpse_size(struct inode *inode, int ast_flags)
1156 {
1157         struct ll_inode_info *lli = ll_i2info(inode);
1158         struct ll_sb_info *sbi = ll_i2sbi(inode);
1159         struct lustre_handle lockh = { 0 };
1160         struct ldlm_enqueue_info einfo = { 0 };
1161         struct obd_info oinfo = { { { 0 } } };
1162         int rc;
1163         ENTRY;
1164
1165         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1166                 RETURN(0);
1167
1168         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1169
1170         if (!lli->lli_smd) {
1171                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1172                 RETURN(0);
1173         }
1174
1175         /* NOTE: this looks like a DLM lock request, but it may not be one.
1176          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse
1177          *       request that won't revoke any conflicting DLM locks held.
1178          *       Instead, ll_glimpse_callback() will be called on each client
1179          *       holding a DLM lock against this file, and the resulting size
1180          *       will be returned for each stripe. A DLM lock on [0, EOF] is
1181          *       acquired only if there were no conflicting locks. */
1182         einfo.ei_type = LDLM_EXTENT;
1183         einfo.ei_mode = LCK_PR;
1184         einfo.ei_cb_bl = ll_extent_lock_callback;
1185         einfo.ei_cb_cp = ldlm_completion_ast;
1186         einfo.ei_cb_gl = ll_glimpse_callback;
1187         einfo.ei_cbdata = inode;
1188
1189         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1190         oinfo.oi_lockh = &lockh;
1191         oinfo.oi_md = lli->lli_smd;
1192         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1193
1194         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1195         if (rc == -ENOENT)
1196                 RETURN(rc);
1197         if (rc != 0) {
1198                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1199                 RETURN(rc > 0 ? -EIO : rc);
1200         }
1201
1202         ll_merge_lvb(inode);
1203
1204         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1205                i_size_read(inode), inode->i_blocks);
1206
1207         RETURN(rc);
1208 }
1209
1210 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1211                    struct lov_stripe_md *lsm, int mode,
1212                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1213                    int ast_flags)
1214 {
1215         struct ll_sb_info *sbi = ll_i2sbi(inode);
1216         struct ost_lvb lvb;
1217         struct ldlm_enqueue_info einfo = { 0 };
1218         struct obd_info oinfo = { { { 0 } } };
1219         int rc;
1220         ENTRY;
1221
1222         LASSERT(!lustre_handle_is_used(lockh));
1223         LASSERT(lsm != NULL);
1224
1225         /* don't drop the mmapped file to LRU */
1226         if (mapping_mapped(inode->i_mapping))
1227                 ast_flags |= LDLM_FL_NO_LRU;
1228
1229         /* XXX phil: can we do this?  won't it screw the file size up? */
1230         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1231             (sbi->ll_flags & LL_SBI_NOLCK))
1232                 RETURN(0);
1233
1234         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1235                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1236
1237         einfo.ei_type = LDLM_EXTENT;
1238         einfo.ei_mode = mode;
1239         einfo.ei_cb_bl = ll_extent_lock_callback;
1240         einfo.ei_cb_cp = ldlm_completion_ast;
1241         einfo.ei_cb_gl = ll_glimpse_callback;
1242         einfo.ei_cbdata = inode;
1243
1244         oinfo.oi_policy = *policy;
1245         oinfo.oi_lockh = lockh;
1246         oinfo.oi_md = lsm;
1247         oinfo.oi_flags = ast_flags;
1248
1249         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1250         *policy = oinfo.oi_policy;
1251         if (rc > 0)
1252                 rc = -EIO;
1253
1254         ll_inode_size_lock(inode, 1);
1255         inode_init_lvb(inode, &lvb);
1256         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1257
1258         if (policy->l_extent.start == 0 &&
1259             policy->l_extent.end == OBD_OBJECT_EOF) {
1260                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1261                  * the kms under both a DLM lock and the
1262                  * ll_inode_size_lock().  If we don't get the
1263                  * ll_inode_size_lock() here we can match the DLM lock and
1264                  * reset i_size from the kms before the truncating path has
1265                  * updated the kms.  generic_file_write can then trust the
1266                  * stale i_size when doing appending writes and effectively
1267                  * cancel the result of the truncate.  Getting the
1268                  * ll_inode_size_lock() after the enqueue maintains the DLM
1269                  * -> ll_inode_size_lock() acquiring order. */
1270                 i_size_write(inode, lvb.lvb_size);
1271                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1272                        inode->i_ino, i_size_read(inode));
1273         }
1274
1275         if (rc == 0) {
1276                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1277                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1278                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1279         }
1280         ll_inode_size_unlock(inode, 1);
1281
1282         RETURN(rc);
1283 }
1284
1285 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1286                      struct lov_stripe_md *lsm, int mode,
1287                      struct lustre_handle *lockh)
1288 {
1289         struct ll_sb_info *sbi = ll_i2sbi(inode);
1290         int rc;
1291         ENTRY;
1292
1293         /* XXX phil: can we do this?  won't it screw the file size up? */
1294         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1295             (sbi->ll_flags & LL_SBI_NOLCK))
1296                 RETURN(0);
1297
1298         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1299
1300         RETURN(rc);
1301 }
1302
1303 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1304                             loff_t *ppos)
1305 {
1306         struct inode *inode = file->f_dentry->d_inode;
1307         struct ll_inode_info *lli = ll_i2info(inode);
1308         struct lov_stripe_md *lsm = lli->lli_smd;
1309         struct ll_sb_info *sbi = ll_i2sbi(inode);
1310         struct ll_lock_tree tree;
1311         struct ll_lock_tree_node *node;
1312         struct ost_lvb lvb;
1313         struct ll_ra_read bead;
1314         int rc, ra = 0;
1315         loff_t end;
1316         ssize_t retval, chunk, sum = 0;
1317
1318         __u64 kms;
1319         ENTRY;
1320         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1321                inode->i_ino, inode->i_generation, inode, count, *ppos);
1322         /* "If nbyte is 0, read() will return 0 and have no other results."
1323          *                      -- Single Unix Spec */
1324         if (count == 0)
1325                 RETURN(0);
1326
1327         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1328
1329         if (!lsm) {
1330                 /* A read on a file with no objects should return zero-filled
1331                  * buffers up to the file size (we can get non-zero sizes with
1332                  * mknod + truncate, then opening the file for read; this seems
1333                  * to be a common pattern in the NFS case). Bug 6243 */
1334                 int notzeroed;
1335                 /* Since there are no objects on the OSTs, we have nothing to
1336                  * take a lock on, and so we are forced to access
1337                  * inode->i_size unguarded */
1338
1339                 /* Read beyond end of file */
1340                 if (*ppos >= i_size_read(inode))
1341                         RETURN(0);
1342
1343                 if (count > i_size_read(inode) - *ppos)
1344                         count = i_size_read(inode) - *ppos;
1345                 /* Make sure to correctly adjust the file pos pointer for
1346                  * EFAULT case */
1347                 notzeroed = clear_user(buf, count);
1348                 count -= notzeroed;
1349                 *ppos += count;
1350                 if (!count)
1351                         RETURN(-EFAULT);
1352                 RETURN(count);
1353         }
1354
1355 repeat:
1356         if (sbi->ll_max_rw_chunk != 0) {
1357                 /* first, find the end of the current stripe */
1358                 end = *ppos;
1359                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1360                                 (obd_off *)&end);
1361
1362                 /* trim the end if it is beyond the request */
1363                 if (end > *ppos + count - 1)
1364                         end = *ppos + count - 1;
1365
1366                 /* and chunk shouldn't be too large even if striping is wide */
1367                 if (end - *ppos > sbi->ll_max_rw_chunk)
1368                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1369         } else {
1370                 end = *ppos + count - 1;
1371         }
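             /*
              * Illustration (hypothetical numbers): with a 1MB stripe size and
              * ll_max_rw_chunk = 512KB, a 3MB read at offset 0 proceeds in
              * 512KB chunks, each under its own lock; the "goto repeat" at the
              * bottom of this loop advances to the next chunk.
              */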
1372
1373         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1374         if (IS_ERR(node)){
1375                 GOTO(out, retval = PTR_ERR(node));
1376         }
1377
1378         tree.lt_fd = LUSTRE_FPRIVATE(file);
1379         rc = ll_tree_lock(&tree, node, buf, count,
1380                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1381         if (rc != 0)
1382                 GOTO(out, retval = rc);
1383
1384         ll_inode_size_lock(inode, 1);
1385         /*
1386          * Consistency guarantees: following possibilities exist for the
1387          * relation between region being read and real file size at this
1388          * moment:
1389          *
1390          *  (A): the region is completely inside of the file;
1391          *
1392          *  (B-x): x bytes of region are inside of the file, the rest is
1393          *  outside;
1394          *
1395          *  (C): the region is completely outside of the file.
1396          *
1397          * This classification is stable under DLM lock acquired by
1398          * ll_tree_lock() above, because to change class, other client has to
1399          * take DLM lock conflicting with our lock. Also, any updates to
1400          * ->i_size by other threads on this client are serialized by
1401          * ll_inode_size_lock(). This guarantees that short reads are handled
1402          * correctly in the face of concurrent writes and truncates.
1403          */
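             /*
              * Illustration (hypothetical numbers): with kms = 100 and a read
              * of [150, 180), the real size may be 100 (case C, the read
              * returns 0), 160 (case B-10, a 10-byte short read) or 200
              * (case A, a full read); only the glimpse below can tell these
              * apart.
              */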
1404         inode_init_lvb(inode, &lvb);
1405         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1406         kms = lvb.lvb_size;
1407         if (*ppos + count - 1 > kms) {
1408                 /* A glimpse is necessary to determine whether we return a
1409                  * short read (B) or some zeroes at the end of the buffer (C) */
1410                 ll_inode_size_unlock(inode, 1);
1411                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1412                 if (retval) {
1413                         ll_tree_unlock(&tree);
1414                         goto out;
1415                 }
1416         } else {
1417                 /* region is within kms and, hence, within real file size (A).
1418                  * We need to increase i_size to cover the read region so that
1419                  * generic_file_read() will do its job, but that doesn't mean
1420                  * the kms size is _correct_, it is only the _minimum_ size.
1421                  * If someone does a stat they will get the correct size which
1422                  * will always be >= the kms value here.  b=11081 */
1423                 if (i_size_read(inode) < kms)
1424                         i_size_write(inode, kms);
1425                 ll_inode_size_unlock(inode, 1);
1426         }
1427
1428         chunk = end - *ppos + 1;
1429         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1430                inode->i_ino, chunk, *ppos, i_size_read(inode));
1431
1432         /* turn off the kernel's read-ahead */
1433         file->f_ra.ra_pages = 0;
1434
1435         /* initialize read-ahead window once per syscall */
1436         if (ra == 0) {
1437                 ra = 1;
1438                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1439                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1440                 ll_ra_read_in(file, &bead);
1441         }
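             /*
              * Example of the window arithmetic above, assuming CFS_PAGE_SIZE
              * is 4096: a read of count = 10000 bytes at *ppos = 5000 gives
              * lrr_start = 5000 >> 12 = 1 and
              * lrr_count = (10000 + 4095) >> 12 = 3, i.e. the read-ahead
              * window spans pages 1..3.
              */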
1442
1443         /* BUG: 5972 */
1444         file_accessed(file);
1445         retval = generic_file_read(file, buf, chunk, ppos);
1446         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1447
1448         ll_tree_unlock(&tree);
1449
1450         if (retval > 0) {
1451                 buf += retval;
1452                 count -= retval;
1453                 sum += retval;
1454                 if (retval == chunk && count > 0)
1455                         goto repeat;
1456         }
1457
1458  out:
1459         if (ra != 0)
1460                 ll_ra_read_ex(file, &bead);
1461         retval = (sum > 0) ? sum : retval;
1462         RETURN(retval);
1463 }
1464
1465 /*
1466  * Write to a file (through the page cache).
1467  */
1468 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1469                              loff_t *ppos)
1470 {
1471         struct inode *inode = file->f_dentry->d_inode;
1472         struct ll_sb_info *sbi = ll_i2sbi(inode);
1473         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1474         struct ll_lock_tree tree;
1475         struct ll_lock_tree_node *node;
1476         loff_t maxbytes = ll_file_maxbytes(inode);
1477         loff_t lock_start, lock_end, end;
1478         ssize_t retval, chunk, sum = 0;
1479         int rc;
1480         ENTRY;
1481
1482         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1483                inode->i_ino, inode->i_generation, inode, count, *ppos);
1484
1485         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1486
1487         /* POSIX, but surprised the VFS doesn't check this already */
1488         if (count == 0)
1489                 RETURN(0);
1490
1491         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1492          * called on the file, don't fail the below assertion (bug 2388). */
1493         if (file->f_flags & O_LOV_DELAY_CREATE &&
1494             ll_i2info(inode)->lli_smd == NULL)
1495                 RETURN(-EBADF);
1496
1497         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1498
1499         down(&ll_i2info(inode)->lli_write_sem);
1500
1501 repeat:
1502         chunk = 0; /* just to fix gcc's warning */
1503         end = *ppos + count - 1;
1504
1505         if (file->f_flags & O_APPEND) {
1506                 lock_start = 0;
1507                 lock_end = OBD_OBJECT_EOF;
1508         } else if (sbi->ll_max_rw_chunk != 0) {
1509                 /* first, find the end of the current stripe */
1510                 end = *ppos;
1511                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1512                                 (obd_off *)&end);
1513
1514                 /* trim the end if it is beyond the request */
1515                 if (end > *ppos + count - 1)
1516                         end = *ppos + count - 1;
1517
1518                 /* and chunk shouldn't be too large even if striping is wide */
1519                 if (end - *ppos > sbi->ll_max_rw_chunk)
1520                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1521                 lock_start = *ppos;
1522                 lock_end = end;
1523         } else {
1524                 lock_start = *ppos;
1525                 lock_end = *ppos + count - 1;
1526         }
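             /*
              * Note: with O_APPEND the final write offset is only known once
              * i_size is stable, so the lock must cover the whole [0, EOF]
              * range; *ppos is then taken from i_size below, under that lock.
              */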
1527         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1528
1529         if (IS_ERR(node))
1530                 GOTO(out, retval = PTR_ERR(node));
1531
1532         tree.lt_fd = LUSTRE_FPRIVATE(file);
1533         rc = ll_tree_lock(&tree, node, buf, count,
1534                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1535         if (rc != 0)
1536                 GOTO(out, retval = rc);
1537
1538         /* This is ok; g_f_w will overwrite this under i_sem if it races
1539          * with a local truncate.  It just makes our maxbytes checking easier.
1540          * The i_size value gets updated in ll_extent_lock() as a consequence
1541          * of the [0,EOF] extent lock we requested above. */
1542         if (file->f_flags & O_APPEND) {
1543                 *ppos = i_size_read(inode);
1544                 end = *ppos + count - 1;
1545         }
1546
1547         if (*ppos >= maxbytes) {
1548                 send_sig(SIGXFSZ, current, 0);
1549                 GOTO(out_unlock, retval = -EFBIG);
1550         }
1551         if (*ppos + count > maxbytes)
1552                 count = maxbytes - *ppos;
1553
1554         /* generic_file_write handles O_APPEND after getting i_mutex */
1555         chunk = end - *ppos + 1;
1556         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1557                inode->i_ino, chunk, *ppos);
1558         retval = generic_file_write(file, buf, chunk, ppos);
1559         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1560
1561 out_unlock:
1562         ll_tree_unlock(&tree);
1563
1564 out:
1565         if (retval > 0) {
1566                 buf += retval;
1567                 count -= retval;
1568                 sum += retval;
1569                 if (retval == chunk && count > 0)
1570                         goto repeat;
1571         }
1572
1573         up(&ll_i2info(inode)->lli_write_sem);
1574
1575         retval = (sum > 0) ? sum : retval;
1576         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1577                            retval > 0 ? retval : 0);
1578         RETURN(retval);
1579 }
1580
1581 /*
1582  * Send file content (through pagecache) somewhere with helper
1583  */
1584 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1585                                 read_actor_t actor, void *target)
1586 {
1587         struct inode *inode = in_file->f_dentry->d_inode;
1588         struct ll_inode_info *lli = ll_i2info(inode);
1589         struct lov_stripe_md *lsm = lli->lli_smd;
1590         struct ll_lock_tree tree;
1591         struct ll_lock_tree_node *node;
1592         struct ost_lvb lvb;
1593         struct ll_ra_read bead;
1594         int rc;
1595         ssize_t retval;
1596         __u64 kms;
1597         ENTRY;
1598         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1599                inode->i_ino, inode->i_generation, inode, count, *ppos);
1600
1601         /* "If nbyte is 0, read() will return 0 and have no other results."
1602          *                      -- Single Unix Spec */
1603         if (count == 0)
1604                 RETURN(0);
1605
1606         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1607         /* turn off the kernel's read-ahead */
1608         in_file->f_ra.ra_pages = 0;
1609
1610         /* File with no objects, nothing to lock */
1611         if (!lsm)
1612                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1613
1614         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1615         if (IS_ERR(node))
1616                 RETURN(PTR_ERR(node));
1617
1618         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1619         rc = ll_tree_lock(&tree, node, NULL, count,
1620                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1621         if (rc != 0)
1622                 RETURN(rc);
1623
1624         ll_inode_size_lock(inode, 1);
1625         /*
1626          * Consistency guarantees: following possibilities exist for the
1627          * relation between region being read and real file size at this
1628          * moment:
1629          *
1630          *  (A): the region is completely inside of the file;
1631          *
1632          *  (B-x): x bytes of region are inside of the file, the rest is
1633          *  outside;
1634          *
1635          *  (C): the region is completely outside of the file.
1636          *
1637          * This classification is stable under DLM lock acquired by
1638          * ll_tree_lock() above, because to change class, other client has to
1639          * take DLM lock conflicting with our lock. Also, any updates to
1640          * ->i_size by other threads on this client are serialized by
1641          * ll_inode_size_lock(). This guarantees that short reads are handled
1642          * correctly in the face of concurrent writes and truncates.
1643          */
1644         inode_init_lvb(inode, &lvb);
1645         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1646         kms = lvb.lvb_size;
1647         if (*ppos + count - 1 > kms) {
1648                 /* A glimpse is necessary to determine whether we return a
1649                  * short read (B) or some zeroes at the end of the buffer (C) */
1650                 ll_inode_size_unlock(inode, 1);
1651                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1652                 if (retval)
1653                         goto out;
1654         } else {
1655                 /* region is within kms and, hence, within real file size (A) */
1656                 i_size_write(inode, kms);
1657                 ll_inode_size_unlock(inode, 1);
1658         }
1659
1660         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1661                inode->i_ino, count, *ppos, i_size_read(inode));
1662
1663         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1664         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1665         ll_ra_read_in(in_file, &bead);
1666         /* BUG: 5972 */
1667         file_accessed(in_file);
1668         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1669         ll_ra_read_ex(in_file, &bead);
1670
1671  out:
1672         ll_tree_unlock(&tree);
1673         RETURN(retval);
1674 }
1675
1676 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1677                                unsigned long arg)
1678 {
1679         struct ll_inode_info *lli = ll_i2info(inode);
1680         struct obd_export *exp = ll_i2dtexp(inode);
1681         struct ll_recreate_obj ucreatp;
1682         struct obd_trans_info oti = { 0 };
1683         struct obdo *oa = NULL;
1684         int lsm_size;
1685         int rc = 0;
1686         struct lov_stripe_md *lsm, *lsm2;
1687         ENTRY;
1688
1689         if (!capable (CAP_SYS_ADMIN))
1690                 RETURN(-EPERM);
1691
1692         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1693                             sizeof(struct ll_recreate_obj));
1694         if (rc) {
1695                 RETURN(-EFAULT);
1696         }
1697         OBDO_ALLOC(oa);
1698         if (oa == NULL)
1699                 RETURN(-ENOMEM);
1700
1701         down(&lli->lli_size_sem);
1702         lsm = lli->lli_smd;
1703         if (lsm == NULL)
1704                 GOTO(out, rc = -ENOENT);
1705         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1706                    (lsm->lsm_stripe_count));
1707
1708         OBD_ALLOC(lsm2, lsm_size);
1709         if (lsm2 == NULL)
1710                 GOTO(out, rc = -ENOMEM);
1711
1712         oa->o_id = ucreatp.lrc_id;
1713         oa->o_gr = ucreatp.lrc_group;
1714         oa->o_nlink = ucreatp.lrc_ost_idx;
1715         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1716         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1717         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1718                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1719
1720         memcpy(lsm2, lsm, lsm_size);
1721         rc = obd_create(exp, oa, &lsm2, &oti);
1722
1723         OBD_FREE(lsm2, lsm_size);
1724         GOTO(out, rc);
1725 out:
1726         up(&lli->lli_size_sem);
1727         OBDO_FREE(oa);
1728         return rc;
1729 }
1730
1731 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1732                              int flags, struct lov_user_md *lum, int lum_size)
1733 {
1734         struct ll_inode_info *lli = ll_i2info(inode);
1735         struct lov_stripe_md *lsm;
1736         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1737         int rc = 0;
1738         ENTRY;
1739
1740         down(&lli->lli_size_sem);
1741         lsm = lli->lli_smd;
1742         if (lsm) {
1743                 up(&lli->lli_size_sem);
1744                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1745                        inode->i_ino);
1746                 RETURN(-EEXIST);
1747         }
1748
1749         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1750         if (rc)
1751                 GOTO(out, rc);
1752         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1753                 GOTO(out_req_free, rc = -ENOENT);
1754         rc = oit.d.lustre.it_status;
1755         if (rc < 0)
1756                 GOTO(out_req_free, rc);
1757
1758         ll_release_openhandle(file->f_dentry, &oit);
1759
1760  out:
1761         up(&lli->lli_size_sem);
1762         ll_intent_release(&oit);
1763         RETURN(rc);
1764 out_req_free:
1765         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1766         goto out;
1767 }
1768
1769 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1770                              struct lov_mds_md **lmmp, int *lmm_size, 
1771                              struct ptlrpc_request **request)
1772 {
1773         struct ll_sb_info *sbi = ll_i2sbi(inode);
1774         struct mdt_body  *body;
1775         struct lov_mds_md *lmm = NULL;
1776         struct ptlrpc_request *req = NULL;
1777         struct obd_capa *oc;
1778         int rc, lmmsize;
1779
1780         rc = ll_get_max_mdsize(sbi, &lmmsize);
1781         if (rc)
1782                 RETURN(rc);
1783
1784         oc = ll_mdscapa_get(inode);
1785         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1786                              oc, filename, strlen(filename) + 1,
1787                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1788         capa_put(oc);
1789         if (rc < 0) {
1790                 CDEBUG(D_INFO, "md_getattr_name failed "
1791                        "on %s: rc %d\n", filename, rc);
1792                 GOTO(out, rc);
1793         }
1794
1795         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1796         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1797         /* swabbed by mdc_getattr_name */
1798         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
1799
1800         lmmsize = body->eadatasize;
1801
1802         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1803                         lmmsize == 0) {
1804                 GOTO(out, rc = -ENODATA);
1805         }
1806
1807         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1808         LASSERT(lmm != NULL);
1809         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
1810
1811         /*
1812          * This is coming from the MDS, so is probably in
1813          * little endian.  We convert it to host endian before
1814          * passing it to userspace.
1815          */
1816         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1817                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1818                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1819         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1820                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1821         }
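             /*
              * In other words, if the magic reads back byte-swapped (e.g. a
              * little-endian payload seen by a big-endian client), the
              * user-visible structures are swabbed into host order; a magic
              * that already matches needs no conversion.
              */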
1822
1823         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1824                 struct lov_stripe_md *lsm;
1825                 struct lov_user_md_join *lmj;
1826                 int lmj_size, i, aindex = 0;
1827
1828                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1829                 if (rc < 0)
1830                         GOTO(out, rc = -ENOMEM);
1831                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1832                 if (rc)
1833                         GOTO(out_free_memmd, rc);
1834
1835                 lmj_size = sizeof(struct lov_user_md_join) +
1836                            lsm->lsm_stripe_count *
1837                            sizeof(struct lov_user_ost_data_join);
1838                 OBD_ALLOC(lmj, lmj_size);
1839                 if (!lmj)
1840                         GOTO(out_free_memmd, rc = -ENOMEM);
1841
1842                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1843                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1844                         struct lov_extent *lex =
1845                                 &lsm->lsm_array->lai_ext_array[aindex];
1846
1847                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1848                                 aindex ++;
1849                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1850                                         LPU64" len %d\n", aindex, i,
1851                                         lex->le_start, (int)lex->le_len);
1852                         lmj->lmm_objects[i].l_extent_start =
1853                                 lex->le_start;
1854
1855                         if ((int)lex->le_len == -1)
1856                                 lmj->lmm_objects[i].l_extent_end = -1;
1857                         else
1858                                 lmj->lmm_objects[i].l_extent_end =
1859                                         lex->le_start + lex->le_len;
1860                         lmj->lmm_objects[i].l_object_id =
1861                                 lsm->lsm_oinfo[i]->loi_id;
1862                         lmj->lmm_objects[i].l_object_gr =
1863                                 lsm->lsm_oinfo[i]->loi_gr;
1864                         lmj->lmm_objects[i].l_ost_gen =
1865                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1866                         lmj->lmm_objects[i].l_ost_idx =
1867                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1868                 }
1869                 lmm = (struct lov_mds_md *)lmj;
1870                 lmmsize = lmj_size;
1871 out_free_memmd:
1872                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1873         }
1874 out:
1875         *lmmp = lmm;
1876         *lmm_size = lmmsize;
1877         *request = req;
1878         return rc;
1879 }
1880
1881 static int ll_lov_setea(struct inode *inode, struct file *file,
1882                             unsigned long arg)
1883 {
1884         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1885         struct lov_user_md  *lump;
1886         int lum_size = sizeof(struct lov_user_md) +
1887                        sizeof(struct lov_user_ost_data);
1888         int rc;
1889         ENTRY;
1890
1891         if (!capable (CAP_SYS_ADMIN))
1892                 RETURN(-EPERM);
1893
1894         OBD_ALLOC(lump, lum_size);
1895         if (lump == NULL) {
1896                 RETURN(-ENOMEM);
1897         }
1898         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1899         if (rc) {
1900                 OBD_FREE(lump, lum_size);
1901                 RETURN(-EFAULT);
1902         }
1903
1904         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1905
1906         OBD_FREE(lump, lum_size);
1907         RETURN(rc);
1908 }
1909
1910 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1911                             unsigned long arg)
1912 {
1913         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1914         int rc;
1915         int flags = FMODE_WRITE;
1916         ENTRY;
1917
1918         /* Bug 1152: copy properly when this is no longer true */
1919         LASSERT(sizeof(lum) == sizeof(*lump));
1920         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1921         rc = copy_from_user(&lum, lump, sizeof(lum));
1922         if (rc)
1923                 RETURN(-EFAULT);
1924
1925         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1926         if (rc == 0) {
1927                  put_user(0, &lump->lmm_stripe_count);
1928                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1929                                     0, ll_i2info(inode)->lli_smd, lump);
1930         }
1931         RETURN(rc);
1932 }
1933
1934 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1935 {
1936         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1937
1938         if (!lsm)
1939                 RETURN(-ENODATA);
1940
1941         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1942                             (void *)arg);
1943 }
1944
1945 static int ll_get_grouplock(struct inode *inode, struct file *file,
1946                             unsigned long arg)
1947 {
1948         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1949         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1950                                                     .end = OBD_OBJECT_EOF}};
1951         struct lustre_handle lockh = { 0 };
1952         struct ll_inode_info *lli = ll_i2info(inode);
1953         struct lov_stripe_md *lsm = lli->lli_smd;
1954         int flags = 0, rc;
1955         ENTRY;
1956
1957         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1958                 RETURN(-EINVAL);
1959         }
1960
1961         policy.l_extent.gid = arg;
1962         if (file->f_flags & O_NONBLOCK)
1963                 flags = LDLM_FL_BLOCK_NOWAIT;
1964
1965         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1966         if (rc)
1967                 RETURN(rc);
1968
1969         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1970         fd->fd_gid = arg;
1971         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
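             /*
              * The group lock spans the whole object ([0, OBD_OBJECT_EOF]);
              * holders presenting the same gid are expected to share it, and
              * ordinary per-extent locking is bypassed for this fd via
              * LL_FILE_IGNORE_LOCK.
              */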
1972
1973         RETURN(0);
1974 }
1975
1976 static int ll_put_grouplock(struct inode *inode, struct file *file,
1977                             unsigned long arg)
1978 {
1979         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1980         struct ll_inode_info *lli = ll_i2info(inode);
1981         struct lov_stripe_md *lsm = lli->lli_smd;
1982         int rc;
1983         ENTRY;
1984
1985         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1986                 /* Ugh, it's already unlocked. */
1987                 RETURN(-EINVAL);
1988         }
1989
1990         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1991                 RETURN(-EINVAL);
1992
1993         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1994
1995         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1996         if (rc)
1997                 RETURN(rc);
1998
1999         fd->fd_gid = 0;
2000         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2001
2002         RETURN(0);
2003 }
2004
2005 static int join_sanity_check(struct inode *head, struct inode *tail)
2006 {
2007         ENTRY;
2008         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2009                 CERROR("server does not support join\n");
2010                 RETURN(-EINVAL);
2011         }
2012         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2013                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2014                        head->i_ino, tail->i_ino);
2015                 RETURN(-EINVAL);
2016         }
2017         if (head->i_ino == tail->i_ino) {
2018                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2019                 RETURN(-EINVAL);
2020         }
2021         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2022                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
2023                 RETURN(-EINVAL);
2024         }
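             /*
              * E.g. a head file of 131072 bytes (2 * 64K) passes the alignment
              * check above, while one of 100000 bytes is rejected.
              */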
2025         RETURN(0);
2026 }
2027
2028 static int join_file(struct inode *head_inode, struct file *head_filp,
2029                      struct file *tail_filp)
2030 {
2031         struct dentry *tail_dentry = tail_filp->f_dentry;
2032         struct lookup_intent oit = {.it_op = IT_OPEN,
2033                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2034         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2035                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2036
2037         struct lustre_handle lockh;
2038         struct md_op_data *op_data;
2039         int    rc;
2040         loff_t data;
2041         ENTRY;
2042
2043         tail_dentry = tail_filp->f_dentry;
2044
2045         data = i_size_read(head_inode);
2046         op_data = ll_prep_md_op_data(NULL, head_inode,
2047                                      tail_dentry->d_parent->d_inode,
2048                                      tail_dentry->d_name.name,
2049                                      tail_dentry->d_name.len, 0,
2050                                      LUSTRE_OPC_ANY, &data);
2051         if (IS_ERR(op_data))
2052                 RETURN(PTR_ERR(op_data));
2053
2054         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2055                          op_data, &lockh, NULL, 0, 0);
2056
2057         ll_finish_md_op_data(op_data);
2058         if (rc < 0)
2059                 GOTO(out, rc);
2060
2061         rc = oit.d.lustre.it_status;
2062
2063         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2064                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2065                 ptlrpc_req_finished((struct ptlrpc_request *)
2066                                     oit.d.lustre.it_data);
2067                 GOTO(out, rc);
2068         }
2069
2070         if (oit.d.lustre.it_lock_mode) { /* If we got a lock, release it
2071                                            * right away */
2072                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2073                 oit.d.lustre.it_lock_mode = 0;
2074         }
2075         ll_release_openhandle(head_filp->f_dentry, &oit);
2076 out:
2077         ll_intent_release(&oit);
2078         RETURN(rc);
2079 }
2080
2081 static int ll_file_join(struct inode *head, struct file *filp,
2082                         char *filename_tail)
2083 {
2084         struct inode *tail = NULL, *first = NULL, *second = NULL;
2085         struct dentry *tail_dentry;
2086         struct file *tail_filp, *first_filp, *second_filp;
2087         struct ll_lock_tree first_tree, second_tree;
2088         struct ll_lock_tree_node *first_node, *second_node;
2089         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2090         int rc = 0, cleanup_phase = 0;
2091         ENTRY;
2092
2093         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2094                head->i_ino, head->i_generation, head, filename_tail);
2095
2096         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2097         if (IS_ERR(tail_filp)) {
2098                 CERROR("cannot open tail file %s\n", filename_tail);
2099                 rc = PTR_ERR(tail_filp);
2100                 GOTO(cleanup, rc);
2101         }
2102         tail = igrab(tail_filp->f_dentry->d_inode);
2103
2104         tlli = ll_i2info(tail);
2105         tail_dentry = tail_filp->f_dentry;
2106         LASSERT(tail_dentry);
2107         cleanup_phase = 1;
2108
2109         /* reorder the inodes to get a fixed lock ordering */
2110         first = head->i_ino > tail->i_ino ? head : tail;
2111         second = head->i_ino > tail->i_ino ? tail : head;
2112         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2113         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
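             /*
              * Locking in a fixed (descending inode number) order means two
              * concurrent joins on the same pair of files acquire the locks in
              * the same order, avoiding an ABBA deadlock.
              */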
2114
2115         CDEBUG(D_INFO, "reorder objects from %lu:%lu to %lu:%lu\n",
2116                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2117         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2118         if (IS_ERR(first_node)){
2119                 rc = PTR_ERR(first_node);
2120                 GOTO(cleanup, rc);
2121         }
2122         first_tree.lt_fd = first_filp->private_data;
2123         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2124         if (rc != 0)
2125                 GOTO(cleanup, rc);
2126         cleanup_phase = 2;
2127
2128         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2129         if (IS_ERR(second_node)){
2130                 rc = PTR_ERR(second_node);
2131                 GOTO(cleanup, rc);
2132         }
2133         second_tree.lt_fd = second_filp->private_data;
2134         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2135         if (rc != 0)
2136                 GOTO(cleanup, rc);
2137         cleanup_phase = 3;
2138
2139         rc = join_sanity_check(head, tail);
2140         if (rc)
2141                 GOTO(cleanup, rc);
2142
2143         rc = join_file(head, filp, tail_filp);
2144         if (rc)
2145                 GOTO(cleanup, rc);
2146 cleanup:
2147         switch (cleanup_phase) {
2148         case 3:
2149                 ll_tree_unlock(&second_tree);
2150                 obd_cancel_unused(ll_i2dtexp(second),
2151                                   ll_i2info(second)->lli_smd, 0, NULL);
2152         case 2:
2153                 ll_tree_unlock(&first_tree);
2154                 obd_cancel_unused(ll_i2dtexp(first),
2155                                   ll_i2info(first)->lli_smd, 0, NULL);
2156         case 1:
2157                 filp_close(tail_filp, 0);
2158                 if (tail)
2159                         iput(tail);
2160                 if (head && rc == 0) {
2161                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2162                                        &hlli->lli_smd);
2163                         hlli->lli_smd = NULL;
2164                 }
2165         case 0:
2166                 break;
2167         default:
2168                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2169                 LBUG();
2170         }
2171         RETURN(rc);
2172 }
2173
2174 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2175 {
2176         struct inode *inode = dentry->d_inode;
2177         struct obd_client_handle *och;
2178         int rc;
2179         ENTRY;
2180
2181         LASSERT(inode);
2182
2183         /* Root ? Do nothing. */
2184         if (dentry->d_inode->i_sb->s_root == dentry)
2185                 RETURN(0);
2186
2187         /* No open handle to close? Move away */
2188         if (!it_disposition(it, DISP_OPEN_OPEN))
2189                 RETURN(0);
2190
2191         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2192
2193         OBD_ALLOC(och, sizeof(*och));
2194         if (!och)
2195                 GOTO(out, rc = -ENOMEM);
2196
2197         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2198                     ll_i2info(inode), it, och);
2199
2200         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2201                                        inode, och);
2202  out:
2203         /* this one is in place of ll_file_open */
2204         ptlrpc_req_finished(it->d.lustre.it_data);
2205         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2206         RETURN(rc);
2207 }
2208
2209 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2210                   unsigned long arg)
2211 {
2212         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2213         int flags;
2214         ENTRY;
2215
2216         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2217                inode->i_generation, inode, cmd);
2218         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2219
2220         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2221         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2222                 RETURN(-ENOTTY);
2223
2224         switch(cmd) {
2225         case LL_IOC_GETFLAGS:
2226                 /* Get the current value of the file flags */
2227                 return put_user(fd->fd_flags, (int *)arg);
2228         case LL_IOC_SETFLAGS:
2229         case LL_IOC_CLRFLAGS:
2230                 /* Set or clear specific file flags */
2231                 /* XXX This probably needs checks to ensure the flags are
2232                  *     not abused, and to handle any flag side effects.
2233                  */
2234                 if (get_user(flags, (int *) arg))
2235                         RETURN(-EFAULT);
2236
2237                 if (cmd == LL_IOC_SETFLAGS) {
2238                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2239                             !(file->f_flags & O_DIRECT)) {
2240                                 CERROR("%s: unable to disable locking on "
2241                                        "non-O_DIRECT file\n", current->comm);
2242                                 RETURN(-EINVAL);
2243                         }
2244
2245                         fd->fd_flags |= flags;
2246                 } else {
2247                         fd->fd_flags &= ~flags;
2248                 }
2249                 RETURN(0);
2250         case LL_IOC_LOV_SETSTRIPE:
2251                 RETURN(ll_lov_setstripe(inode, file, arg));
2252         case LL_IOC_LOV_SETEA:
2253                 RETURN(ll_lov_setea(inode, file, arg));
2254         case LL_IOC_LOV_GETSTRIPE:
2255                 RETURN(ll_lov_getstripe(inode, arg));
2256         case LL_IOC_RECREATE_OBJ:
2257                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2258         case EXT3_IOC_GETFLAGS:
2259         case EXT3_IOC_SETFLAGS:
2260                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2261         case EXT3_IOC_GETVERSION_OLD:
2262         case EXT3_IOC_GETVERSION:
2263                 RETURN(put_user(inode->i_generation, (int *)arg));
2264         case LL_IOC_JOIN: {
2265                 char *ftail;
2266                 int rc;
2267
2268                 ftail = getname((const char *)arg);
2269                 if (IS_ERR(ftail))
2270                         RETURN(PTR_ERR(ftail));
2271                 rc = ll_file_join(inode, file, ftail);
2272                 putname(ftail);
2273                 RETURN(rc);
2274         }
2275         case LL_IOC_GROUP_LOCK:
2276                 RETURN(ll_get_grouplock(inode, file, arg));
2277         case LL_IOC_GROUP_UNLOCK:
2278                 RETURN(ll_put_grouplock(inode, file, arg));
2279         case IOC_OBD_STATFS:
2280                 RETURN(ll_obd_statfs(inode, (void *)arg));
2281
2282         /* We need to special case any other ioctls we want to handle,
2283          * to send them to the MDS/OST as appropriate and to properly
2284          * network encode the arg field.
2285         case EXT3_IOC_SETVERSION_OLD:
2286         case EXT3_IOC_SETVERSION:
2287         */
2288         case LL_IOC_FLUSHCTX:
2289                 RETURN(ll_flush_ctx(inode));
2290         case LL_IOC_GETFACL: {
2291                 struct rmtacl_ioctl_data ioc;
2292
2293                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2294                         RETURN(-EFAULT);
2295
2296                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2297         }
2298         case LL_IOC_SETFACL: {
2299                 struct rmtacl_ioctl_data ioc;
2300
2301                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2302                         RETURN(-EFAULT);
2303
2304                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2305         }
2306         default: {
2307                 int err;
2308
2309                 if (LLIOC_STOP == 
2310                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2311                         RETURN(err);
2312
2313                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2314                                      (void *)arg));
2315         }
2316         }
2317 }
2318
2319 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2320 {
2321         struct inode *inode = file->f_dentry->d_inode;
2322         struct ll_inode_info *lli = ll_i2info(inode);
2323         struct lov_stripe_md *lsm = lli->lli_smd;
2324         loff_t retval;
2325         ENTRY;
2326         retval = offset + ((origin == 2) ? i_size_read(inode) :
2327                            (origin == 1) ? file->f_pos : 0);
2328         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2329                inode->i_ino, inode->i_generation, inode, retval, retval,
2330                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2331         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2332
2333         if (origin == 2) { /* SEEK_END */
2334                 int nonblock = 0, rc;
2335
2336                 if (file->f_flags & O_NONBLOCK)
2337                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2338
2339                 if (lsm != NULL) {
2340                         rc = ll_glimpse_size(inode, nonblock);
2341                         if (rc != 0)
2342                                 RETURN(rc);
2343                 }
2344
2345                 ll_inode_size_lock(inode, 0);
2346                 offset += i_size_read(inode);
2347                 ll_inode_size_unlock(inode, 0);
2348         } else if (origin == 1) { /* SEEK_CUR */
2349                 offset += file->f_pos;
2350         }
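             /*
              * E.g. (hypothetical sizes) lseek(fd, -100, SEEK_END) on a
              * 1000-byte file glimpses the size first (assuming the file has
              * objects) and arrives here with offset = 900, which then passes
              * the range check below.
              */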
2351
2352         retval = -EINVAL;
2353         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2354                 if (offset != file->f_pos) {
2355                         file->f_pos = offset;
2356 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2357                         file->f_reada = 0;
2358                         file->f_version = ++event;
2359 #endif
2360                 }
2361                 retval = offset;
2362         }
2363         
2364         RETURN(retval);
2365 }
2366
2367 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2368 {
2369         struct inode *inode = dentry->d_inode;
2370         struct ll_inode_info *lli = ll_i2info(inode);
2371         struct lov_stripe_md *lsm = lli->lli_smd;
2372         struct ptlrpc_request *req;
2373         struct obd_capa *oc;
2374         int rc, err;
2375         ENTRY;
2376         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2377                inode->i_generation, inode);
2378         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2379
2380         /* fsync's caller has already called _fdata{sync,write}; we want
2381          * that IO to finish before calling the osc and mdc sync methods */
2382         rc = filemap_fdatawait(inode->i_mapping);
2383
2384         /* catch async errors that were recorded back when async writeback
2385          * failed for pages in this mapping. */
2386         err = lli->lli_async_rc;
2387         lli->lli_async_rc = 0;
2388         if (rc == 0)
2389                 rc = err;
2390         if (lsm) {
2391                 err = lov_test_and_clear_async_rc(lsm);
2392                 if (rc == 0)
2393                         rc = err;
2394         }
2395
2396         oc = ll_mdscapa_get(inode);
2397         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2398                       &req);
2399         capa_put(oc);
2400         if (!rc)
2401                 rc = err;
2402         if (!err)
2403                 ptlrpc_req_finished(req);
2404
2405         if (data && lsm) {
2406                 struct obdo *oa;
2407                 
2408                 OBDO_ALLOC(oa);
2409                 if (!oa)
2410                         RETURN(rc ? rc : -ENOMEM);
2411
2412                 oa->o_id = lsm->lsm_object_id;
2413                 oa->o_gr = lsm->lsm_object_gr;
2414                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2415                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2416                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2417                                            OBD_MD_FLGROUP);
2418
2419                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2420                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2421                                0, OBD_OBJECT_EOF, oc);
2422                 capa_put(oc);
2423                 if (!rc)
2424                         rc = err;
2425                 OBDO_FREE(oa);
2426         }
2427
2428         RETURN(rc);
2429 }
2430
2431 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2432 {
2433         struct inode *inode = file->f_dentry->d_inode;
2434         struct ll_sb_info *sbi = ll_i2sbi(inode);
2435         struct ldlm_res_id res_id =
2436                 { .name = { fid_seq(ll_inode2fid(inode)),
2437                             fid_oid(ll_inode2fid(inode)),
2438                             fid_ver(ll_inode2fid(inode)),
2439                             LDLM_FLOCK} };
2440         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2441                 ldlm_flock_completion_ast, NULL, file_lock };
2442         struct lustre_handle lockh = {0};
2443         ldlm_policy_data_t flock;
2444         int flags = 0;
2445         int rc;
2446         ENTRY;
2447
2448         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2449                inode->i_ino, file_lock);
2450
2451         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2452  
2453         if (file_lock->fl_flags & FL_FLOCK) {
2454                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2455                 /* set missing params for flock() calls */
2456                 file_lock->fl_end = OFFSET_MAX;
2457                 file_lock->fl_pid = current->tgid;
2458         }
2459         flock.l_flock.pid = file_lock->fl_pid;
2460         flock.l_flock.start = file_lock->fl_start;
2461         flock.l_flock.end = file_lock->fl_end;
2462
2463         switch (file_lock->fl_type) {
2464         case F_RDLCK:
2465                 einfo.ei_mode = LCK_PR;
2466                 break;
2467         case F_UNLCK:
2468                 /* An unlock request may or may not have any relation to
2469                  * existing locks so we may not be able to pass a lock handle
2470                  * via a normal ldlm_lock_cancel() request. The request may even
2471                  * unlock a byte range in the middle of an existing lock. In
2472                  * order to process an unlock request we need all of the same
2473                  * information that is given with a normal read or write record
2474                  * lock request. To avoid creating another ldlm unlock (cancel)
2475                  * message we'll treat a LCK_NL flock request as an unlock. */
2476                 einfo.ei_mode = LCK_NL;
2477                 break;
2478         case F_WRLCK:
2479                 einfo.ei_mode = LCK_PW;
2480                 break;
2481         default:
2482                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2483                 LBUG();
2484         }
2485
2486         switch (cmd) {
2487         case F_SETLKW:
2488 #ifdef F_SETLKW64
2489         case F_SETLKW64:
2490 #endif
2491                 flags = 0;
2492                 break;
2493         case F_SETLK:
2494 #ifdef F_SETLK64
2495         case F_SETLK64:
2496 #endif
2497                 flags = LDLM_FL_BLOCK_NOWAIT;
2498                 break;
2499         case F_GETLK:
2500 #ifdef F_GETLK64
2501         case F_GETLK64:
2502 #endif
2503                 flags = LDLM_FL_TEST_LOCK;
2504                 /* Save the old mode so that if the mode in the lock changes we
2505                  * can decrement the appropriate reader or writer refcount. */
2506                 file_lock->fl_type = einfo.ei_mode;
2507                 break;
2508         default:
2509                 CERROR("unknown fcntl lock command: %d\n", cmd);
2510                 LBUG();
2511         }
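             /*
              * For example, a blocking flock(LOCK_EX) typically reaches this
              * point as F_SETLKW/F_WRLCK and is enqueued as a blocking LCK_PW
              * request (flags == 0), while fcntl(F_GETLK) only tests for
              * conflicts via LDLM_FL_TEST_LOCK.
              */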
2512
2513         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2514                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2515                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2516
2517         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2518                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2519         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2520                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2521 #ifdef HAVE_F_OP_FLOCK
2522         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2523             !(flags & LDLM_FL_TEST_LOCK))
2524                 posix_lock_file_wait(file, file_lock);
2525 #endif
2526
2527         RETURN(rc);
2528 }
2529
2530 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2531 {
2532         ENTRY;
2533
2534         RETURN(-ENOSYS);
2535 }
2536
2537 int ll_have_md_lock(struct inode *inode, __u64 bits)
2538 {
2539         struct lustre_handle lockh;
2540         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2541         struct lu_fid *fid;
2542         int flags;
2543         ENTRY;
2544
2545         if (!inode)
2546                RETURN(0);
2547
2548         fid = &ll_i2info(inode)->lli_fid;
2549         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2550
2551         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2552         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2553                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2554                 RETURN(1);
2555         }
2556         RETURN(0);
2557 }
2558
2559 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2560                             struct lustre_handle *lockh)
2561 {
2562         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2563         struct lu_fid *fid;
2564         ldlm_mode_t rc;
2565         int flags;
2566         ENTRY;
2567
2568         fid = &ll_i2info(inode)->lli_fid;
2569         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2570
2571         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2572         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2573                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2574         RETURN(rc);
2575 }
2576
2577 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2578         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2579                               * and return success */
2580                 inode->i_nlink = 0;
2581                 /* This path cannot be hit for regular files except in
2582                  * the case of obscure races, so there is no need to
2583                  * validate size. */
2584                 if (!S_ISREG(inode->i_mode) &&
2585                     !S_ISDIR(inode->i_mode))
2586                         return 0;
2587         }
2588
2589         if (rc) {
2590                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2591                 return -abs(rc);
2592
2593         }
2594
2595         return 0;
2596 }
2597
2598 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2599 {
2600         struct inode *inode = dentry->d_inode;
2601         struct ptlrpc_request *req = NULL;
2602         struct ll_sb_info *sbi;
2603         struct obd_export *exp;
2604         int rc;
2605         ENTRY;
2606
2607         if (!inode) {
2608                 CERROR("REPORT THIS LINE TO PETER\n");
2609                 RETURN(0);
2610         }
2611         sbi = ll_i2sbi(inode);
2612
2613         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2614                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2615
2616         exp = ll_i2mdexp(inode);
2617
2618         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2619                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2620                 struct md_op_data *op_data;
2621
2622                 /* Call getattr by fid, so do not provide name at all. */
2623                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2624                                              dentry->d_inode, NULL, 0, 0,
2625                                              LUSTRE_OPC_ANY, NULL);
2626                 if (IS_ERR(op_data))
2627                         RETURN(PTR_ERR(op_data));
2628
2629                 oit.it_flags |= O_CHECK_STALE;
2630                 rc = md_intent_lock(exp, op_data, NULL, 0,
2631                                     /* we are not interested in name
2632                                        based lookup */
2633                                     &oit, 0, &req,
2634                                     ll_md_blocking_ast, 0);
2635                 ll_finish_md_op_data(op_data);
2636                 oit.it_flags &= ~O_CHECK_STALE;
2637                 if (rc < 0) {
2638                         rc = ll_inode_revalidate_fini(inode, rc);
2639                         GOTO (out, rc);
2640                 }
2641
2642                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2643                 if (rc != 0) {
2644                         ll_intent_release(&oit);
2645                         GOTO(out, rc);
2646                 }
2647
2648                 /* Unlinked? Unhash the dentry, so it is not picked up later by
2649                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2650                    here, as that would break get_cwd functionality on 2.6.
2651                    Bug 10503 */
2652                 if (!dentry->d_inode->i_nlink) {
2653                         spin_lock(&dcache_lock);
2654                         ll_drop_dentry(dentry);
2655                         spin_unlock(&dcache_lock);
2656                 }
2657
2658                 ll_lookup_finish_locks(&oit, dentry);
2659         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) {
2660                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2661                 obd_valid valid = OBD_MD_FLGETATTR;
2662                 struct obd_capa *oc;
2663                 int ealen = 0;
2664
2665                 if (S_ISREG(inode->i_mode)) {
2666                         rc = ll_get_max_mdsize(sbi, &ealen);
2667                         if (rc)
2668                                 RETURN(rc);
2669                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2670                 }
2671                 /* When OBD_CONNECT_ATTRFID is not supported, we can't find a
2672                  * capa for this inode, because we only keep the capas of dirs
2673                  * fresh. */
2674                 oc = ll_mdscapa_get(inode);
2675                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2676                                 ealen, &req);
2677                 capa_put(oc);
2678                 if (rc) {
2679                         rc = ll_inode_revalidate_fini(inode, rc);
2680                         RETURN(rc);
2681                 }
2682
2683                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2684                                    NULL);
2685                 if (rc)
2686                         GOTO(out, rc);
2687         }
2688
2689         /* if object not yet allocated, don't validate size */
2690         if (ll_i2info(inode)->lli_smd == NULL)
2691                 GOTO(out, rc = 0);
2692
2693         /* ll_glimpse_size will prefer locally cached writes if they extend
2694          * the file */
2695         rc = ll_glimpse_size(inode, 0);
2696         EXIT;
2697 out:
2698         ptlrpc_req_finished(req);
2699         return rc;
2700 }
2701
2702 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2703                   struct lookup_intent *it, struct kstat *stat)
2704 {
2705         struct inode *inode = de->d_inode;
2706         int res = 0;
2707
2708         res = ll_inode_revalidate_it(de, it);
2709         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2710
2711         if (res)
2712                 return res;
2713
2714         stat->dev = inode->i_sb->s_dev;
2715         stat->ino = inode->i_ino;
2716         stat->mode = inode->i_mode;
2717         stat->nlink = inode->i_nlink;
2718         stat->uid = inode->i_uid;
2719         stat->gid = inode->i_gid;
2720         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2721         stat->atime = inode->i_atime;
2722         stat->mtime = inode->i_mtime;
2723         stat->ctime = inode->i_ctime;
2724 #ifdef HAVE_INODE_BLKSIZE
2725         stat->blksize = inode->i_blksize;
2726 #else
2727         stat->blksize = 1 << inode->i_blkbits;
2728 #endif
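             /* e.g. i_blkbits == 12 yields a reported blksize of 4096 bytes */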
2729
2730         ll_inode_size_lock(inode, 0);
2731         stat->size = i_size_read(inode);
2732         stat->blocks = inode->i_blocks;
2733         ll_inode_size_unlock(inode, 0);
2734
2735         return 0;
2736 }
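
/* ->getattr() entry point: wraps ll_getattr_it() with a plain IT_GETATTR
 * intent. */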
2737 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2738 {
2739         struct lookup_intent it = { .it_op = IT_GETATTR };
2740
2741         return ll_getattr_it(mnt, de, &it, stat);
2742 }
2743
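/* ACL helper used by the permission checks below: takes a reference to the
 * POSIX ACL cached on the inode (under lli_lock) and checks 'mask' against it
 * with posix_acl_permission().  Returns -EAGAIN when no ACL is cached or ACL
 * support is not compiled in, so the caller falls back to the mode bits. */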
2744 static
2745 int lustre_check_acl(struct inode *inode, int mask)
2746 {
2747 #ifdef CONFIG_FS_POSIX_ACL
2748         struct ll_inode_info *lli = ll_i2info(inode);
2749         struct posix_acl *acl;
2750         int rc;
2751         ENTRY;
2752
2753         spin_lock(&lli->lli_lock);
2754         acl = posix_acl_dup(lli->lli_posix_acl);
2755         spin_unlock(&lli->lli_lock);
2756
2757         if (!acl)
2758                 RETURN(-EAGAIN);
2759
2760         rc = posix_acl_permission(inode, acl, mask);
2761         posix_acl_release(acl);
2762
2763         RETURN(rc);
2764 #else
2765         return -EAGAIN;
2766 #endif
2767 }
2768
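/* ->permission() method.  Remote clients are checked through
 * lustre_check_remote_perm().  On 2.6.10+ kernels the rest is delegated to
 * generic_permission() with lustre_check_acl as the ACL callback; on older
 * kernels the mode-bit, ACL and capability checks are open-coded below. */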
2769 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2770 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2771 {
2772         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2773                inode->i_ino, inode->i_generation, inode, mask);
2774         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2775                 return lustre_check_remote_perm(inode, mask);
2776
2777         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2778         return generic_permission(inode, mask, lustre_check_acl);
2779 }
2780 #else
2781 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2782 {
2783         int mode = inode->i_mode;
2784         int rc;
2785
2786         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2787                inode->i_ino, inode->i_generation, inode, mask);
2788
2789         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2790                 return lustre_check_remote_perm(inode, mask);
2791
2792         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2793
2794         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2795             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2796                 return -EROFS;
2797         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2798                 return -EACCES;
2799         if (current->fsuid == inode->i_uid) {
2800                 mode >>= 6;
2801         } else if (1) {
2802                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2803                         goto check_groups;
2804                 rc = lustre_check_acl(inode, mask);
2805                 if (rc == -EAGAIN)
2806                         goto check_groups;
2807                 if (rc == -EACCES)
2808                         goto check_capabilities;
2809                 return rc;
2810         } else {
2811 check_groups:
2812                 if (in_group_p(inode->i_gid))
2813                         mode >>= 3;
2814         }
2815         if ((mode & mask & S_IRWXO) == mask)
2816                 return 0;
2817
2818 check_capabilities:
2819         if (!(mask & MAY_EXEC) ||
2820             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2821                 if (capable(CAP_DAC_OVERRIDE))
2822                         return 0;
2823
2824         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2825             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2826                 return 0;
2827
2828         return -EACCES;
2829 }
2830 #endif
2831
2832 /* -o localflock - only provides locally consistent flock locks */
2833 struct file_operations ll_file_operations = {
2834         .read           = ll_file_read,
2835         .write          = ll_file_write,
2836         .ioctl          = ll_file_ioctl,
2837         .open           = ll_file_open,
2838         .release        = ll_file_release,
2839         .mmap           = ll_file_mmap,
2840         .llseek         = ll_file_seek,
2841         .sendfile       = ll_file_sendfile,
2842         .fsync          = ll_fsync,
2843 };
2844
2845 struct file_operations ll_file_operations_flock = {
2846         .read           = ll_file_read,
2847         .write          = ll_file_write,
2848         .ioctl          = ll_file_ioctl,
2849         .open           = ll_file_open,
2850         .release        = ll_file_release,
2851         .mmap           = ll_file_mmap,
2852         .llseek         = ll_file_seek,
2853         .sendfile       = ll_file_sendfile,
2854         .fsync          = ll_fsync,
2855 #ifdef HAVE_F_OP_FLOCK
2856         .flock          = ll_file_flock,
2857 #endif
2858         .lock           = ll_file_flock
2859 };
2860
2861 /* These are used with -o noflock, to return ENOSYS on flock calls */
2862 struct file_operations ll_file_operations_noflock = {
2863         .read           = ll_file_read,
2864         .write          = ll_file_write,
2865         .ioctl          = ll_file_ioctl,
2866         .open           = ll_file_open,
2867         .release        = ll_file_release,
2868         .mmap           = ll_file_mmap,
2869         .llseek         = ll_file_seek,
2870         .sendfile       = ll_file_sendfile,
2871         .fsync          = ll_fsync,
2872 #ifdef HAVE_F_OP_FLOCK
2873         .flock          = ll_file_noflock,
2874 #endif
2875         .lock           = ll_file_noflock
2876 };
2877
2878 struct inode_operations ll_file_inode_operations = {
2879 #ifdef HAVE_VFS_INTENT_PATCHES
2880         .setattr_raw    = ll_setattr_raw,
2881 #endif
2882         .setattr        = ll_setattr,
2883         .truncate       = ll_truncate,
2884         .getattr        = ll_getattr,
2885         .permission     = ll_inode_permission,
2886         .setxattr       = ll_setxattr,
2887         .getxattr       = ll_getxattr,
2888         .listxattr      = ll_listxattr,
2889         .removexattr    = ll_removexattr,
2890 };
2891
2892 /* dynamic ioctl number support routines */
2893 static struct llioc_ctl_data {
2894         struct rw_semaphore ioc_sem;
2895         struct list_head    ioc_head;
2896 } llioc = { 
2897         __RWSEM_INITIALIZER(llioc.ioc_sem), 
2898         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2899 };
2900
2901
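/* One registered handler: the callback plus the ioctl command numbers it
 * services (iocd_cmd is a flexible array of iocd_count entries). */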
2902 struct llioc_data {
2903         struct list_head        iocd_list;
2904         unsigned int            iocd_size;
2905         llioc_callback_t        iocd_cb;
2906         unsigned int            iocd_count;
2907         unsigned int            iocd_cmd[0];
2908 };
2909
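/* Register a dynamic ioctl handler: allocate an llioc_data holding the
 * callback and a copy of the command list, and append it to llioc.ioc_head
 * under the write semaphore.  Returns an opaque cookie to pass to
 * ll_iocontrol_unregister(), or NULL on bad arguments or allocation failure.
 *
 * A caller might use it roughly like this (my_cb, MY_IOC_FOO and MY_IOC_BAR
 * are illustrative names only, not defined in this file):
 *
 *      static unsigned int my_cmds[] = { MY_IOC_FOO, MY_IOC_BAR };
 *      void *cookie = ll_iocontrol_register(my_cb, 2, my_cmds);
 *      ...
 *      ll_iocontrol_unregister(cookie);
 */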
2910 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2911 {
2912         unsigned int size;
2913         struct llioc_data *in_data = NULL;
2914         ENTRY;
2915
2916         if (cb == NULL || cmd == NULL ||
2917             count > LLIOC_MAX_CMD || count < 0)
2918                 RETURN(NULL);
2919
2920         size = sizeof(*in_data) + count * sizeof(unsigned int);
2921         OBD_ALLOC(in_data, size);
2922         if (in_data == NULL)
2923                 RETURN(NULL);
2924
2925         memset(in_data, 0, sizeof(*in_data));
2926         in_data->iocd_size = size;
2927         in_data->iocd_cb = cb;
2928         in_data->iocd_count = count;
2929         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2930
2931         down_write(&llioc.ioc_sem);
2932         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2933         up_write(&llioc.ioc_sem);
2934
2935         RETURN(in_data);
2936 }
2937
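/* Remove a handler previously added with ll_iocontrol_register(); 'magic' is
 * the cookie that call returned.  Warns if no matching registration exists. */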
2938 void ll_iocontrol_unregister(void *magic)
2939 {
2940         struct llioc_data *tmp;
2941
2942         if (magic == NULL)
2943                 return;
2944
2945         down_write(&llioc.ioc_sem);
2946         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2947                 if (tmp == magic) {
2948                         unsigned int size = tmp->iocd_size;
2949
2950                         list_del(&tmp->iocd_list);
2951                         up_write(&llioc.ioc_sem);
2952
2953                         OBD_FREE(tmp, size);
2954                         return;
2955                 }
2956         }
2957         up_write(&llioc.ioc_sem);
2958
2959         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2960 }
2961
2962 EXPORT_SYMBOL(ll_iocontrol_register);
2963 EXPORT_SYMBOL(ll_iocontrol_unregister);
2964
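/* Dispatch a dynamic ioctl: walk the registered handlers and invoke the
 * callback of each one whose command list contains 'cmd', stopping as soon as
 * a callback returns LLIOC_STOP.  *rcp receives the callback's result, or
 * -EINVAL if no handler claimed the command. */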
2965 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2966                         unsigned int cmd, unsigned long arg, int *rcp)
2967 {
2968         enum llioc_iter ret = LLIOC_CONT;
2969         struct llioc_data *data;
2970         int rc = -EINVAL, i;
2971
2972         down_read(&llioc.ioc_sem);
2973         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2974                 for (i = 0; i < data->iocd_count; i++) {
2975                         if (cmd != data->iocd_cmd[i]) 
2976                                 continue;
2977
2978                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2979                         break;
2980                 }
2981
2982                 if (ret == LLIOC_STOP)
2983                         break;
2984         }
2985         up_read(&llioc.ioc_sem);
2986
2987         if (rcp)
2988                 *rcp = rc;
2989         return ret;
2990 }