lustre/llite/file.c (fs/lustre-release.git, branch HEAD)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
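/*
 * Annotation (not part of the original file): a minimal sketch of how the two
 * helpers above pair up in the open/close paths below.  ll_file_open()
 * allocates the per-open data and stores it in the file, ll_md_close()
 * detaches and frees it; a NULL return must be treated as -ENOMEM.  The
 * example_* helper is illustrative only.
 */
#if 0
static int example_fd_lifecycle(struct file *file)
{
        struct ll_file_data *fd = ll_file_data_get();

        if (fd == NULL)
                return -ENOMEM;
        LUSTRE_FPRIVATE(file) = fd;
        /* ... the file is used ... */
        LUSTRE_FPRIVATE(file) = NULL;
        ll_file_data_put(fd);
        return 0;
}
#endif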
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
85
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in the LMV case, is it correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * Here we check whether this is a forced umount.  In that case we are
110          * called on cancellation of the "open lock" and we do not call
111          * md_close(), as it would not succeed: the import is already deactivated.
112          */
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* MDS has instructed us to obtain the Size-on-MDS attribute
131                  * from the OSTs and send a setattr back to the MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154       
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och = *och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race, and somebody may have freed
203                       this och already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
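/*
 * Annotation (not part of the original file): the MDS open handles live in
 * three per-inode slots selected by open mode.  This hypothetical example_*
 * helper just mirrors the slot-selection logic used by ll_md_real_close()
 * above and ll_file_open() below.
 */
#if 0
static struct obd_client_handle **
example_och_slot(struct ll_inode_info *lli, int flags)
{
        if (flags & FMODE_WRITE)
                return &lli->lli_mds_write_och;
        if (flags & FMODE_EXEC)
                return &lli->lli_mds_exec_och;
        return &lli->lli_mds_read_och;
}
#endif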
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have a good enough OPEN lock on the file to
228            let us skip talking to the MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this returns an error code, the caller, fput(), ignores it, so we
273  * need to make every effort to clean up all of our state here.  Also,
274  * applications rarely check close errors, and even if an error is returned
275  * they will not retry the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289         /* don't do anything for / */
290         if (inode->i_sb->s_root == file->f_dentry)
291                 RETURN(0);
292
293         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
294         fd = LUSTRE_FPRIVATE(file);
295         LASSERT(fd != NULL);
296
297         /* don't do anything for / */
298         if (inode->i_sb->s_root == file->f_dentry) {
299                 LUSTRE_FPRIVATE(file) = NULL;
300                 ll_file_data_put(fd);
301                 RETURN(0);
302         }
303         
304         if (lsm)
305                 lov_test_and_clear_async_rc(lsm);
306         lli->lli_async_rc = 0;
307
308         rc = ll_md_close(sbi->ll_md_exp, inode, file);
309         RETURN(rc);
310 }
311
312 static int ll_intent_file_open(struct file *file, void *lmm,
313                                int lmmsize, struct lookup_intent *itp)
314 {
315         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
316         struct dentry *parent = file->f_dentry->d_parent;
317         const char *name = file->f_dentry->d_name.name;
318         const int len = file->f_dentry->d_name.len;
319         struct md_op_data *op_data;
320         struct ptlrpc_request *req;
321         int rc;
322
323         if (!parent)
324                 RETURN(-ENOENT);
325
326         /* Usually we come here only for NFSD, and we want the open lock.
327            But we can also get here with pre-2.6.15 patchless kernels, and in
328            that case that lock is also OK */
329         /* We can also get here if there was a cached open handle in
330          * revalidate_it but it disappeared while we were getting from there
331          * to ll_file_open.  That means this file was closed and immediately
332          * reopened, which makes it a good candidate for using the OPEN lock */
333         /* If lmmsize and lmm are not 0, we are just setting stripe info
334          * parameters.  No need for the open lock */
335         if (!lmm && !lmmsize)
336                 itp->it_flags |= MDS_OPEN_LOCK;
337
338         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
339                                       file->f_dentry->d_inode, name, len,
340                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
341         if (IS_ERR(op_data))
342                 RETURN(PTR_ERR(op_data));
343
344         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
345                             0 /*unused */, &req, ll_md_blocking_ast, 0);
346         ll_finish_md_op_data(op_data);
347         if (rc == -ESTALE) {
348                 /* The reason for keeping our own exit path: don't flood the
349                  * log with -ESTALE error messages.
350                  */
351                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
352                      it_open_error(DISP_OPEN_OPEN, itp))
353                         GOTO(out, rc);
354                 ll_release_openhandle(file->f_dentry, itp);
355                 GOTO(out_stale, rc);
356         }
357
358         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
359                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
360                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
361                 GOTO(out, rc);
362         }
363
364         if (itp->d.lustre.it_lock_mode)
365                 md_set_lock_data(sbi->ll_md_exp,
366                                  &itp->d.lustre.it_lock_handle, 
367                                  file->f_dentry->d_inode);
368
369         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
370                            NULL);
371 out:
372         ptlrpc_req_finished(itp->d.lustre.it_data);
373
374 out_stale:
375         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
376         ll_intent_drop_lock(itp);
377
378         RETURN(rc);
379 }
380
381 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
382                        struct lookup_intent *it, struct obd_client_handle *och)
383 {
384         struct ptlrpc_request *req = it->d.lustre.it_data;
385         struct mdt_body *body;
386
387         LASSERT(och);
388
389         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
390         /* reply already checked out */
391         LASSERT(body != NULL);
392         /* and swabbed in md_enqueue */
393         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
394
395         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
396         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
397         och->och_fid = lli->lli_fid;
398         och->och_flags = it->it_flags;
399         lli->lli_ioepoch = body->ioepoch;
400
401         return md_set_open_replay_data(md_exp, och, req);
402 }
403
404 int ll_local_open(struct file *file, struct lookup_intent *it,
405                   struct ll_file_data *fd, struct obd_client_handle *och)
406 {
407         struct inode *inode = file->f_dentry->d_inode;
408         struct ll_inode_info *lli = ll_i2info(inode);
409         ENTRY;
410
411         LASSERT(!LUSTRE_FPRIVATE(file));
412
413         LASSERT(fd != NULL);
414
415         if (och) {
416                 struct ptlrpc_request *req = it->d.lustre.it_data;
417                 struct mdt_body *body;
418                 int rc;
419
420                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
421                 if (rc)
422                         RETURN(rc);
423
424                 body = lustre_msg_buf(req->rq_repmsg,
425                                       DLM_REPLY_REC_OFF, sizeof(*body));
426
427                 if ((it->it_flags & FMODE_WRITE) &&
428                     (body->valid & OBD_MD_FLSIZE))
429                 {
430                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
431                                lli->lli_ioepoch, PFID(&lli->lli_fid));
432                 }
433         }
434
435         LUSTRE_FPRIVATE(file) = fd;
436         ll_readahead_init(inode, &fd->fd_ras);
437         fd->fd_omode = it->it_flags;
438         RETURN(0);
439 }
440
441 /* Open a file, and (for the very first open) create objects on the OSTs at
442  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
443  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
444  * lli_open_sem to ensure no other process will create objects, send the
445  * stripe MD to the MDS, or try to destroy the objects if that fails.
446  *
447  * If we already have the stripe MD locally, then we don't request it in
448  * md_open(); we do this by passing lmm_size = 0.
449  *
450  * It is up to the application to ensure no other processes open this file
451  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
452  * used.  We might be able to avoid races of that sort by getting lli_open_sem
453  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
454  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
455  */
456 int ll_file_open(struct inode *inode, struct file *file)
457 {
458         struct ll_inode_info *lli = ll_i2info(inode);
459         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
460                                           .it_flags = file->f_flags };
461         struct lov_stripe_md *lsm;
462         struct ptlrpc_request *req = NULL;
463         struct obd_client_handle **och_p;
464         __u64 *och_usecount;
465         struct ll_file_data *fd;
466         int rc = 0;
467         ENTRY;
468
469         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
470                inode->i_generation, inode, file->f_flags);
471
472         /* don't do anything for / */
473         if (inode->i_sb->s_root == file->f_dentry)
474                 RETURN(0);
475
476 #ifdef HAVE_VFS_INTENT_PATCHES
477         it = file->f_it;
478 #else
479         it = file->private_data; /* XXX: compat macro */
480         file->private_data = NULL; /* prevent ll_local_open assertion */
481 #endif
482
483         fd = ll_file_data_get();
484         if (fd == NULL)
485                 RETURN(-ENOMEM);
486
487         /* don't do anything for / */
488         if (inode->i_sb->s_root == file->f_dentry) {
489                 LUSTRE_FPRIVATE(file) = fd;
490                 RETURN(0);
491         }
492
493         if (!it || !it->d.lustre.it_disposition) {
494                 /* Convert f_flags into an access mode.  We cannot use
495                  * file->f_mode, because everything but the O_ACCMODE mask
496                  * was stripped from it */
497                 if ((oit.it_flags + 1) & O_ACCMODE)
498                         oit.it_flags++;
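                /*
                 * Annotation (not in the original source): with O_ACCMODE == 3,
                 * the increment above maps the open flags onto FMODE_* bits:
                 *   O_RDONLY (0) -> 1 == FMODE_READ
                 *   O_WRONLY (1) -> 2 == FMODE_WRITE
                 *   O_RDWR   (2) -> 3 == FMODE_READ | FMODE_WRITE
                 */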
499                 if (file->f_flags & O_TRUNC)
500                         oit.it_flags |= FMODE_WRITE;
501
502                 /* The kernel only calls f_op->open in dentry_open(); filp_open()
503                  * calls dentry_open() after open_namei() has checked permissions.
504                  * Only nfsd_open() calls dentry_open() directly without checking
505                  * permissions, so the code below is safe. */
506                 if (oit.it_flags & FMODE_WRITE)
507                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
508
509                 /* We do not want O_EXCL here, presumably we opened the file
510                  * already? XXX - NFS implications? */
511                 oit.it_flags &= ~O_EXCL;
512
513                 it = &oit;
514         }
515
516         /* Let's see if we have file open on MDS already. */
517         if (it->it_flags & FMODE_WRITE) {
518                 och_p = &lli->lli_mds_write_och;
519                 och_usecount = &lli->lli_open_fd_write_count;
520         } else if (it->it_flags & FMODE_EXEC) {
521                 och_p = &lli->lli_mds_exec_och;
522                 och_usecount = &lli->lli_open_fd_exec_count;
523         } else {
524                 och_p = &lli->lli_mds_read_och;
525                 och_usecount = &lli->lli_open_fd_read_count;
526         }
527         
528         down(&lli->lli_och_sem);
529         if (*och_p) { /* Open handle is present */
530                 if (it_disposition(it, DISP_OPEN_OPEN)) {
531                         /* Well, there's an extra open request that we do not
532                            need; let's close it somehow.  This decrefs the request. */
533                         rc = it_open_error(DISP_OPEN_OPEN, it);
534                         if (rc) {
535                                 ll_file_data_put(fd);
536                                 GOTO(out_och_free, rc);
537                         }       
538                         ll_release_openhandle(file->f_dentry, it);
539                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
540                                              LPROC_LL_OPEN);
541                 }
542                 (*och_usecount)++;
543
544                 rc = ll_local_open(file, it, fd, NULL);
545                 if (rc) {
546                         up(&lli->lli_och_sem);
547                         ll_file_data_put(fd);
548                         RETURN(rc);
549                 }
550         } else {
551                 LASSERT(*och_usecount == 0);
552                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
553                 if (!*och_p) {
554                         ll_file_data_put(fd);
555                         GOTO(out_och_free, rc = -ENOMEM);
556                 }
557                 (*och_usecount)++;
558                 if (!it->d.lustre.it_disposition) {
559                         it->it_flags |= O_CHECK_STALE;
560                         rc = ll_intent_file_open(file, NULL, 0, it);
561                         it->it_flags &= ~O_CHECK_STALE;
562                         if (rc) {
563                                 ll_file_data_put(fd);
564                                 GOTO(out_och_free, rc);
565                         }
566
567                         /* Got some error? Release the request */
568                         if (it->d.lustre.it_status < 0) {
569                                 req = it->d.lustre.it_data;
570                                 ptlrpc_req_finished(req);
571                         }
572                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
573                                          &it->d.lustre.it_lock_handle,
574                                          file->f_dentry->d_inode);
575                 }
576                 req = it->d.lustre.it_data;
577
578                 /* md_intent_lock() didn't get a request ref if there was an
579                  * open error, so don't do cleanup on the request here
580                  * (bug 3430) */
581                 /* XXX (green): Should not we bail out on any error here, not
582                  * just open error? */
583                 rc = it_open_error(DISP_OPEN_OPEN, it);
584                 if (rc) {
585                         ll_file_data_put(fd);
586                         GOTO(out_och_free, rc);
587                 }
588
589                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
590                 rc = ll_local_open(file, it, fd, *och_p);
591                 if (rc) {
592                         up(&lli->lli_och_sem);
593                         ll_file_data_put(fd);
594                         GOTO(out_och_free, rc);
595                 }
596         }
597         up(&lli->lli_och_sem);
598
599         /* Must do this outside the lli_och_sem lock to prevent a deadlock
600            where a different kind of OPEN lock for this same inode gets
601            cancelled by ldlm_cancel_lru */
602         if (!S_ISREG(inode->i_mode))
603                 GOTO(out, rc);
604
605         ll_capa_open(inode);
606
607         lsm = lli->lli_smd;
608         if (lsm == NULL) {
609                 if (file->f_flags & O_LOV_DELAY_CREATE ||
610                     !(file->f_mode & FMODE_WRITE)) {
611                         CDEBUG(D_INODE, "object creation was delayed\n");
612                         GOTO(out, rc);
613                 }
614         }
615         file->f_flags &= ~O_LOV_DELAY_CREATE;
616         GOTO(out, rc);
617 out:
618         ptlrpc_req_finished(req);
619         if (req)
620                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
621 out_och_free:
622         if (rc) {
623                 if (*och_p) {
624                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
625                         *och_p = NULL; /* OBD_FREE writes some magic there */
626                         (*och_usecount)--;
627                 }
628                 up(&lli->lli_och_sem);
629         }
630
631         return rc;
632 }
633
634 /* Fills the obdo with the attributes for the inode defined by lsm */
635 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
636 {
637         struct ptlrpc_request_set *set;
638         struct ll_inode_info *lli = ll_i2info(inode);
639         struct lov_stripe_md *lsm = lli->lli_smd;
640
641         struct obd_info oinfo = { { { 0 } } };
642         int rc;
643         ENTRY;
644
645         LASSERT(lsm != NULL);
646
647         oinfo.oi_md = lsm;
648         oinfo.oi_oa = obdo;
649         oinfo.oi_oa->o_id = lsm->lsm_object_id;
650         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
651         oinfo.oi_oa->o_mode = S_IFREG;
652         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
653                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
654                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
655                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
656                                OBD_MD_FLGROUP;
657         oinfo.oi_capa = ll_mdscapa_get(inode);
658
659         set = ptlrpc_prep_set();
660         if (set == NULL) {
661                 CERROR("can't allocate ptlrpc set\n");
662                 rc = -ENOMEM;
663         } else {
664                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
665                 if (rc == 0)
666                         rc = ptlrpc_set_wait(set);
667                 ptlrpc_set_destroy(set);
668         }
669         capa_put(oinfo.oi_capa);
670         if (rc)
671                 RETURN(rc);
672
673         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
674                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
675                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
676
677         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
678         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
679                lli->lli_smd->lsm_object_id, i_size_read(inode),
680                inode->i_blocks, inode->i_blksize);
681         RETURN(0);
682 }
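/*
 * Annotation (not part of the original file): a hedged sketch of a caller of
 * ll_inode_getattr() above.  After a successful call the obdo carries the
 * merged OST attributes, with o_valid masked to the refreshed fields.  The
 * example_* helper is illustrative only.
 */
#if 0
static int example_ost_size(struct inode *inode, __u64 *size)
{
        struct obdo oa;
        int rc;

        memset(&oa, 0, sizeof(oa));
        rc = ll_inode_getattr(inode, &oa);
        if (rc == 0 && (oa.o_valid & OBD_MD_FLSIZE))
                *size = oa.o_size;
        return rc;
}
#endif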
683
684 static inline void ll_remove_suid(struct inode *inode)
685 {
686         unsigned int mode;
687
688         /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
689         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
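        /*
         * Annotation (not in the original source): with the usual octal values
         * S_IXGRP == 0010 and S_ISGID == 02000, S_ISGID/S_IXGRP == 0200, so the
         * multiplication yields S_ISGID exactly when S_IXGRP is set in i_mode,
         * and S_ISUID is always ORed in.
         */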
690
691         /* was any of the uid bits set? */
692         mode &= inode->i_mode;
693         if (mode && !capable(CAP_FSETID)) {
694                 inode->i_mode &= ~mode;
695                 // XXX careful here - we cannot change the size
696         }
697 }
698
699 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
700 {
701         struct ll_inode_info *lli = ll_i2info(inode);
702         struct lov_stripe_md *lsm = lli->lli_smd;
703         struct obd_export *exp = ll_i2dtexp(inode);
704         struct {
705                 char name[16];
706                 struct ldlm_lock *lock;
707                 struct lov_stripe_md *lsm;
708         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
709         __u32 stripe, vallen = sizeof(stripe);
710         int rc;
711         ENTRY;
712
713         if (lsm->lsm_stripe_count == 1)
714                 GOTO(check, stripe = 0);
715
716         /* get our offset in the lov */
717         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
718         if (rc != 0) {
719                 CERROR("obd_get_info: rc = %d\n", rc);
720                 RETURN(rc);
721         }
722         LASSERT(stripe < lsm->lsm_stripe_count);
723
724 check:
725         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
726             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
727                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
728                            lsm->lsm_oinfo[stripe]->loi_id,
729                            lsm->lsm_oinfo[stripe]->loi_gr);
730                 RETURN(-ELDLM_NO_LOCK_DATA);
731         }
732
733         RETURN(stripe);
734 }
735
736 /* Flush the page cache for an extent as it is cancelled.  When we're on an LOV,
737  * we get a lock cancellation for each stripe, so we have to map the obd's
738  * region back onto the stripes in the file that it held.
739  *
740  * No one can dirty the extent until we've finished our work and they can
741  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
742  * but other kernel actors could have pages locked.
743  *
744  * Called with the DLM lock held. */
745 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
746                               struct ldlm_lock *lock, __u32 stripe)
747 {
748         ldlm_policy_data_t tmpex;
749         unsigned long start, end, count, skip, i, j;
750         struct page *page;
751         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
752         struct lustre_handle lockh;
753         struct address_space *mapping = inode->i_mapping;
754
755         ENTRY;
756         tmpex = lock->l_policy_data;
757         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
758                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
759                i_size_read(inode));
760
761         /* Our locks are page granular thanks to osc_enqueue, so we invalidate
762          * the whole page. */
763         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
764             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
765                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
766                            CFS_PAGE_SIZE);
767         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
768         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
769
770         count = ~0;
771         skip = 0;
772         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
773         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
774         if (lsm->lsm_stripe_count > 1) {
775                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
776                 skip = (lsm->lsm_stripe_count - 1) * count;
777                 start += start/count * skip + stripe * count;
778                 if (end != ~0)
779                         end += end/count * skip + stripe * count;
780         }
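        /*
         * Annotation (not in the original source): the arithmetic above maps a
         * per-stripe page index onto a file page index.  For example, with two
         * stripes of 256 pages each (1MB stripes, 4KB pages), count == 256 and
         * skip == 256, so object page i of stripe 1 lands at file page
         * i + (i / 256) * 256 + 256; stripe 1 thus owns file pages 256-511,
         * 768-1023, and so on.
         */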
781         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
782                 end = ~0;
783
784         i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
785             CFS_PAGE_SHIFT : 0;
786         if (i < end)
787                 end = i;
788
789         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
790                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
791                count, skip, end, discard ? " (DISCARDING)" : "");
792
793         /* Walk through the VMAs on the inode and tear down mmap()ed pages
794          * that intersect with the lock.  This stops immediately if there are
795          * no mmap()ed regions of the file.  It is not efficient at all and
796          * should be short-lived; eventually we'll associate mmap()ed pages
797          * with the lock and be able to find them directly */
798         for (i = start; i <= end; i += (j + skip)) {
799                 j = min(count - (i % count), end - i + 1);
800                 LASSERT(j > 0);
801                 LASSERT(mapping);
802                 if (ll_teardown_mmaps(mapping,
803                                       (__u64)i << CFS_PAGE_SHIFT,
804                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
805                         break;
806         }
807
808         /* This is the simplistic implementation of page eviction at
809          * cancellation.  It is careful to handle races with other page
810          * lockers correctly.  Fixes from bug 20 will make it more
811          * efficient by associating locks with pages and by batching
812          * writeback under the lock explicitly. */
813         for (i = start, j = start % count; i <= end;
814              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
815                 if (j == count) {
816                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
817                         i += skip;
818                         j = 0;
819                         if (i > end)
820                                 break;
821                 }
822                 LASSERTF(tmpex.l_extent.start < lock->l_policy_data.l_extent.end,
823                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
824                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
825                          start, i, end);
826
827                 if (!mapping_has_pages(mapping)) {
828                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
829                         break;
830                 }
831
832                 cond_resched();
833
834                 page = find_get_page(mapping, i);
835                 if (page == NULL)
836                         continue;
837                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
838                                i, tmpex.l_extent.start);
839                 lock_page(page);
840
841                 /* check page->mapping to guard against racing with teardown */
842                 if (!discard && clear_page_dirty_for_io(page)) {
843                         rc = ll_call_writepage(inode, page);
844                         /* either waiting for io to complete or reacquiring
845                          * the lock that the failed writepage released */
846                         lock_page(page);
847                         wait_on_page_writeback(page);
848                         if (rc != 0) {
849                                 CERROR("writepage inode %lu(%p) of page %p "
850                                        "failed: %d\n", inode->i_ino, inode,
851                                        page, rc);
852                                 if (rc == -ENOSPC)
853                                         set_bit(AS_ENOSPC, &mapping->flags);
854                                 else
855                                         set_bit(AS_EIO, &mapping->flags);
856                         }
857                 }
858
859                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
860                 /* check to see if another DLM lock covers this page b=2765 */
861                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
862                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
863                                       LDLM_FL_TEST_LOCK,
864                                       &lock->l_resource->lr_name, LDLM_EXTENT,
865                                       &tmpex, LCK_PR | LCK_PW, &lockh);
866
867                 if (rc2 <= 0 && page->mapping != NULL) {
868                         struct ll_async_page *llap = llap_cast_private(page);
869                         /* checking again to account for writeback's
870                          * lock_page() */
871                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
872                         if (llap)
873                                 ll_ra_accounting(llap, mapping);
874                         ll_truncate_complete_page(page);
875                 }
876                 unlock_page(page);
877                 page_cache_release(page);
878         }
879         LASSERTF(tmpex.l_extent.start <=
880                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
881                   lock->l_policy_data.l_extent.end + 1),
882                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
883                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
884                  start, i, end);
885         EXIT;
886 }
887
888 static int ll_extent_lock_callback(struct ldlm_lock *lock,
889                                    struct ldlm_lock_desc *new, void *data,
890                                    int flag)
891 {
892         struct lustre_handle lockh = { 0 };
893         int rc;
894         ENTRY;
895
896         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
897                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
898                 LBUG();
899         }
900
901         switch (flag) {
902         case LDLM_CB_BLOCKING:
903                 ldlm_lock2handle(lock, &lockh);
904                 rc = ldlm_cli_cancel(&lockh);
905                 if (rc != ELDLM_OK)
906                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
907                 break;
908         case LDLM_CB_CANCELING: {
909                 struct inode *inode;
910                 struct ll_inode_info *lli;
911                 struct lov_stripe_md *lsm;
912                 int stripe;
913                 __u64 kms;
914
915                 /* This lock wasn't granted, don't try to evict pages */
916                 if (lock->l_req_mode != lock->l_granted_mode)
917                         RETURN(0);
918
919                 inode = ll_inode_from_lock(lock);
920                 if (inode == NULL)
921                         RETURN(0);
922                 lli = ll_i2info(inode);
923                 if (lli == NULL)
924                         goto iput;
925                 if (lli->lli_smd == NULL)
926                         goto iput;
927                 lsm = lli->lli_smd;
928
929                 stripe = ll_lock_to_stripe_offset(inode, lock);
930                 if (stripe < 0)
931                         goto iput;
932
933                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
934
935                 lov_stripe_lock(lsm);
936                 lock_res_and_lock(lock);
937                 kms = ldlm_extent_shift_kms(lock,
938                                             lsm->lsm_oinfo[stripe]->loi_kms);
939
940                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
941                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
942                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
943                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
944                 unlock_res_and_lock(lock);
945                 lov_stripe_unlock(lsm);
946         iput:
947                 iput(inode);
948                 break;
949         }
950         default:
951                 LBUG();
952         }
953
954         RETURN(0);
955 }
956
957 #if 0
958 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
959 {
960         /* XXX ALLOCATE - 160 bytes */
961         struct inode *inode = ll_inode_from_lock(lock);
962         struct ll_inode_info *lli = ll_i2info(inode);
963         struct lustre_handle lockh = { 0 };
964         struct ost_lvb *lvb;
965         int stripe;
966         ENTRY;
967
968         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
969                      LDLM_FL_BLOCK_CONV)) {
970                 LBUG(); /* not expecting any blocked async locks yet */
971                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
972                            "lock, returning");
973                 ldlm_lock_dump(D_OTHER, lock, 0);
974                 ldlm_reprocess_all(lock->l_resource);
975                 RETURN(0);
976         }
977
978         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
979
980         stripe = ll_lock_to_stripe_offset(inode, lock);
981         if (stripe < 0)
982                 goto iput;
983
984         if (lock->l_lvb_len) {
985                 struct lov_stripe_md *lsm = lli->lli_smd;
986                 __u64 kms;
987                 lvb = lock->l_lvb_data;
988                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
989
990                 lock_res_and_lock(lock);
991                 ll_inode_size_lock(inode, 1);
992                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
993                 kms = ldlm_extent_shift_kms(NULL, kms);
994                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
995                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
996                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
997                 lsm->lsm_oinfo[stripe].loi_kms = kms;
998                 ll_inode_size_unlock(inode, 1);
999                 unlock_res_and_lock(lock);
1000         }
1001
1002 iput:
1003         iput(inode);
1004         wake_up(&lock->l_waitq);
1005
1006         ldlm_lock2handle(lock, &lockh);
1007         ldlm_lock_decref(&lockh, LCK_PR);
1008         RETURN(0);
1009 }
1010 #endif
1011
1012 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1013 {
1014         struct ptlrpc_request *req = reqp;
1015         struct inode *inode = ll_inode_from_lock(lock);
1016         struct ll_inode_info *lli;
1017         struct lov_stripe_md *lsm;
1018         struct ost_lvb *lvb;
1019         int rc, stripe;
1020         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1021         ENTRY;
1022
1023         if (inode == NULL)
1024                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1025         lli = ll_i2info(inode);
1026         if (lli == NULL)
1027                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1028         lsm = lli->lli_smd;
1029         if (lsm == NULL)
1030                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1031
1032         /* First, find out which stripe index this lock corresponds to. */
1033         stripe = ll_lock_to_stripe_offset(inode, lock);
1034         if (stripe < 0)
1035                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1036
1037         rc = lustre_pack_reply(req, 2, size, NULL);
1038         if (rc) {
1039                 CERROR("lustre_pack_reply: %d\n", rc);
1040                 GOTO(iput, rc);
1041         }
1042
1043         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1044         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1045         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1046         lvb->lvb_atime = LTIME_S(inode->i_atime);
1047         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1048
1049         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1050                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1051                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1052                    lvb->lvb_atime, lvb->lvb_ctime);
1053  iput:
1054         iput(inode);
1055
1056  out:
1057         /* These errors are normal races, so we don't want to fill the console
1058          * with messages by calling ptlrpc_error() */
1059         if (rc == -ELDLM_NO_LOCK_DATA)
1060                 lustre_pack_reply(req, 1, NULL, NULL);
1061
1062         req->rq_status = rc;
1063         return rc;
1064 }
1065
1066 static void ll_merge_lvb(struct inode *inode)
1067 {
1068         struct ll_inode_info *lli = ll_i2info(inode);
1069         struct ll_sb_info *sbi = ll_i2sbi(inode);
1070         struct ost_lvb lvb;
1071         ENTRY;
1072
1073         ll_inode_size_lock(inode, 1);
1074         inode_init_lvb(inode, &lvb);
1075         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1076         i_size_write(inode, lvb.lvb_size);
1077         inode->i_blocks = lvb.lvb_blocks;
1078         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1079         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1080         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1081         ll_inode_size_unlock(inode, 1);
1082         EXIT;
1083 }
1084
1085 int ll_local_size(struct inode *inode)
1086 {
1087         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1088         struct ll_inode_info *lli = ll_i2info(inode);
1089         struct ll_sb_info *sbi = ll_i2sbi(inode);
1090         struct lustre_handle lockh = { 0 };
1091         int flags = 0;
1092         int rc;
1093         ENTRY;
1094
1095         if (lli->lli_smd->lsm_stripe_count == 0)
1096                 RETURN(0);
1097
1098         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1099                        &policy, LCK_PR, &flags, inode, &lockh);
1100         if (rc < 0)
1101                 RETURN(rc);
1102         else if (rc == 0)
1103                 RETURN(-ENODATA);
1104
1105         ll_merge_lvb(inode);
1106         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1107         RETURN(0);
1108 }
1109
1110 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1111                      lstat_t *st)
1112 {
1113         struct lustre_handle lockh = { 0 };
1114         struct ldlm_enqueue_info einfo = { 0 };
1115         struct obd_info oinfo = { { { 0 } } };
1116         struct ost_lvb lvb;
1117         int rc;
1118
1119         ENTRY;
1120
1121         einfo.ei_type = LDLM_EXTENT;
1122         einfo.ei_mode = LCK_PR;
1123         einfo.ei_cb_bl = ll_extent_lock_callback;
1124         einfo.ei_cb_cp = ldlm_completion_ast;
1125         einfo.ei_cb_gl = ll_glimpse_callback;
1126         einfo.ei_cbdata = NULL;
1127
1128         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1129         oinfo.oi_lockh = &lockh;
1130         oinfo.oi_md = lsm;
1131         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1132
1133         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1134         if (rc == -ENOENT)
1135                 RETURN(rc);
1136         if (rc != 0) {
1137                 CERROR("obd_enqueue returned rc %d, "
1138                        "returning -EIO\n", rc);
1139                 RETURN(rc > 0 ? -EIO : rc);
1140         }
1141
1142         lov_stripe_lock(lsm);
1143         memset(&lvb, 0, sizeof(lvb));
1144         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1145         st->st_size = lvb.lvb_size;
1146         st->st_blocks = lvb.lvb_blocks;
1147         st->st_mtime = lvb.lvb_mtime;
1148         st->st_atime = lvb.lvb_atime;
1149         st->st_ctime = lvb.lvb_ctime;
1150         lov_stripe_unlock(lsm);
1151
1152         RETURN(rc);
1153 }
1154
1155 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1156  * file (because it prefers KMS over RSS when larger) */
1157 int ll_glimpse_size(struct inode *inode, int ast_flags)
1158 {
1159         struct ll_inode_info *lli = ll_i2info(inode);
1160         struct ll_sb_info *sbi = ll_i2sbi(inode);
1161         struct lustre_handle lockh = { 0 };
1162         struct ldlm_enqueue_info einfo = { 0 };
1163         struct obd_info oinfo = { { { 0 } } };
1164         int rc;
1165         ENTRY;
1166
1167         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1168                 RETURN(0);
1169
1170         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1171
1172         if (!lli->lli_smd) {
1173                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1174                 RETURN(0);
1175         }
1176
1177         /* NOTE: this looks like a DLM lock request, but it may not be one.
1178          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse
1179          *       request that won't revoke any conflicting DLM locks held.
1180          *       Instead, ll_glimpse_callback() will be called on each client
1181          *       holding a DLM lock against this file, and the resulting size
1182          *       will be returned for each stripe. A DLM lock on [0, EOF] is
1183          *       acquired only if there were no conflicting locks. */
1184         einfo.ei_type = LDLM_EXTENT;
1185         einfo.ei_mode = LCK_PR;
1186         einfo.ei_cb_bl = ll_extent_lock_callback;
1187         einfo.ei_cb_cp = ldlm_completion_ast;
1188         einfo.ei_cb_gl = ll_glimpse_callback;
1189         einfo.ei_cbdata = inode;
1190
1191         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1192         oinfo.oi_lockh = &lockh;
1193         oinfo.oi_md = lli->lli_smd;
1194         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1195
1196         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1197         if (rc == -ENOENT)
1198                 RETURN(rc);
1199         if (rc != 0) {
1200                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1201                 RETURN(rc > 0 ? -EIO : rc);
1202         }
1203
1204         ll_merge_lvb(inode);
1205
1206         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1207                i_size_read(inode), inode->i_blocks);
1208
1209         RETURN(rc);
1210 }
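/*
 * Annotation (not part of the original file): a minimal sketch of the
 * glimpse-then-read-size pattern built around ll_glimpse_size() above;
 * ll_merge_lvb() has already folded the per-stripe LVBs into i_size by the
 * time the call returns.  The example_* helper is illustrative only.
 */
#if 0
static int example_refresh_size(struct inode *inode, __u64 *size)
{
        int rc = ll_glimpse_size(inode, 0);

        if (rc == 0)
                *size = i_size_read(inode);
        return rc;
}
#endif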
1211
1212 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1213                    struct lov_stripe_md *lsm, int mode,
1214                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1215                    int ast_flags)
1216 {
1217         struct ll_sb_info *sbi = ll_i2sbi(inode);
1218         struct ost_lvb lvb;
1219         struct ldlm_enqueue_info einfo = { 0 };
1220         struct obd_info oinfo = { { { 0 } } };
1221         int rc;
1222         ENTRY;
1223
1224         LASSERT(!lustre_handle_is_used(lockh));
1225         LASSERT(lsm != NULL);
1226
1227         /* don't drop the mmapped file to LRU */
1228         if (mapping_mapped(inode->i_mapping))
1229                 ast_flags |= LDLM_FL_NO_LRU;
1230
1231         /* XXX phil: can we do this?  won't it screw the file size up? */
1232         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1233             (sbi->ll_flags & LL_SBI_NOLCK))
1234                 RETURN(0);
1235
1236         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1237                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1238
1239         einfo.ei_type = LDLM_EXTENT;
1240         einfo.ei_mode = mode;
1241         einfo.ei_cb_bl = ll_extent_lock_callback;
1242         einfo.ei_cb_cp = ldlm_completion_ast;
1243         einfo.ei_cb_gl = ll_glimpse_callback;
1244         einfo.ei_cbdata = inode;
1245
1246         oinfo.oi_policy = *policy;
1247         oinfo.oi_lockh = lockh;
1248         oinfo.oi_md = lsm;
1249         oinfo.oi_flags = ast_flags;
1250
1251         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1252         *policy = oinfo.oi_policy;
1253         if (rc > 0)
1254                 rc = -EIO;
1255
1256         ll_inode_size_lock(inode, 1);
1257         inode_init_lvb(inode, &lvb);
1258         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1259
1260         if (policy->l_extent.start == 0 &&
1261             policy->l_extent.end == OBD_OBJECT_EOF) {
1262                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1263                  * the kms under both a DLM lock and the
1264                  * ll_inode_size_lock().  If we don't get the
1265                  * ll_inode_size_lock() here we can match the DLM lock and
1266                  * reset i_size from the kms before the truncating path has
1267                  * updated the kms.  generic_file_write can then trust the
1268                  * stale i_size when doing appending writes and effectively
1269                  * cancel the result of the truncate.  Getting the
1270                  * ll_inode_size_lock() after the enqueue maintains the DLM
1271                  * -> ll_inode_size_lock() acquiring order. */
1272                 i_size_write(inode, lvb.lvb_size);
1273                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1274                        inode->i_ino, i_size_read(inode));
1275         }
1276
1277         if (rc == 0) {
1278                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1279                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1280                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1281         }
1282         ll_inode_size_unlock(inode, 1);
1283
1284         RETURN(rc);
1285 }
1286
1287 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1288                      struct lov_stripe_md *lsm, int mode,
1289                      struct lustre_handle *lockh)
1290 {
1291         struct ll_sb_info *sbi = ll_i2sbi(inode);
1292         int rc;
1293         ENTRY;
1294
1295         /* XXX phil: can we do this?  won't it screw the file size up? */
1296         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1297             (sbi->ll_flags & LL_SBI_NOLCK))
1298                 RETURN(0);
1299
1300         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1301
1302         RETURN(rc);
1303 }
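/*
 * Annotation (not part of the original file): a hedged sketch of the
 * lock/unlock pairing that the read and write paths build around
 * ll_extent_lock()/ll_extent_unlock() above.  The example_* helper is
 * illustrative only.
 */
#if 0
static int example_lock_extent(struct ll_file_data *fd, struct inode *inode,
                               obd_off start, obd_off end)
{
        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
        ldlm_policy_data_t policy = { .l_extent = { start, end } };
        struct lustre_handle lockh = { 0 };
        int rc;

        rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
        if (rc != 0)
                return rc;
        /* ... the byte range [start, end] is accessed under the PR lock ... */
        return ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
}
#endif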
1304
1305 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1306                             loff_t *ppos)
1307 {
1308         struct inode *inode = file->f_dentry->d_inode;
1309         struct ll_inode_info *lli = ll_i2info(inode);
1310         struct lov_stripe_md *lsm = lli->lli_smd;
1311         struct ll_sb_info *sbi = ll_i2sbi(inode);
1312         struct ll_lock_tree tree;
1313         struct ll_lock_tree_node *node;
1314         struct ost_lvb lvb;
1315         struct ll_ra_read bead;
1316         int rc, ra = 0;
1317         loff_t end;
1318         ssize_t retval, chunk, sum = 0;
1319
1320         __u64 kms;
1321         ENTRY;
1322         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1323                inode->i_ino, inode->i_generation, inode, count, *ppos);
1324         /* "If nbyte is 0, read() will return 0 and have no other results."
1325          *                      -- Single Unix Spec */
1326         if (count == 0)
1327                 RETURN(0);
1328
1329         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1330
1331         if (!lsm) {
1332                 /* A read on a file with no objects should return zero-filled
1333                  * buffers up to the file size (we can get a non-zero size with
1334                  * mknod + truncate, then opening the file for read; this is a
1335                  * common pattern in the NFS case, it seems). Bug 6243 */
1336                 int notzeroed;
1337                 /* Since there are no objects on the OSTs, we have nothing to
1338                  * take a lock on and so we are forced to access inode->i_size
1339                  * unguarded */
1340
1341                 /* Read beyond end of file */
1342                 if (*ppos >= i_size_read(inode))
1343                         RETURN(0);
1344
1345                 if (count > i_size_read(inode) - *ppos)
1346                         count = i_size_read(inode) - *ppos;
1347                 /* Make sure to correctly adjust the file pos pointer for
1348                  * EFAULT case */
1349                 notzeroed = clear_user(buf, count);
1350                 count -= notzeroed;
1351                 *ppos += count;
1352                 if (!count)
1353                         RETURN(-EFAULT);
1354                 RETURN(count);
1355         }
1356
1357 repeat:
1358         if (sbi->ll_max_rw_chunk != 0) {
1359                 /* first, find the end of the stripe containing *ppos */
1360                 end = *ppos;
1361                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1362                                 (obd_off *)&end);
1363
1364                 /* clamp the end if it is beyond the request */
1365                 if (end > *ppos + count - 1)
1366                         end = *ppos + count - 1;
1367
1368                 /* and chunk shouldn't be too large even if striping is wide */
1369                 if (end - *ppos > sbi->ll_max_rw_chunk)
1370                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1371         } else {
1372                 end = *ppos + count - 1;
1373         }
1374
1375         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1376         if (IS_ERR(node)){
1377                 GOTO(out, retval = PTR_ERR(node));
1378         }
1379
1380         tree.lt_fd = LUSTRE_FPRIVATE(file);
1381         rc = ll_tree_lock(&tree, node, buf, count,
1382                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1383         if (rc != 0)
1384                 GOTO(out, retval = rc);
1385
1386         ll_inode_size_lock(inode, 1);
1387         /*
1388          * Consistency guarantees: following possibilities exist for the
1389          * relation between region being read and real file size at this
1390          * moment:
1391          *
1392          *  (A): the region is completely inside of the file;
1393          *
1394          *  (B-x): x bytes of region are inside of the file, the rest is
1395          *  outside;
1396          *
1397          *  (C): the region is completely outside of the file.
1398          *
1399          * This classification is stable under DLM lock acquired by
1400          * ll_tree_lock() above, because to change class, other client has to
1401          * take DLM lock conflicting with our lock. Also, any updates to
1402          * ->i_size by other threads on this client are serialized by
1403          * ll_inode_size_lock(). This guarantees that short reads are handled
1404          * correctly in the face of concurrent writes and truncates.
1405          */
1406         inode_init_lvb(inode, &lvb);
1407         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1408         kms = lvb.lvb_size;
1409         if (*ppos + count - 1 > kms) {
1410                 /* A glimpse is necessary to determine whether we return a
1411                  * short read (B) or some zeroes at the end of the buffer (C) */
1412                 ll_inode_size_unlock(inode, 1);
1413                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1414                 if (retval) {
1415                         ll_tree_unlock(&tree);
1416                         goto out;
1417                 }
1418         } else {
1419                 /* region is within kms and, hence, within real file size (A).
1420                  * We need to increase i_size to cover the read region so that
1421                  * generic_file_read() will do its job, but that doesn't mean
1422                  * the kms size is _correct_, it is only the _minimum_ size.
1423                  * If someone does a stat they will get the correct size which
1424                  * will always be >= the kms value here.  b=11081 */
1425                 if (i_size_read(inode) < kms)
1426                         i_size_write(inode, kms);
1427                 ll_inode_size_unlock(inode, 1);
1428         }
1429
1430         chunk = end - *ppos + 1;
1431         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1432                inode->i_ino, chunk, *ppos, i_size_read(inode));
1433
1434         /* turn off the kernel's read-ahead */
1435         file->f_ra.ra_pages = 0;
1436
1437         /* initialize read-ahead window once per syscall */
1438         if (ra == 0) {
1439                 ra = 1;
1440                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1441                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1442                 ll_ra_read_in(file, &bead);
1443         }
1444
1445         /* BUG: 5972 */
1446         file_accessed(file);
1447         retval = generic_file_read(file, buf, chunk, ppos);
1448         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1449
1450         ll_tree_unlock(&tree);
1451
1452         if (retval > 0) {
1453                 buf += retval;
1454                 count -= retval;
1455                 sum += retval;
1456                 if (retval == chunk && count > 0)
1457                         goto repeat;
1458         }
1459
1460  out:
1461         if (ra != 0)
1462                 ll_ra_read_ex(file, &bead);
1463         retval = (sum > 0) ? sum : retval;
1464         RETURN(retval);
1465 }
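
/*
 * Illustrative sketch (not compiled): how the read/write paths in this file
 * clamp the end offset of each locked chunk when ll_max_rw_chunk is set.
 * "stripe_end" stands in for the value obd_extent_calc(..., OBD_CALC_STRIPE_END)
 * writes back; the helper name is hypothetical and exists only to show the
 * arithmetic used above.
 */
#if 0
static loff_t ll_example_chunk_end(loff_t pos, size_t count,
                                   loff_t stripe_end, unsigned long max_chunk)
{
        loff_t end = stripe_end;

        /* never lock past the end of the request */
        if (end > pos + (loff_t)count - 1)
                end = pos + (loff_t)count - 1;

        /* and never lock more than max_chunk bytes at once */
        if (max_chunk != 0 && end - pos > (loff_t)max_chunk)
                end = pos + (loff_t)max_chunk - 1;

        return end;
}
#endif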
1466
1467 /*
1468  * Write to a file (through the page cache).
1469  */
1470 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1471                              loff_t *ppos)
1472 {
1473         struct inode *inode = file->f_dentry->d_inode;
1474         struct ll_sb_info *sbi = ll_i2sbi(inode);
1475         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1476         struct ll_lock_tree tree;
1477         struct ll_lock_tree_node *node;
1478         loff_t maxbytes = ll_file_maxbytes(inode);
1479         loff_t lock_start, lock_end, end;
1480         ssize_t retval, chunk, sum = 0;
1481         int rc;
1482         ENTRY;
1483
1484         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1485                inode->i_ino, inode->i_generation, inode, count, *ppos);
1486
1487         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1488
1489         /* POSIX requires this, but surprisingly the VFS doesn't check it */
1490         if (count == 0)
1491                 RETURN(0);
1492
1493         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1494          * called on the file, don't fail the below assertion (bug 2388). */
1495         if (file->f_flags & O_LOV_DELAY_CREATE &&
1496             ll_i2info(inode)->lli_smd == NULL)
1497                 RETURN(-EBADF);
1498
1499         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1500
1501         down(&ll_i2info(inode)->lli_write_sem);
1502
1503 repeat:
1504         chunk = 0; /* just to fix gcc's warning */
1505         end = *ppos + count - 1;
1506
1507         if (file->f_flags & O_APPEND) {
1508                 lock_start = 0;
1509                 lock_end = OBD_OBJECT_EOF;
1510         } else if (sbi->ll_max_rw_chunk != 0) {
1511                 /* first, find the end of the stripe containing *ppos */
1512                 end = *ppos;
1513                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1514                                 (obd_off *)&end);
1515
1516                 /* clamp the end if it is beyond the request */
1517                 if (end > *ppos + count - 1)
1518                         end = *ppos + count - 1;
1519
1520                 /* and chunk shouldn't be too large even if striping is wide */
1521                 if (end - *ppos > sbi->ll_max_rw_chunk)
1522                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1523                 lock_start = *ppos;
1524                 lock_end = end;
1525         } else {
1526                 lock_start = *ppos;
1527                 lock_end = *ppos + count - 1;
1528         }
1529         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1530
1531         if (IS_ERR(node))
1532                 GOTO(out, retval = PTR_ERR(node));
1533
1534         tree.lt_fd = LUSTRE_FPRIVATE(file);
1535         rc = ll_tree_lock(&tree, node, buf, count,
1536                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1537         if (rc != 0)
1538                 GOTO(out, retval = rc);
1539
1540         /* This is ok; generic_file_write() will overwrite this under i_sem
1541          * if it races with a local truncate.  It just makes our maxbyte
1542          * checking easier.  The i_size value gets updated in ll_extent_lock()
1543          * as a consequence of the [0,EOF] extent lock we requested above. */
1544         if (file->f_flags & O_APPEND) {
1545                 *ppos = i_size_read(inode);
1546                 end = *ppos + count - 1;
1547         }
1548
1549         if (*ppos >= maxbytes) {
1550                 send_sig(SIGXFSZ, current, 0);
1551                 GOTO(out_unlock, retval = -EFBIG);
1552         }
1553         if (*ppos + count > maxbytes)
1554                 count = maxbytes - *ppos;
1555
1556         /* generic_file_write handles O_APPEND after getting i_mutex */
1557         chunk = end - *ppos + 1;
1558         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1559                inode->i_ino, chunk, *ppos);
1560         retval = generic_file_write(file, buf, chunk, ppos);
1561         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1562
1563 out_unlock:
1564         ll_tree_unlock(&tree);
1565
1566 out:
1567         if (retval > 0) {
1568                 buf += retval;
1569                 count -= retval;
1570                 sum += retval;
1571                 if (retval == chunk && count > 0)
1572                         goto repeat;
1573         }
1574
1575         up(&ll_i2info(inode)->lli_write_sem);
1576
1577         retval = (sum > 0) ? sum : retval;
1578         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1579                            retval > 0 ? retval : 0);
1580         RETURN(retval);
1581 }
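
/*
 * Illustrative sketch (not compiled): ll_file_write() above returns as soon as
 * one chunk comes back short, so callers that must push out a full buffer
 * still need the usual userspace write loop.  Plain POSIX, nothing
 * Lustre-specific assumed.
 */
#if 0
#include <unistd.h>
#include <errno.h>

static ssize_t write_full(int fd, const char *buf, size_t count)
{
        size_t done = 0;

        while (done < count) {
                ssize_t rc = write(fd, buf + done, count - done);

                if (rc < 0) {
                        if (errno == EINTR)
                                continue;
                        return done ? (ssize_t)done : -1;
                }
                done += rc;
        }
        return done;
}
#endif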
1582
1583 /*
1584  * Send file content (through pagecache) somewhere with helper
1585  */
1586 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1587                                 read_actor_t actor, void *target)
1588 {
1589         struct inode *inode = in_file->f_dentry->d_inode;
1590         struct ll_inode_info *lli = ll_i2info(inode);
1591         struct lov_stripe_md *lsm = lli->lli_smd;
1592         struct ll_lock_tree tree;
1593         struct ll_lock_tree_node *node;
1594         struct ost_lvb lvb;
1595         struct ll_ra_read bead;
1596         int rc;
1597         ssize_t retval;
1598         __u64 kms;
1599         ENTRY;
1600         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1601                inode->i_ino, inode->i_generation, inode, count, *ppos);
1602
1603         /* "If nbyte is 0, read() will return 0 and have no other results."
1604          *                      -- Single Unix Spec */
1605         if (count == 0)
1606                 RETURN(0);
1607
1608         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1609         /* turn off the kernel's read-ahead */
1610         in_file->f_ra.ra_pages = 0;
1611
1612         /* File with no objects, nothing to lock */
1613         if (!lsm)
1614                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1615
1616         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1617         if (IS_ERR(node))
1618                 RETURN(PTR_ERR(node));
1619
1620         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1621         rc = ll_tree_lock(&tree, node, NULL, count,
1622                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1623         if (rc != 0)
1624                 RETURN(rc);
1625
1626         ll_inode_size_lock(inode, 1);
1627         /*
1628          * Consistency guarantees: following possibilities exist for the
1629          * relation between region being read and real file size at this
1630          * moment:
1631          *
1632          *  (A): the region is completely inside of the file;
1633          *
1634          *  (B-x): x bytes of region are inside of the file, the rest is
1635          *  outside;
1636          *
1637          *  (C): the region is completely outside of the file.
1638          *
1639          * This classification is stable under DLM lock acquired by
1640          * ll_tree_lock() above, because to change class, other client has to
1641          * take DLM lock conflicting with our lock. Also, any updates to
1642          * ->i_size by other threads on this client are serialized by
1643          * ll_inode_size_lock(). This guarantees that short reads are handled
1644          * correctly in the face of concurrent writes and truncates.
1645          */
1646         inode_init_lvb(inode, &lvb);
1647         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1648         kms = lvb.lvb_size;
1649         if (*ppos + count - 1 > kms) {
1650                 /* A glimpse is necessary to determine whether we return a
1651                  * short read (B) or some zeroes at the end of the buffer (C) */
1652                 ll_inode_size_unlock(inode, 1);
1653                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1654                 if (retval)
1655                         goto out;
1656         } else {
1657                 /* region is within kms and, hence, within real file size (A) */
1658                 i_size_write(inode, kms);
1659                 ll_inode_size_unlock(inode, 1);
1660         }
1661
1662         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1663                inode->i_ino, count, *ppos, i_size_read(inode));
1664
1665         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1666         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1667         ll_ra_read_in(in_file, &bead);
1668         /* BUG: 5972 */
1669         file_accessed(in_file);
1670         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1671         ll_ra_read_ex(in_file, &bead);
1672
1673  out:
1674         ll_tree_unlock(&tree);
1675         RETURN(retval);
1676 }
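
/*
 * Illustrative sketch (not compiled): ll_file_sendfile() above backs the
 * ordinary sendfile(2) syscall, so a Lustre file is streamed to a socket the
 * same way as any local file.  "sock_fd" is assumed to be a connected socket.
 */
#if 0
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

static int send_whole_file(int sock_fd, const char *path)
{
        struct stat st;
        off_t off = 0;
        int fd = open(path, O_RDONLY);

        if (fd < 0)
                return -1;
        if (fstat(fd, &st) < 0) {
                close(fd);
                return -1;
        }

        while (off < st.st_size)
                if (sendfile(sock_fd, fd, &off, st.st_size - off) < 0)
                        break;

        close(fd);
        return off == st.st_size ? 0 : -1;
}
#endif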
1677
1678 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1679                                unsigned long arg)
1680 {
1681         struct ll_inode_info *lli = ll_i2info(inode);
1682         struct obd_export *exp = ll_i2dtexp(inode);
1683         struct ll_recreate_obj ucreatp;
1684         struct obd_trans_info oti = { 0 };
1685         struct obdo *oa = NULL;
1686         int lsm_size;
1687         int rc = 0;
1688         struct lov_stripe_md *lsm, *lsm2;
1689         ENTRY;
1690
1691         if (!capable (CAP_SYS_ADMIN))
1692                 RETURN(-EPERM);
1693
1694         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1695                             sizeof(struct ll_recreate_obj));
1696         if (rc) {
1697                 RETURN(-EFAULT);
1698         }
1699         OBDO_ALLOC(oa);
1700         if (oa == NULL)
1701                 RETURN(-ENOMEM);
1702
1703         down(&lli->lli_size_sem);
1704         lsm = lli->lli_smd;
1705         if (lsm == NULL)
1706                 GOTO(out, rc = -ENOENT);
1707         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1708                    (lsm->lsm_stripe_count));
1709
1710         OBD_ALLOC(lsm2, lsm_size);
1711         if (lsm2 == NULL)
1712                 GOTO(out, rc = -ENOMEM);
1713
1714         oa->o_id = ucreatp.lrc_id;
1715         oa->o_gr = ucreatp.lrc_group;
1716         oa->o_nlink = ucreatp.lrc_ost_idx;
1717         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1718         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1719         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1720                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1721
1722         oti.oti_objid = NULL;
1723         memcpy(lsm2, lsm, lsm_size);
1724         rc = obd_create(exp, oa, &lsm2, &oti);
1725
1726         OBD_FREE(lsm2, lsm_size);
1727         GOTO(out, rc);
1728 out:
1729         up(&lli->lli_size_sem);
1730         OBDO_FREE(oa);
1731         return rc;
1732 }
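
/*
 * Illustrative sketch (not compiled): how the LL_IOC_RECREATE_OBJ handler
 * above is reached from userspace.  It must run with CAP_SYS_ADMIN, per the
 * capable() check in ll_lov_recreate_obj().  The header path is an assumption;
 * the lrc_* fields mirror the ones copied from userspace above.
 */
#if 0
#include <sys/ioctl.h>
#include <fcntl.h>
#include <unistd.h>
#include <lustre/lustre_user.h>   /* assumed location of LL_IOC_RECREATE_OBJ */

static int recreate_object(const char *path, unsigned long long id,
                           unsigned long long group, unsigned int ost_idx)
{
        struct ll_recreate_obj rc_obj = {
                .lrc_id      = id,
                .lrc_group   = group,
                .lrc_ost_idx = ost_idx,
        };
        int fd = open(path, O_RDWR);
        int rc;

        if (fd < 0)
                return -1;
        rc = ioctl(fd, LL_IOC_RECREATE_OBJ, &rc_obj);
        close(fd);
        return rc;
}
#endif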
1733
1734 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1735                              int flags, struct lov_user_md *lum, int lum_size)
1736 {
1737         struct ll_inode_info *lli = ll_i2info(inode);
1738         struct lov_stripe_md *lsm;
1739         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1740         int rc = 0;
1741         ENTRY;
1742
1743         down(&lli->lli_size_sem);
1744         lsm = lli->lli_smd;
1745         if (lsm) {
1746                 up(&lli->lli_size_sem);
1747                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1748                        inode->i_ino);
1749                 RETURN(-EEXIST);
1750         }
1751
1752         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1753         if (rc)
1754                 GOTO(out, rc);
1755         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1756                 GOTO(out_req_free, rc = -ENOENT);
1757         rc = oit.d.lustre.it_status;
1758         if (rc < 0)
1759                 GOTO(out_req_free, rc);
1760
1761         ll_release_openhandle(file->f_dentry, &oit);
1762
1763  out:
1764         up(&lli->lli_size_sem);
1765         ll_intent_release(&oit);
1766         RETURN(rc);
1767 out_req_free:
1768         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1769         goto out;
1770 }
1771
1772 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1773                              struct lov_mds_md **lmmp, int *lmm_size, 
1774                              struct ptlrpc_request **request)
1775 {
1776         struct ll_sb_info *sbi = ll_i2sbi(inode);
1777         struct mdt_body  *body;
1778         struct lov_mds_md *lmm = NULL;
1779         struct ptlrpc_request *req = NULL;
1780         struct obd_capa *oc;
1781         int rc, lmmsize;
1782
1783         rc = ll_get_max_mdsize(sbi, &lmmsize);
1784         if (rc)
1785                 RETURN(rc);
1786
1787         oc = ll_mdscapa_get(inode);
1788         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1789                              oc, filename, strlen(filename) + 1,
1790                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1791         capa_put(oc);
1792         if (rc < 0) {
1793                 CDEBUG(D_INFO, "md_getattr_name failed "
1794                        "on %s: rc %d\n", filename, rc);
1795                 GOTO(out, rc);
1796         }
1797
1798         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1799         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1800         /* swabbed by mdc_getattr_name */
1801         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
1802
1803         lmmsize = body->eadatasize;
1804
1805         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1806             lmmsize == 0) {
1807                 GOTO(out, rc = -ENODATA);
1808         }
1809
1810         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1811         LASSERT(lmm != NULL);
1812         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
1813
1814         /*
1815          * This is coming from the MDS, so is probably in
1816          * little endian.  We convert it to host endian before
1817          * passing it to userspace.
1818          */
1819         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1820                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1821                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1822         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1823                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1824         }
1825
1826         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1827                 struct lov_stripe_md *lsm;
1828                 struct lov_user_md_join *lmj;
1829                 int lmj_size, i, aindex = 0;
1830
1831                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1832                 if (rc < 0)
1833                         GOTO(out, rc = -ENOMEM);
1834                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1835                 if (rc)
1836                         GOTO(out_free_memmd, rc);
1837
1838                 lmj_size = sizeof(struct lov_user_md_join) +
1839                            lsm->lsm_stripe_count *
1840                            sizeof(struct lov_user_ost_data_join);
1841                 OBD_ALLOC(lmj, lmj_size);
1842                 if (!lmj)
1843                         GOTO(out_free_memmd, rc = -ENOMEM);
1844
1845                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1846                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1847                         struct lov_extent *lex =
1848                                 &lsm->lsm_array->lai_ext_array[aindex];
1849
1850                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1851                                 aindex ++;
1852                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1853                                         LPU64" len %d\n", aindex, i,
1854                                         lex->le_start, (int)lex->le_len);
1855                         lmj->lmm_objects[i].l_extent_start =
1856                                 lex->le_start;
1857
1858                         if ((int)lex->le_len == -1)
1859                                 lmj->lmm_objects[i].l_extent_end = -1;
1860                         else
1861                                 lmj->lmm_objects[i].l_extent_end =
1862                                         lex->le_start + lex->le_len;
1863                         lmj->lmm_objects[i].l_object_id =
1864                                 lsm->lsm_oinfo[i]->loi_id;
1865                         lmj->lmm_objects[i].l_object_gr =
1866                                 lsm->lsm_oinfo[i]->loi_gr;
1867                         lmj->lmm_objects[i].l_ost_gen =
1868                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1869                         lmj->lmm_objects[i].l_ost_idx =
1870                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1871                 }
1872                 lmm = (struct lov_mds_md *)lmj;
1873                 lmmsize = lmj_size;
1874 out_free_memmd:
1875                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1876         }
1877 out:
1878         *lmmp = lmm;
1879         *lmm_size = lmmsize;
1880         *request = req;
1881         return rc;
1882 }
1883
1884 static int ll_lov_setea(struct inode *inode, struct file *file,
1885                             unsigned long arg)
1886 {
1887         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1888         struct lov_user_md  *lump;
1889         int lum_size = sizeof(struct lov_user_md) +
1890                        sizeof(struct lov_user_ost_data);
1891         int rc;
1892         ENTRY;
1893
1894         if (!capable (CAP_SYS_ADMIN))
1895                 RETURN(-EPERM);
1896
1897         OBD_ALLOC(lump, lum_size);
1898         if (lump == NULL) {
1899                 RETURN(-ENOMEM);
1900         }
1901         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1902         if (rc) {
1903                 OBD_FREE(lump, lum_size);
1904                 RETURN(-EFAULT);
1905         }
1906
1907         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1908
1909         OBD_FREE(lump, lum_size);
1910         RETURN(rc);
1911 }
1912
1913 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1914                             unsigned long arg)
1915 {
1916         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1917         int rc;
1918         int flags = FMODE_WRITE;
1919         ENTRY;
1920
1921         /* Bug 1152: copy properly when this is no longer true */
1922         LASSERT(sizeof(lum) == sizeof(*lump));
1923         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1924         rc = copy_from_user(&lum, lump, sizeof(lum));
1925         if (rc)
1926                 RETURN(-EFAULT);
1927
1928         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1929         if (rc == 0) {
1930                  put_user(0, &lump->lmm_stripe_count);
1931                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1932                                     0, ll_i2info(inode)->lli_smd, lump);
1933         }
1934         RETURN(rc);
1935 }
1936
1937 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1938 {
1939         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1940
1941         if (!lsm)
1942                 RETURN(-ENODATA);
1943
1944         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1945                             (void *)arg);
1946 }
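
/*
 * Illustrative sketch (not compiled): setting striping from userspace through
 * the LL_IOC_LOV_SETSTRIPE path handled by ll_lov_setstripe() above.  Per the
 * O_LOV_DELAY_CREATE check in ll_file_write() and the -EEXIST path in
 * ll_lov_setstripe_ea_info(), the stripe EA can only be set on a freshly
 * created, still object-less file.  LOV_USER_MAGIC, the lov_user_md layout
 * and the header path are assumptions taken from the Lustre user interface,
 * not from this file.
 */
#if 0
#include <sys/ioctl.h>
#include <fcntl.h>
#include <unistd.h>
#include <lustre/lustre_user.h>   /* assumed header for LL_IOC_LOV_SETSTRIPE */

static int create_striped_file(const char *path, int stripe_count,
                               unsigned int stripe_size)
{
        struct lov_user_md lum = {
                .lmm_magic         = LOV_USER_MAGIC,
                .lmm_stripe_size   = stripe_size,
                .lmm_stripe_count  = stripe_count,
                .lmm_stripe_offset = -1,     /* assumed to mean "any OST" */
        };
        int fd = open(path, O_CREAT | O_EXCL | O_LOV_DELAY_CREATE, 0644);
        int rc;

        if (fd < 0)
                return -1;
        rc = ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum);
        close(fd);
        return rc;
}
#endif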
1947
1948 static int ll_get_grouplock(struct inode *inode, struct file *file,
1949                             unsigned long arg)
1950 {
1951         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1952         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1953                                                     .end = OBD_OBJECT_EOF}};
1954         struct lustre_handle lockh = { 0 };
1955         struct ll_inode_info *lli = ll_i2info(inode);
1956         struct lov_stripe_md *lsm = lli->lli_smd;
1957         int flags = 0, rc;
1958         ENTRY;
1959
1960         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1961                 RETURN(-EINVAL);
1962         }
1963
1964         policy.l_extent.gid = arg;
1965         if (file->f_flags & O_NONBLOCK)
1966                 flags = LDLM_FL_BLOCK_NOWAIT;
1967
1968         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1969         if (rc)
1970                 RETURN(rc);
1971
1972         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1973         fd->fd_gid = arg;
1974         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1975
1976         RETURN(0);
1977 }
1978
1979 static int ll_put_grouplock(struct inode *inode, struct file *file,
1980                             unsigned long arg)
1981 {
1982         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1983         struct ll_inode_info *lli = ll_i2info(inode);
1984         struct lov_stripe_md *lsm = lli->lli_smd;
1985         int rc;
1986         ENTRY;
1987
1988         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1989                 /* Ugh, it's already unlocked. */
1990                 RETURN(-EINVAL);
1991         }
1992
1993         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1994                 RETURN(-EINVAL);
1995
1996         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1997
1998         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1999         if (rc)
2000                 RETURN(rc);
2001
2002         fd->fd_gid = 0;
2003         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2004
2005         RETURN(0);
2006 }
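
/*
 * Illustrative sketch (not compiled): taking and dropping a Lustre group lock
 * from userspace.  The ioctl argument is the group id itself, matching the
 * fd->fd_gid bookkeeping above; while the lock is held, per-fd extent locking
 * is bypassed (LL_FILE_IGNORE_LOCK).  The gid value is arbitrary.
 */
#if 0
#include <sys/ioctl.h>
#include <lustre/lustre_user.h>   /* assumed header for LL_IOC_GROUP_LOCK */

static int with_group_lock(int fd, int gid)
{
        if (ioctl(fd, LL_IOC_GROUP_LOCK, gid) < 0)
                return -1;

        /* ... cooperating group I/O goes here ... */

        return ioctl(fd, LL_IOC_GROUP_UNLOCK, gid);
}
#endif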
2007
2008 static int join_sanity_check(struct inode *head, struct inode *tail)
2009 {
2010         ENTRY;
2011         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2012                 CERROR("server does not support join\n");
2013                 RETURN(-EINVAL);
2014         }
2015         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2016                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2017                        head->i_ino, tail->i_ino);
2018                 RETURN(-EINVAL);
2019         }
2020         if (head->i_ino == tail->i_ino) {
2021                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2022                 RETURN(-EINVAL);
2023         }
2024         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2025                 CERROR("hsize %llu must be a multiple of 64K\n", i_size_read(head));
2026                 RETURN(-EINVAL);
2027         }
2028         RETURN(0);
2029 }
2030
2031 static int join_file(struct inode *head_inode, struct file *head_filp,
2032                      struct file *tail_filp)
2033 {
2034         struct dentry *tail_dentry = tail_filp->f_dentry;
2035         struct lookup_intent oit = {.it_op = IT_OPEN,
2036                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2037         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2038                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2039
2040         struct lustre_handle lockh;
2041         struct md_op_data *op_data;
2042         int    rc;
2043         loff_t data;
2044         ENTRY;
2045
2046         tail_dentry = tail_filp->f_dentry;
2047
2048         data = i_size_read(head_inode);
2049         op_data = ll_prep_md_op_data(NULL, head_inode,
2050                                      tail_dentry->d_parent->d_inode,
2051                                      tail_dentry->d_name.name,
2052                                      tail_dentry->d_name.len, 0,
2053                                      LUSTRE_OPC_ANY, &data);
2054         if (IS_ERR(op_data))
2055                 RETURN(PTR_ERR(op_data));
2056
2057         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2058                          op_data, &lockh, NULL, 0, 0);
2059
2060         ll_finish_md_op_data(op_data);
2061         if (rc < 0)
2062                 GOTO(out, rc);
2063
2064         rc = oit.d.lustre.it_status;
2065
2066         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2067                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2068                 ptlrpc_req_finished((struct ptlrpc_request *)
2069                                     oit.d.lustre.it_data);
2070                 GOTO(out, rc);
2071         }
2072
2073         if (oit.d.lustre.it_lock_mode) { /* if we got a lock, release it
2074                                            * right away */
2075                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2076                 oit.d.lustre.it_lock_mode = 0;
2077         }
2078         ll_release_openhandle(head_filp->f_dentry, &oit);
2079 out:
2080         ll_intent_release(&oit);
2081         RETURN(rc);
2082 }
2083
2084 static int ll_file_join(struct inode *head, struct file *filp,
2085                         char *filename_tail)
2086 {
2087         struct inode *tail = NULL, *first = NULL, *second = NULL;
2088         struct dentry *tail_dentry;
2089         struct file *tail_filp, *first_filp, *second_filp;
2090         struct ll_lock_tree first_tree, second_tree;
2091         struct ll_lock_tree_node *first_node, *second_node;
2092         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2093         int rc = 0, cleanup_phase = 0;
2094         ENTRY;
2095
2096         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2097                head->i_ino, head->i_generation, head, filename_tail);
2098
2099         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2100         if (IS_ERR(tail_filp)) {
2101                 CERROR("cannot open tail file %s\n", filename_tail);
2102                 rc = PTR_ERR(tail_filp);
2103                 GOTO(cleanup, rc);
2104         }
2105         tail = igrab(tail_filp->f_dentry->d_inode);
2106
2107         tlli = ll_i2info(tail);
2108         tail_dentry = tail_filp->f_dentry;
2109         LASSERT(tail_dentry);
2110         cleanup_phase = 1;
2111
2112         /* reorder the inodes so locks are taken in a consistent order */
2113         first = head->i_ino > tail->i_ino ? head : tail;
2114         second = head->i_ino > tail->i_ino ? tail : head;
2115         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2116         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2117
2118         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2119                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2120         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2121         if (IS_ERR(first_node)){
2122                 rc = PTR_ERR(first_node);
2123                 GOTO(cleanup, rc);
2124         }
2125         first_tree.lt_fd = first_filp->private_data;
2126         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2127         if (rc != 0)
2128                 GOTO(cleanup, rc);
2129         cleanup_phase = 2;
2130
2131         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2132         if (IS_ERR(second_node)){
2133                 rc = PTR_ERR(second_node);
2134                 GOTO(cleanup, rc);
2135         }
2136         second_tree.lt_fd = second_filp->private_data;
2137         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2138         if (rc != 0)
2139                 GOTO(cleanup, rc);
2140         cleanup_phase = 3;
2141
2142         rc = join_sanity_check(head, tail);
2143         if (rc)
2144                 GOTO(cleanup, rc);
2145
2146         rc = join_file(head, filp, tail_filp);
2147         if (rc)
2148                 GOTO(cleanup, rc);
2149 cleanup:
2150         switch (cleanup_phase) {
2151         case 3:
2152                 ll_tree_unlock(&second_tree);
2153                 obd_cancel_unused(ll_i2dtexp(second),
2154                                   ll_i2info(second)->lli_smd, 0, NULL);
2155         case 2:
2156                 ll_tree_unlock(&first_tree);
2157                 obd_cancel_unused(ll_i2dtexp(first),
2158                                   ll_i2info(first)->lli_smd, 0, NULL);
2159         case 1:
2160                 filp_close(tail_filp, 0);
2161                 if (tail)
2162                         iput(tail);
2163                 if (head && rc == 0) {
2164                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2165                                        &hlli->lli_smd);
2166                         hlli->lli_smd = NULL;
2167                 }
2168         case 0:
2169                 break;
2170         default:
2171                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2172                 LBUG();
2173         }
2174         RETURN(rc);
2175 }
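
/*
 * Illustrative sketch (not compiled): invoking the join path above through
 * LL_IOC_JOIN.  The ioctl argument is the path of the tail file, and
 * join_sanity_check() requires the head's size to be a multiple of 64K.
 */
#if 0
#include <sys/ioctl.h>
#include <fcntl.h>
#include <unistd.h>
#include <lustre/lustre_user.h>   /* assumed header for LL_IOC_JOIN */

static int join_files(const char *head_path, const char *tail_path)
{
        int fd = open(head_path, O_RDWR);
        int rc;

        if (fd < 0)
                return -1;
        rc = ioctl(fd, LL_IOC_JOIN, tail_path);
        close(fd);
        return rc;
}
#endif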
2176
2177 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2178 {
2179         struct inode *inode = dentry->d_inode;
2180         struct obd_client_handle *och;
2181         int rc;
2182         ENTRY;
2183
2184         LASSERT(inode);
2185
2186         /* Root ? Do nothing. */
2187         if (dentry->d_inode->i_sb->s_root == dentry)
2188                 RETURN(0);
2189
2190         /* No open handle to close? Move away */
2191         if (!it_disposition(it, DISP_OPEN_OPEN))
2192                 RETURN(0);
2193
2194         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2195
2196         OBD_ALLOC(och, sizeof(*och));
2197         if (!och)
2198                 GOTO(out, rc = -ENOMEM);
2199
2200         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2201                     ll_i2info(inode), it, och);
2202
2203         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2204                                        inode, och);
2205  out:
2206         /* this one is in place of ll_file_open */
2207         ptlrpc_req_finished(it->d.lustre.it_data);
2208         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2209         RETURN(rc);
2210 }
2211
2212 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2213                   unsigned long arg)
2214 {
2215         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2216         int flags;
2217         ENTRY;
2218
2219         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2220                inode->i_generation, inode, cmd);
2221         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2222
2223         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2224         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2225                 RETURN(-ENOTTY);
2226
2227         switch(cmd) {
2228         case LL_IOC_GETFLAGS:
2229                 /* Get the current value of the file flags */
2230                 return put_user(fd->fd_flags, (int *)arg);
2231         case LL_IOC_SETFLAGS:
2232         case LL_IOC_CLRFLAGS:
2233                 /* Set or clear specific file flags */
2234                 /* XXX This probably needs checks to ensure the flags are
2235                  *     not abused, and to handle any flag side effects.
2236                  */
2237                 if (get_user(flags, (int *) arg))
2238                         RETURN(-EFAULT);
2239
2240                 if (cmd == LL_IOC_SETFLAGS) {
2241                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2242                             !(file->f_flags & O_DIRECT)) {
2243                                 CERROR("%s: unable to disable locking on "
2244                                        "non-O_DIRECT file\n", current->comm);
2245                                 RETURN(-EINVAL);
2246                         }
2247
2248                         fd->fd_flags |= flags;
2249                 } else {
2250                         fd->fd_flags &= ~flags;
2251                 }
2252                 RETURN(0);
2253         case LL_IOC_LOV_SETSTRIPE:
2254                 RETURN(ll_lov_setstripe(inode, file, arg));
2255         case LL_IOC_LOV_SETEA:
2256                 RETURN(ll_lov_setea(inode, file, arg));
2257         case LL_IOC_LOV_GETSTRIPE:
2258                 RETURN(ll_lov_getstripe(inode, arg));
2259         case LL_IOC_RECREATE_OBJ:
2260                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2261         case EXT3_IOC_GETFLAGS:
2262         case EXT3_IOC_SETFLAGS:
2263                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2264         case EXT3_IOC_GETVERSION_OLD:
2265         case EXT3_IOC_GETVERSION:
2266                 RETURN(put_user(inode->i_generation, (int *)arg));
2267         case LL_IOC_JOIN: {
2268                 char *ftail;
2269                 int rc;
2270
2271                 ftail = getname((const char *)arg);
2272                 if (IS_ERR(ftail))
2273                         RETURN(PTR_ERR(ftail));
2274                 rc = ll_file_join(inode, file, ftail);
2275                 putname(ftail);
2276                 RETURN(rc);
2277         }
2278         case LL_IOC_GROUP_LOCK:
2279                 RETURN(ll_get_grouplock(inode, file, arg));
2280         case LL_IOC_GROUP_UNLOCK:
2281                 RETURN(ll_put_grouplock(inode, file, arg));
2282         case IOC_OBD_STATFS:
2283                 RETURN(ll_obd_statfs(inode, (void *)arg));
2284
2285         /* We need to special case any other ioctls we want to handle,
2286          * to send them to the MDS/OST as appropriate and to properly
2287          * network encode the arg field.
2288         case EXT3_IOC_SETVERSION_OLD:
2289         case EXT3_IOC_SETVERSION:
2290         */
2291         case LL_IOC_FLUSHCTX:
2292                 RETURN(ll_flush_ctx(inode));
2293         case LL_IOC_GETFACL: {
2294                 struct rmtacl_ioctl_data ioc;
2295
2296                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2297                         RETURN(-EFAULT);
2298
2299                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2300         }
2301         case LL_IOC_SETFACL: {
2302                 struct rmtacl_ioctl_data ioc;
2303
2304                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2305                         RETURN(-EFAULT);
2306
2307                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2308         }
2309         default: {
2310                 int err;
2311
2312                 if (LLIOC_STOP == 
2313                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2314                         RETURN(err);
2315
2316                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2317                                      (void *)arg));
2318         }
2319         }
2320 }
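
/*
 * Illustrative sketch (not compiled): reading and setting per-fd flags through
 * the LL_IOC_GETFLAGS/SETFLAGS cases above.  LL_FILE_IGNORE_LOCK may only be
 * set on an O_DIRECT file descriptor, per the check in ll_file_ioctl().
 */
#if 0
#include <sys/ioctl.h>
#include <lustre/lustre_user.h>   /* assumed header for LL_IOC_GETFLAGS */

static int disable_locking(int direct_fd)
{
        int flags = 0;

        if (ioctl(direct_fd, LL_IOC_GETFLAGS, &flags) < 0)
                return -1;

        flags = LL_FILE_IGNORE_LOCK;
        return ioctl(direct_fd, LL_IOC_SETFLAGS, &flags);
}
#endif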
2321
2322 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2323 {
2324         struct inode *inode = file->f_dentry->d_inode;
2325         struct ll_inode_info *lli = ll_i2info(inode);
2326         struct lov_stripe_md *lsm = lli->lli_smd;
2327         loff_t retval;
2328         ENTRY;
2329         retval = offset + ((origin == 2) ? i_size_read(inode) :
2330                            (origin == 1) ? file->f_pos : 0);
2331         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2332                inode->i_ino, inode->i_generation, inode, retval, retval,
2333                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2334         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2335
2336         if (origin == 2) { /* SEEK_END */
2337                 int nonblock = 0, rc;
2338
2339                 if (file->f_flags & O_NONBLOCK)
2340                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2341
2342                 if (lsm != NULL) {
2343                         rc = ll_glimpse_size(inode, nonblock);
2344                         if (rc != 0)
2345                                 RETURN(rc);
2346                 }
2347
2348                 ll_inode_size_lock(inode, 0);
2349                 offset += i_size_read(inode);
2350                 ll_inode_size_unlock(inode, 0);
2351         } else if (origin == 1) { /* SEEK_CUR */
2352                 offset += file->f_pos;
2353         }
2354
2355         retval = -EINVAL;
2356         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2357                 if (offset != file->f_pos) {
2358                         file->f_pos = offset;
2359 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2360                         file->f_reada = 0;
2361                         file->f_version = ++event;
2362 #endif
2363                 }
2364                 retval = offset;
2365         }
2366         
2367         RETURN(retval);
2368 }
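
/*
 * Illustrative sketch (not compiled): because the SEEK_END branch above calls
 * ll_glimpse_size(), a plain lseek() is enough to learn the current
 * cluster-wide file size.  Note that it also moves the file offset.
 */
#if 0
#include <unistd.h>

static off_t current_size(int fd)
{
        return lseek(fd, 0, SEEK_END);
}
#endif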
2369
2370 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2371 {
2372         struct inode *inode = dentry->d_inode;
2373         struct ll_inode_info *lli = ll_i2info(inode);
2374         struct lov_stripe_md *lsm = lli->lli_smd;
2375         struct ptlrpc_request *req;
2376         struct obd_capa *oc;
2377         int rc, err;
2378         ENTRY;
2379         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2380                inode->i_generation, inode);
2381         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2382
2383         /* fsync's caller has already called _fdata{sync,write}, we want
2384          * that IO to finish before calling the osc and mdc sync methods */
2385         rc = filemap_fdatawait(inode->i_mapping);
2386
2387         /* catch async errors that were recorded back when async writeback
2388          * failed for pages in this mapping. */
2389         err = lli->lli_async_rc;
2390         lli->lli_async_rc = 0;
2391         if (rc == 0)
2392                 rc = err;
2393         if (lsm) {
2394                 err = lov_test_and_clear_async_rc(lsm);
2395                 if (rc == 0)
2396                         rc = err;
2397         }
2398
2399         oc = ll_mdscapa_get(inode);
2400         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2401                       &req);
2402         capa_put(oc);
2403         if (!rc)
2404                 rc = err;
2405         if (!err)
2406                 ptlrpc_req_finished(req);
2407
2408         if (data && lsm) {
2409                 struct obdo *oa;
2410                 
2411                 OBDO_ALLOC(oa);
2412                 if (!oa)
2413                         RETURN(rc ? rc : -ENOMEM);
2414
2415                 oa->o_id = lsm->lsm_object_id;
2416                 oa->o_gr = lsm->lsm_object_gr;
2417                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2418                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2419                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2420                                            OBD_MD_FLGROUP);
2421
2422                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2423                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2424                                0, OBD_OBJECT_EOF, oc);
2425                 capa_put(oc);
2426                 if (!rc)
2427                         rc = err;
2428                 OBDO_FREE(oa);
2429         }
2430
2431         RETURN(rc);
2432 }
2433
2434 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2435 {
2436         struct inode *inode = file->f_dentry->d_inode;
2437         struct ll_sb_info *sbi = ll_i2sbi(inode);
2438         struct ldlm_res_id res_id =
2439                 { .name = { fid_seq(ll_inode2fid(inode)),
2440                             fid_oid(ll_inode2fid(inode)),
2441                             fid_ver(ll_inode2fid(inode)),
2442                             LDLM_FLOCK} };
2443         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2444                 ldlm_flock_completion_ast, NULL, file_lock };
2445         struct lustre_handle lockh = {0};
2446         ldlm_policy_data_t flock;
2447         int flags = 0;
2448         int rc;
2449         ENTRY;
2450
2451         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2452                inode->i_ino, file_lock);
2453
2454         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2455  
2456         if (file_lock->fl_flags & FL_FLOCK) {
2457                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2458                 /* set missing params for flock() calls */
2459                 file_lock->fl_end = OFFSET_MAX;
2460                 file_lock->fl_pid = current->tgid;
2461         }
2462         flock.l_flock.pid = file_lock->fl_pid;
2463         flock.l_flock.start = file_lock->fl_start;
2464         flock.l_flock.end = file_lock->fl_end;
2465
2466         switch (file_lock->fl_type) {
2467         case F_RDLCK:
2468                 einfo.ei_mode = LCK_PR;
2469                 break;
2470         case F_UNLCK:
2471                 /* An unlock request may or may not have any relation to
2472                  * existing locks so we may not be able to pass a lock handle
2473                  * via a normal ldlm_lock_cancel() request. The request may even
2474                  * unlock a byte range in the middle of an existing lock. In
2475                  * order to process an unlock request we need all of the same
2476                  * information that is given with a normal read or write record
2477                  * lock request. To avoid creating another ldlm unlock (cancel)
2478                  * message we'll treat a LCK_NL flock request as an unlock. */
2479                 einfo.ei_mode = LCK_NL;
2480                 break;
2481         case F_WRLCK:
2482                 einfo.ei_mode = LCK_PW;
2483                 break;
2484         default:
2485                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2486                 LBUG();
2487         }
2488
2489         switch (cmd) {
2490         case F_SETLKW:
2491 #ifdef F_SETLKW64
2492         case F_SETLKW64:
2493 #endif
2494                 flags = 0;
2495                 break;
2496         case F_SETLK:
2497 #ifdef F_SETLK64
2498         case F_SETLK64:
2499 #endif
2500                 flags = LDLM_FL_BLOCK_NOWAIT;
2501                 break;
2502         case F_GETLK:
2503 #ifdef F_GETLK64
2504         case F_GETLK64:
2505 #endif
2506                 flags = LDLM_FL_TEST_LOCK;
2507                 /* Save the old mode so that if the mode in the lock changes we
2508                  * can decrement the appropriate reader or writer refcount. */
2509                 file_lock->fl_type = einfo.ei_mode;
2510                 break;
2511         default:
2512                 CERROR("unknown fcntl lock command: %d\n", cmd);
2513                 LBUG();
2514         }
2515
2516         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2517                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2518                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2519
2520         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2521                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2522         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2523                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2524 #ifdef HAVE_F_OP_FLOCK
2525         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2526             !(flags & LDLM_FL_TEST_LOCK))
2527                 posix_lock_file_wait(file, file_lock);
2528 #endif
2529
2530         RETURN(rc);
2531 }
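
/*
 * Illustrative sketch (not compiled): a plain fcntl() byte-range lock, which
 * ll_file_flock() above maps to an LDLM_FLOCK enqueue (F_RDLCK -> LCK_PR,
 * F_WRLCK -> LCK_PW, F_UNLCK -> LCK_NL; F_SETLK adds LDLM_FL_BLOCK_NOWAIT,
 * F_GETLK adds LDLM_FL_TEST_LOCK).
 */
#if 0
#include <fcntl.h>
#include <unistd.h>

static int lock_range(int fd, off_t start, off_t len, int exclusive)
{
        struct flock fl = {
                .l_type   = exclusive ? F_WRLCK : F_RDLCK,
                .l_whence = SEEK_SET,
                .l_start  = start,
                .l_len    = len,
        };

        /* F_SETLKW waits until the DLM grants a non-conflicting lock */
        return fcntl(fd, F_SETLKW, &fl);
}
#endif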
2532
2533 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2534 {
2535         ENTRY;
2536
2537         RETURN(-ENOSYS);
2538 }
2539
2540 int ll_have_md_lock(struct inode *inode, __u64 bits)
2541 {
2542         struct lustre_handle lockh;
2543         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2544         struct lu_fid *fid;
2545         int flags;
2546         ENTRY;
2547
2548         if (!inode)
2549                RETURN(0);
2550
2551         fid = &ll_i2info(inode)->lli_fid;
2552         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2553
2554         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2555         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2556                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2557                 RETURN(1);
2558         }
2559         RETURN(0);
2560 }
2561
2562 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2563                             struct lustre_handle *lockh)
2564 {
2565         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2566         struct lu_fid *fid;
2567         ldlm_mode_t rc;
2568         int flags;
2569         ENTRY;
2570
2571         fid = &ll_i2info(inode)->lli_fid;
2572         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2573
2574         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2575         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2576                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2577         RETURN(rc);
2578 }
2579
2580 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2581         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2582                               * and return success */
2583                 inode->i_nlink = 0;
2584                 /* This path cannot be hit for regular files, except in
2585                  * the case of obscure races, so there is no need to
2586                  * validate the size. */
2587                 if (!S_ISREG(inode->i_mode) &&
2588                     !S_ISDIR(inode->i_mode))
2589                         return 0;
2590         }
2591
2592         if (rc) {
2593                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2594                 return -abs(rc);
2595
2596         }
2597
2598         return 0;
2599 }
2600
2601 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2602 {
2603         struct inode *inode = dentry->d_inode;
2604         struct ptlrpc_request *req = NULL;
2605         struct ll_sb_info *sbi;
2606         struct obd_export *exp;
2607         int rc;
2608         ENTRY;
2609
2610         if (!inode) {
2611                 CERROR("REPORT THIS LINE TO PETER\n");
2612                 RETURN(0);
2613         }
2614         sbi = ll_i2sbi(inode);
2615
2616         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2617                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2618
2619         exp = ll_i2mdexp(inode);
2620
2621         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2622                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2623                 struct md_op_data *op_data;
2624
2625                 /* Call getattr by fid, so do not provide name at all. */
2626                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2627                                              dentry->d_inode, NULL, 0, 0,
2628                                              LUSTRE_OPC_ANY, NULL);
2629                 if (IS_ERR(op_data))
2630                         RETURN(PTR_ERR(op_data));
2631
2632                 oit.it_flags |= O_CHECK_STALE;
2633                 rc = md_intent_lock(exp, op_data, NULL, 0,
2634                                     /* we are not interested in name
2635                                        based lookup */
2636                                     &oit, 0, &req,
2637                                     ll_md_blocking_ast, 0);
2638                 ll_finish_md_op_data(op_data);
2639                 oit.it_flags &= ~O_CHECK_STALE;
2640                 if (rc < 0) {
2641                         rc = ll_inode_revalidate_fini(inode, rc);
2642                         GOTO (out, rc);
2643                 }
2644
2645                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2646                 if (rc != 0) {
2647                         ll_intent_release(&oit);
2648                         GOTO(out, rc);
2649                 }
2650
2651                 /* Unlinked? Unhash dentry, so it is not picked up later by
2652                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2653                    here to preserve get_cwd functionality on 2.6.
2654                    Bug 10503 */
2655                 if (!dentry->d_inode->i_nlink) {
2656                         spin_lock(&dcache_lock);
2657                         ll_drop_dentry(dentry);
2658                         spin_unlock(&dcache_lock);
2659                 }
2660
2661                 ll_lookup_finish_locks(&oit, dentry);
2662         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) {
2663                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2664                 obd_valid valid = OBD_MD_FLGETATTR;
2665                 struct obd_capa *oc;
2666                 int ealen = 0;
2667
2668                 if (S_ISREG(inode->i_mode)) {
2669                         rc = ll_get_max_mdsize(sbi, &ealen);
2670                         if (rc)
2671                                 RETURN(rc);
2672                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2673                 }
2674                 /* When OBD_CONNECT_ATTRFID is not supported, we cannot find
2675                  * a capa for this inode, because we only keep the capas of
2676                  * directories fresh. */
2677                 oc = ll_mdscapa_get(inode);
2678                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2679                                 ealen, &req);
2680                 capa_put(oc);
2681                 if (rc) {
2682                         rc = ll_inode_revalidate_fini(inode, rc);
2683                         RETURN(rc);
2684                 }
2685
2686                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2687                                    NULL);
2688                 if (rc)
2689                         GOTO(out, rc);
2690         }
2691
2692         /* if object not yet allocated, don't validate size */
2693         if (ll_i2info(inode)->lli_smd == NULL)
2694                 GOTO(out, rc = 0);
2695
2696         /* ll_glimpse_size will prefer locally cached writes if they extend
2697          * the file */
2698         rc = ll_glimpse_size(inode, 0);
2699         EXIT;
2700 out:
2701         ptlrpc_req_finished(req);
2702         return rc;
2703 }
2704
2705 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2706                   struct lookup_intent *it, struct kstat *stat)
2707 {
2708         struct inode *inode = de->d_inode;
2709         int res = 0;
2710
2711         res = ll_inode_revalidate_it(de, it);
2712         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2713
2714         if (res)
2715                 return res;
2716
2717         stat->dev = inode->i_sb->s_dev;
2718         stat->ino = inode->i_ino;
2719         stat->mode = inode->i_mode;
2720         stat->nlink = inode->i_nlink;
2721         stat->uid = inode->i_uid;
2722         stat->gid = inode->i_gid;
2723         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2724         stat->atime = inode->i_atime;
2725         stat->mtime = inode->i_mtime;
2726         stat->ctime = inode->i_ctime;
2727 #ifdef HAVE_INODE_BLKSIZE
2728         stat->blksize = inode->i_blksize;
2729 #else
2730         stat->blksize = 1 << inode->i_blkbits;
2731 #endif
2732
2733         ll_inode_size_lock(inode, 0);
2734         stat->size = i_size_read(inode);
2735         stat->blocks = inode->i_blocks;
2736         ll_inode_size_unlock(inode, 0);
2737
2738         return 0;
2739 }
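
/* Plain getattr entry point: same as ll_getattr_it(), but with a freshly
 * initialized IT_GETATTR intent. */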
2740 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2741 {
2742         struct lookup_intent it = { .it_op = IT_GETATTR };
2743
2744         return ll_getattr_it(mnt, de, &it, stat);
2745 }
2746
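/* Check the requested access mask against the POSIX ACL cached in the Lustre
 * inode info.  Returns -EAGAIN when no ACL is cached (or when ACL support is
 * not compiled in) so the caller falls back to the ordinary mode bits. */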
2747 static
2748 int lustre_check_acl(struct inode *inode, int mask)
2749 {
2750 #ifdef CONFIG_FS_POSIX_ACL
2751         struct ll_inode_info *lli = ll_i2info(inode);
2752         struct posix_acl *acl;
2753         int rc;
2754         ENTRY;
2755
2756         spin_lock(&lli->lli_lock);
2757         acl = posix_acl_dup(lli->lli_posix_acl);
2758         spin_unlock(&lli->lli_lock);
2759
2760         if (!acl)
2761                 RETURN(-EAGAIN);
2762
2763         rc = posix_acl_permission(inode, acl, mask);
2764         posix_acl_release(acl);
2765
2766         RETURN(rc);
2767 #else
2768         return -EAGAIN;
2769 #endif
2770 }
2771
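/* Permission check entry point.  Clients mounted with LL_SBI_RMT_CLIENT
 * defer the check to lustre_check_remote_perm(); otherwise 2.6.10+ kernels
 * use generic_permission() with lustre_check_acl() plugged in, while older
 * kernels get the open-coded owner/group/other and capability checks
 * below. */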
2772 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2773 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2774 {
2775         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2776                inode->i_ino, inode->i_generation, inode, mask);
2777         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2778                 return lustre_check_remote_perm(inode, mask);
2779         
2780         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2781         return generic_permission(inode, mask, lustre_check_acl);
2782 }
2783 #else
2784 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2785 {
2786         int mode = inode->i_mode;
2787         int rc;
2788
2789         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2790                inode->i_ino, inode->i_generation, inode, mask);
2791
2792         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2793                 return lustre_check_remote_perm(inode, mask);
2794
2795         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2796
2797         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2798             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2799                 return -EROFS;
2800         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2801                 return -EACCES;
2802         if (current->fsuid == inode->i_uid) {
2803                 mode >>= 6;
2804         } else if (1) {
2805                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2806                         goto check_groups;
2807                 rc = lustre_check_acl(inode, mask);
2808                 if (rc == -EAGAIN)
2809                         goto check_groups;
2810                 if (rc == -EACCES)
2811                         goto check_capabilities;
2812                 return rc;
2813         } else {
2814 check_groups:
2815                 if (in_group_p(inode->i_gid))
2816                         mode >>= 3;
2817         }
2818         if ((mode & mask & S_IRWXO) == mask)
2819                 return 0;
2820
2821 check_capabilities:
2822         if (!(mask & MAY_EXEC) ||
2823             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2824                 if (capable(CAP_DAC_OVERRIDE))
2825                         return 0;
2826
2827         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2828             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2829                 return 0;
2830         
2831         return -EACCES;
2832 }
2833 #endif
2834
2835 /* -o localflock - only provides locally consistent flock locks */
2836 struct file_operations ll_file_operations = {
2837         .read           = ll_file_read,
2838         .write          = ll_file_write,
2839         .ioctl          = ll_file_ioctl,
2840         .open           = ll_file_open,
2841         .release        = ll_file_release,
2842         .mmap           = ll_file_mmap,
2843         .llseek         = ll_file_seek,
2844         .sendfile       = ll_file_sendfile,
2845         .fsync          = ll_fsync,
2846 };
2847
2848 struct file_operations ll_file_operations_flock = {
2849         .read           = ll_file_read,
2850         .write          = ll_file_write,
2851         .ioctl          = ll_file_ioctl,
2852         .open           = ll_file_open,
2853         .release        = ll_file_release,
2854         .mmap           = ll_file_mmap,
2855         .llseek         = ll_file_seek,
2856         .sendfile       = ll_file_sendfile,
2857         .fsync          = ll_fsync,
2858 #ifdef HAVE_F_OP_FLOCK
2859         .flock          = ll_file_flock,
2860 #endif
2861         .lock           = ll_file_flock
2862 };
2863
2864 /* These are used for "-o noflock" - flock calls return -ENOSYS */
2865 struct file_operations ll_file_operations_noflock = {
2866         .read           = ll_file_read,
2867         .write          = ll_file_write,
2868         .ioctl          = ll_file_ioctl,
2869         .open           = ll_file_open,
2870         .release        = ll_file_release,
2871         .mmap           = ll_file_mmap,
2872         .llseek         = ll_file_seek,
2873         .sendfile       = ll_file_sendfile,
2874         .fsync          = ll_fsync,
2875 #ifdef HAVE_F_OP_FLOCK
2876         .flock          = ll_file_noflock,
2877 #endif
2878         .lock           = ll_file_noflock
2879 };
2880
2881 struct inode_operations ll_file_inode_operations = {
2882 #ifdef HAVE_VFS_INTENT_PATCHES
2883         .setattr_raw    = ll_setattr_raw,
2884 #endif
2885         .setattr        = ll_setattr,
2886         .truncate       = ll_truncate,
2887         .getattr        = ll_getattr,
2888         .permission     = ll_inode_permission,
2889         .setxattr       = ll_setxattr,
2890         .getxattr       = ll_getxattr,
2891         .listxattr      = ll_listxattr,
2892         .removexattr    = ll_removexattr,
2893 };
2894
2895 /* Dynamic ioctl number support routines; see the usage sketch below. */
2896 static struct llioc_ctl_data {
2897         struct rw_semaphore ioc_sem;
2898         struct list_head    ioc_head;
2899 } llioc = { 
2900         __RWSEM_INITIALIZER(llioc.ioc_sem), 
2901         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2902 };
2903
2904
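/* One registered block of dynamic ioctl commands: a callback plus the
 * command numbers it handles (flexible array member), linked on
 * llioc.ioc_head; iocd_size records the allocation size for freeing. */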
2905 struct llioc_data {
2906         struct list_head        iocd_list;
2907         unsigned int            iocd_size;
2908         llioc_callback_t        iocd_cb;
2909         unsigned int            iocd_count;
2910         unsigned int            iocd_cmd[0];
2911 };
2912
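/* Register "count" ioctl command numbers from "cmd" together with callback
 * "cb".  Returns an opaque cookie to pass to ll_iocontrol_unregister(), or
 * NULL on invalid arguments or allocation failure. */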
2913 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2914 {
2915         unsigned int size;
2916         struct llioc_data *in_data = NULL;
2917         ENTRY;
2918
2919         if (cb == NULL || cmd == NULL ||
2920             count > LLIOC_MAX_CMD || count < 0)
2921                 RETURN(NULL);
2922
2923         size = sizeof(*in_data) + count * sizeof(unsigned int);
2924         OBD_ALLOC(in_data, size);
2925         if (in_data == NULL)
2926                 RETURN(NULL);
2927
2928         memset(in_data, 0, sizeof(*in_data));
2929         in_data->iocd_size = size;
2930         in_data->iocd_cb = cb;
2931         in_data->iocd_count = count;
2932         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2933
2934         down_write(&llioc.ioc_sem);
2935         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2936         up_write(&llioc.ioc_sem);
2937
2938         RETURN(in_data);
2939 }
2940
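/* Drop a registration made by ll_iocontrol_register(); "magic" is the cookie
 * it returned.  An unknown cookie only triggers a console warning. */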
2941 void ll_iocontrol_unregister(void *magic)
2942 {
2943         struct llioc_data *tmp;
2944
2945         if (magic == NULL)
2946                 return;
2947
2948         down_write(&llioc.ioc_sem);
2949         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2950                 if (tmp == magic) {
2951                         unsigned int size = tmp->iocd_size;
2952
2953                         list_del(&tmp->iocd_list);
2954                         up_write(&llioc.ioc_sem);
2955
2956                         OBD_FREE(tmp, size);
2957                         return;
2958                 }
2959         }
2960         up_write(&llioc.ioc_sem);
2961
2962         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2963 }
2964
2965 EXPORT_SYMBOL(ll_iocontrol_register);
2966 EXPORT_SYMBOL(ll_iocontrol_unregister);
2967
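/* Hand cmd/arg to every registered handler that lists cmd, in registration
 * order, until one of them returns LLIOC_STOP.  The last callback's result
 * is passed back through rcp (-EINVAL if no handler matched). */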
2968 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2969                         unsigned int cmd, unsigned long arg, int *rcp)
2970 {
2971         enum llioc_iter ret = LLIOC_CONT;
2972         struct llioc_data *data;
2973         int rc = -EINVAL, i;
2974
2975         down_read(&llioc.ioc_sem);
2976         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2977                 for (i = 0; i < data->iocd_count; i++) {
2978                         if (cmd != data->iocd_cmd[i]) 
2979                                 continue;
2980
2981                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2982                         break;
2983                 }
2984
2985                 if (ret == LLIOC_STOP)
2986                         break;
2987         }
2988         up_read(&llioc.ioc_sem);
2989
2990         if (rcp)
2991                 *rcp = rc;
2992         return ret;
2993 }
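
/*
 * Usage sketch for the dynamic ioctl hooks above.  This is illustrative only
 * and is kept out of the build with #if 0: the handler, the command array and
 * the LL_IOC_EXAMPLE command number are hypothetical, and the callback
 * parameter list simply mirrors the call made in ll_iocontrol_call(); see
 * llioc_callback_t in the headers for the authoritative prototype.
 */
#if 0
static enum llioc_iter example_iocontrol_cb(struct inode *inode,
                                            struct file *file,
                                            unsigned int cmd,
                                            unsigned long arg,
                                            void *magic, int *rcp)
{
        /* handle the command, report the result through *rcp, and return
         * LLIOC_STOP so no later-registered handler sees this command */
        *rcp = 0;
        return LLIOC_STOP;
}

static unsigned int example_cmds[] = { LL_IOC_EXAMPLE };
static void *example_cookie;

static int example_init(void)
{
        /* ll_iocontrol_register() returns NULL on bad arguments or
         * allocation failure */
        example_cookie = ll_iocontrol_register(example_iocontrol_cb,
                                               ARRAY_SIZE(example_cmds),
                                               example_cmds);
        return example_cookie == NULL ? -ENOMEM : 0;
}

static void example_exit(void)
{
        ll_iocontrol_unregister(example_cookie);
}
#endif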