1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
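/*
 * Pack the attributes needed for an MDS close of @och into @op_data.
 * For a write-open of a regular file on a Size-on-MDS (SOM) enabled
 * export the size/blocks are not sent here; instead the IO epoch is
 * closed and they are reported later via the DONE_WRITING path.
 */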
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
85
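/*
 * Send an MDS_CLOSE for the open handle @och.  On -EAGAIN the MDS wants
 * a Size-on-MDS update, which is gathered from the OSTs and sent back
 * as a setattr.  Unless the inode is queued for DONE_WRITING, the open
 * handle is invalidated and @och is freed here.
 */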
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in the case of LMV, is it correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * Here we check whether this is a forced umount.  If so, this is
110          * called on cancellation of the "open lock" and we do not call
111          * md_close() in this case, as it will not succeed because the import
112          * is already deactivated.
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* MDS has instructed us to obtain the Size-on-MDS attribute
131                  * from the OSTs and send a setattr back to the MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
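/*
 * Close the MDS open handle matching @flags (read, write or exec) if it
 * is no longer in use, by handing it to ll_close_inode_openhandle().
 * If other users remain, the handle is left in place.
 */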
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och = *och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race, and somebody may have freed this
203                       och already. */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
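/*
 * Per-file-descriptor close: drop any group lock, decrement the open
 * counters under lli_och_sem, and do the real MDS close unless a cached
 * OPEN ibits lock on the inode lets us keep the handle for later reuse.
 */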
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have a good enough OPEN lock on the file so that
228            we can skip talking to the MDS. */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this returns an error code, the caller, fput(), ignores it, so we
273  * need to make every effort to clean up all of our state here.  Also,
274  * applications rarely check close errors, and even if an error is returned
275  * they will not retry the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289         /* don't do anything for / */
290         if (inode->i_sb->s_root == file->f_dentry)
291                 RETURN(0);
292
293         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
294         fd = LUSTRE_FPRIVATE(file);
295         LASSERT(fd != NULL);
296
297         /* don't do anything for / */
298         if (inode->i_sb->s_root == file->f_dentry) {
299                 LUSTRE_FPRIVATE(file) = NULL;
300                 ll_file_data_put(fd);
301                 RETURN(0);
302         }
303
304         if (lsm)
305                 lov_test_and_clear_async_rc(lsm);
306         lli->lli_async_rc = 0;
307
308         rc = ll_md_close(sbi->ll_md_exp, inode, file);
309         RETURN(rc);
310 }
311
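/*
 * Send an IT_OPEN intent to the MDS for this dentry.  This path is used
 * when no usable open handle is available (NFSD and similar opens), and
 * it requests an OPEN lock unless striping is being set via lmm/lmmsize.
 */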
312 static int ll_intent_file_open(struct file *file, void *lmm,
313                                int lmmsize, struct lookup_intent *itp)
314 {
315         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
316         struct dentry *parent = file->f_dentry->d_parent;
317         const char *name = file->f_dentry->d_name.name;
318         const int len = file->f_dentry->d_name.len;
319         struct md_op_data *op_data;
320         struct ptlrpc_request *req;
321         int rc;
322
323         if (!parent)
324                 RETURN(-ENOENT);
325
326         /* Usually we come here only for NFSD, and we want an open lock.
327            But we can also get here with pre-2.6.15 patchless kernels, and in
328            that case that lock is also OK. */
329         /* We can also get here if there was a cached open handle in
330          * revalidate_it but it disappeared while we were getting from there to
331          * ll_file_open.  But this means this file was closed and immediately
332          * opened, which makes it a good candidate for using the OPEN lock. */
333         /* If lmmsize and lmm are not 0, we are just setting stripe info
334          * parameters.  No need for the open lock. */
335         if (!lmm && !lmmsize)
336                 itp->it_flags |= MDS_OPEN_LOCK;
337
338         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
339                                       file->f_dentry->d_inode, name, len,
340                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
341         if (IS_ERR(op_data))
342                 RETURN(PTR_ERR(op_data));
343
344         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
345                             0 /*unused */, &req, ll_md_blocking_ast, 0);
346         ll_finish_md_op_data(op_data);
347         if (rc == -ESTALE) {
348                 /* The reason for keeping our own exit path: don't flood the
349                  * log with -ESTALE error messages.
350                  */
351                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
352                      it_open_error(DISP_OPEN_OPEN, itp))
353                         GOTO(out, rc);
354                 ll_release_openhandle(file->f_dentry, itp);
355                 GOTO(out_stale, rc);
356         }
357
358         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
359                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
360                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
361                 GOTO(out, rc);
362         }
363
364         if (itp->d.lustre.it_lock_mode)
365                 md_set_lock_data(sbi->ll_md_exp,
366                                  &itp->d.lustre.it_lock_handle, 
367                                  file->f_dentry->d_inode);
368
369         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
370                            NULL);
371 out:
372         ptlrpc_req_finished(itp->d.lustre.it_data);
373
374 out_stale:
375         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
376         ll_intent_drop_lock(itp);
377
378         RETURN(rc);
379 }
380
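/*
 * Fill @och from the open reply carried by @it: copy the file handle,
 * record the fid, open flags and ioepoch, and register the open request
 * for replay.
 */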
381 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
382                        struct lookup_intent *it, struct obd_client_handle *och)
383 {
384         struct ptlrpc_request *req = it->d.lustre.it_data;
385         struct mdt_body *body;
386
387         LASSERT(och);
388
389         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
390         /* reply already checked out */
391         LASSERT(body != NULL);
392         /* and swabbed in md_enqueue */
393         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
394
395         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
396         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
397         och->och_fid = lli->lli_fid;
398         och->och_flags = it->it_flags;
399         lli->lli_ioepoch = body->ioepoch;
400
401         return md_set_open_replay_data(md_exp, och, req);
402 }
403
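/*
 * Finish the client-side part of an open: attach @fd to the file,
 * initialize readahead state and record the open mode.  If @och is
 * given, it is filled in from the intent's open reply.
 */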
404 int ll_local_open(struct file *file, struct lookup_intent *it,
405                   struct ll_file_data *fd, struct obd_client_handle *och)
406 {
407         struct inode *inode = file->f_dentry->d_inode;
408         struct ll_inode_info *lli = ll_i2info(inode);
409         ENTRY;
410
411         LASSERT(!LUSTRE_FPRIVATE(file));
412
413         LASSERT(fd != NULL);
414
415         if (och) {
416                 struct ptlrpc_request *req = it->d.lustre.it_data;
417                 struct mdt_body *body;
418                 int rc;
419
420                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
421                 if (rc)
422                         RETURN(rc);
423
424                 body = lustre_msg_buf(req->rq_repmsg,
425                                       DLM_REPLY_REC_OFF, sizeof(*body));
426
427                 if ((it->it_flags & FMODE_WRITE) &&
428                     (body->valid & OBD_MD_FLSIZE))
429                 {
430                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
431                                lli->lli_ioepoch, PFID(&lli->lli_fid));
432                 }
433         }
434
435         LUSTRE_FPRIVATE(file) = fd;
436         ll_readahead_init(inode, &fd->fd_ras);
437         fd->fd_omode = it->it_flags;
438         RETURN(0);
439 }
440
441 /* Open a file, and (for the very first open) create objects on the OSTs at
442  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
443  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
444  * lli_open_sem to ensure no other process will create objects, send the
445  * stripe MD to the MDS, or try to destroy the objects if that fails.
446  *
447  * If we already have the stripe MD locally, then we don't request it in
448  * md_open(), by passing lmm_size = 0.
449  *
450  * It is up to the application to ensure no other processes open this file
451  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
452  * used.  We might be able to avoid races of that sort by getting lli_open_sem
453  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
454  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
455  */
456 int ll_file_open(struct inode *inode, struct file *file)
457 {
458         struct ll_inode_info *lli = ll_i2info(inode);
459         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
460                                           .it_flags = file->f_flags };
461         struct lov_stripe_md *lsm;
462         struct ptlrpc_request *req = NULL;
463         struct obd_client_handle **och_p;
464         __u64 *och_usecount;
465         struct ll_file_data *fd;
466         int rc = 0;
467         ENTRY;
468
469         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
470                inode->i_generation, inode, file->f_flags);
471
472         /* don't do anything for / */
473         if (inode->i_sb->s_root == file->f_dentry)
474                 RETURN(0);
475
476 #ifdef HAVE_VFS_INTENT_PATCHES
477         it = file->f_it;
478 #else
479         it = file->private_data; /* XXX: compat macro */
480         file->private_data = NULL; /* prevent ll_local_open assertion */
481 #endif
482
483         fd = ll_file_data_get();
484         if (fd == NULL)
485                 RETURN(-ENOMEM);
486
487         /* don't do anything for / */
488         if (inode->i_sb->s_root == file->f_dentry) {
489                 LUSTRE_FPRIVATE(file) = fd;
490                 RETURN(0);
491         }
492
493         if (!it || !it->d.lustre.it_disposition) {
494                 /* Convert f_flags into an access mode.  We cannot use
495                  * file->f_mode, because everything but the O_ACCMODE mask was
496                  * stripped from it. */
497                 if ((oit.it_flags + 1) & O_ACCMODE)
498                         oit.it_flags++;
499                 if (file->f_flags & O_TRUNC)
500                         oit.it_flags |= FMODE_WRITE;
501
502                 /* The kernel only calls f_op->open from dentry_open.  filp_open
503                  * calls dentry_open after open_namei has checked permissions.
504                  * Only nfsd_open calls dentry_open directly without checking
505                  * permissions, and because of that the code below is safe. */
506                 if (oit.it_flags & FMODE_WRITE)
507                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
508
509                 /* We do not want O_EXCL here, presumably we opened the file
510                  * already? XXX - NFS implications? */
511                 oit.it_flags &= ~O_EXCL;
512
513                 it = &oit;
514         }
515
516         /* Let's see if we have file open on MDS already. */
517         if (it->it_flags & FMODE_WRITE) {
518                 och_p = &lli->lli_mds_write_och;
519                 och_usecount = &lli->lli_open_fd_write_count;
520         } else if (it->it_flags & FMODE_EXEC) {
521                 och_p = &lli->lli_mds_exec_och;
522                 och_usecount = &lli->lli_open_fd_exec_count;
523         } else {
524                 och_p = &lli->lli_mds_read_och;
525                 och_usecount = &lli->lli_open_fd_read_count;
526         }
527
528         down(&lli->lli_och_sem);
529         if (*och_p) { /* Open handle is present */
530                 if (it_disposition(it, DISP_OPEN_OPEN)) {
531                         /* Well, there's an extra open request that we do not
532                            need; let's close it.  This will decref the request. */
533                         rc = it_open_error(DISP_OPEN_OPEN, it);
534                         if (rc) {
535                                 ll_file_data_put(fd);
536                                 GOTO(out_och_free, rc);
537                         }
538                         ll_release_openhandle(file->f_dentry, it);
539                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
540                                              LPROC_LL_OPEN);
541                 }
542                 (*och_usecount)++;
543
544                 rc = ll_local_open(file, it, fd, NULL);
545                 if (rc) {
546                         up(&lli->lli_och_sem);
547                         ll_file_data_put(fd);
548                         RETURN(rc);
549                 }
550         } else {
551                 LASSERT(*och_usecount == 0);
552                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
553                 if (!*och_p) {
554                         ll_file_data_put(fd);
555                         GOTO(out_och_free, rc = -ENOMEM);
556                 }
557                 (*och_usecount)++;
558                 if (!it->d.lustre.it_disposition) {
559                         it->it_flags |= O_CHECK_STALE;
560                         rc = ll_intent_file_open(file, NULL, 0, it);
561                         it->it_flags &= ~O_CHECK_STALE;
562                         if (rc) {
563                                 ll_file_data_put(fd);
564                                 GOTO(out_och_free, rc);
565                         }
566
567                         /* Got some error? Release the request */
568                         if (it->d.lustre.it_status < 0) {
569                                 req = it->d.lustre.it_data;
570                                 ptlrpc_req_finished(req);
571                         }
572                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
573                                          &it->d.lustre.it_lock_handle,
574                                          file->f_dentry->d_inode);
575                 }
576                 req = it->d.lustre.it_data;
577
578                 /* md_intent_lock() didn't get a request ref if there was an
579                  * open error, so don't do cleanup on the request here
580                  * (bug 3430) */
581                 /* XXX (green): Should not we bail out on any error here, not
582                  * just open error? */
583                 rc = it_open_error(DISP_OPEN_OPEN, it);
584                 if (rc) {
585                         ll_file_data_put(fd);
586                         GOTO(out_och_free, rc);
587                 }
588
589                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
590                 rc = ll_local_open(file, it, fd, *och_p);
591                 if (rc) {
592                         up(&lli->lli_och_sem);
593                         ll_file_data_put(fd);
594                         GOTO(out_och_free, rc);
595                 }
596         }
597         up(&lli->lli_och_sem);
598
599         /* Must do this outside the lli_och_sem lock to prevent a deadlock
600            where a different kind of OPEN lock for this same inode gets
601            cancelled by ldlm_cancel_lru. */
602         if (!S_ISREG(inode->i_mode))
603                 GOTO(out, rc);
604
605         ll_capa_open(inode);
606
607         lsm = lli->lli_smd;
608         if (lsm == NULL) {
609                 if (file->f_flags & O_LOV_DELAY_CREATE ||
610                     !(file->f_mode & FMODE_WRITE)) {
611                         CDEBUG(D_INODE, "object creation was delayed\n");
612                         GOTO(out, rc);
613                 }
614         }
615         file->f_flags &= ~O_LOV_DELAY_CREATE;
616         GOTO(out, rc);
617 out:
618         ptlrpc_req_finished(req);
619         if (req)
620                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
621 out_och_free:
622         if (rc) {
623                 if (*och_p) {
624                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
625                         *och_p = NULL; /* OBD_FREE writes some magic there */
626                         (*och_usecount)--;
627                 }
628                 up(&lli->lli_och_sem);
629         }
630
631         return rc;
632 }
633
634 /* Fills the obdo with the attributes for the inode defined by lsm */
635 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
636 {
637         struct ptlrpc_request_set *set;
638         struct ll_inode_info *lli = ll_i2info(inode);
639         struct lov_stripe_md *lsm = lli->lli_smd;
640
641         struct obd_info oinfo = { { { 0 } } };
642         int rc;
643         ENTRY;
644
645         LASSERT(lsm != NULL);
646
647         oinfo.oi_md = lsm;
648         oinfo.oi_oa = obdo;
649         oinfo.oi_oa->o_id = lsm->lsm_object_id;
650         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
651         oinfo.oi_oa->o_mode = S_IFREG;
652         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
653                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
654                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
655                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
656                                OBD_MD_FLGROUP;
657         oinfo.oi_capa = ll_mdscapa_get(inode);
658
659         set = ptlrpc_prep_set();
660         if (set == NULL) {
661                 CERROR("can't allocate ptlrpc set\n");
662                 rc = -ENOMEM;
663         } else {
664                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
665                 if (rc == 0)
666                         rc = ptlrpc_set_wait(set);
667                 ptlrpc_set_destroy(set);
668         }
669         capa_put(oinfo.oi_capa);
670         if (rc)
671                 RETURN(rc);
672
673         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
674                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
675                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
676
677         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
678         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
679                lli->lli_smd->lsm_object_id, i_size_read(inode),
680                inode->i_blocks, inode->i_blksize);
681         RETURN(0);
682 }
683
684 static inline void ll_remove_suid(struct inode *inode)
685 {
686         unsigned int mode;
687
688         /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
689         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
690
691         /* was any of the uid bits set? */
692         mode &= inode->i_mode;
693         if (mode && !capable(CAP_FSETID)) {
694                 inode->i_mode &= ~mode;
695                 // XXX careful here - we cannot change the size
696         }
697 }
698
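/*
 * Map an OST extent lock back to the stripe index it covers within
 * @inode's striping, using obd_get_info("lock_to_stripe").  Returns the
 * stripe number, or a negative value if the lock's resource does not
 * match any object in the stripe MD.
 */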
699 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
700 {
701         struct ll_inode_info *lli = ll_i2info(inode);
702         struct lov_stripe_md *lsm = lli->lli_smd;
703         struct obd_export *exp = ll_i2dtexp(inode);
704         struct {
705                 char name[16];
706                 struct ldlm_lock *lock;
707                 struct lov_stripe_md *lsm;
708         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
709         __u32 stripe, vallen = sizeof(stripe);
710         int rc;
711         ENTRY;
712
713         if (lsm->lsm_stripe_count == 1)
714                 GOTO(check, stripe = 0);
715
716         /* get our offset in the lov */
717         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
718         if (rc != 0) {
719                 CERROR("obd_get_info: rc = %d\n", rc);
720                 RETURN(rc);
721         }
722         LASSERT(stripe < lsm->lsm_stripe_count);
723
724 check:
725         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
726             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
727                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
728                            lsm->lsm_oinfo[stripe]->loi_id,
729                            lsm->lsm_oinfo[stripe]->loi_gr);
730                 RETURN(-ELDLM_NO_LOCK_DATA);
731         }
732
733         RETURN(stripe);
734 }
735
736 /* Flush the page cache for an extent as it is cancelled.  When we're on an LOV,
737  * we get a lock cancellation for each stripe, so we have to map the obd's
738  * region back onto the stripes in the file that it held.
739  *
740  * No one can dirty the extent until we've finished our work and they can
741  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
742  * but other kernel actors could have pages locked.
743  *
744  * Called with the DLM lock held. */
745 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
746                               struct ldlm_lock *lock, __u32 stripe)
747 {
748         ldlm_policy_data_t tmpex;
749         unsigned long start, end, count, skip, i, j;
750         struct page *page;
751         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
752         struct lustre_handle lockh;
753         struct address_space *mapping = inode->i_mapping;
754
755         ENTRY;
756         tmpex = lock->l_policy_data;
757         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
758                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
759                i_size_read(inode));
760
761         /* Our locks are page granular thanks to osc_enqueue, so we invalidate
762          * the whole page. */
763         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
764             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
765                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
766                            CFS_PAGE_SIZE);
767         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
768         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
769
770         count = ~0;
771         skip = 0;
772         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
773         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
774         if (lsm->lsm_stripe_count > 1) {
775                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
776                 skip = (lsm->lsm_stripe_count - 1) * count;
777                 start += start/count * skip + stripe * count;
778                 if (end != ~0)
779                         end += end/count * skip + stripe * count;
780         }
781         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
782                 end = ~0;
783
784         i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
785             CFS_PAGE_SHIFT : 0;
786         if (i < end)
787                 end = i;
788
789         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
790                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
791                count, skip, end, discard ? " (DISCARDING)" : "");
792
793         /* Walk through the VMAs on the inode and tear down mmap()ed pages
794          * that intersect with the lock.  This stops immediately if there are
795          * no mmap()ed regions of the file.  This is not efficient at all and
796          * should be short-lived.  We'll associate mmap()ed pages with the lock
797          * and will be able to find them directly. */
798         for (i = start; i <= end; i += (j + skip)) {
799                 j = min(count - (i % count), end - i + 1);
800                 LASSERT(j > 0);
801                 LASSERT(mapping);
802                 if (ll_teardown_mmaps(mapping,
803                                       (__u64)i << CFS_PAGE_SHIFT,
804                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
805                         break;
806         }
807
808         /* This is the simplistic implementation of page eviction at
809          * cancellation.  It is careful to handle races with other page
810          * lockers correctly.  Fixes from bug 20 will make it more
811          * efficient by associating locks with pages and by batching
812          * writeback under the lock explicitly. */
813         for (i = start, j = start % count; i <= end;
814              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
815                 if (j == count) {
816                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
817                         i += skip;
818                         j = 0;
819                         if (i > end)
820                                 break;
821                 }
822                 LASSERTF(tmpex.l_extent.start < lock->l_policy_data.l_extent.end,
823                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
824                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
825                          start, i, end);
826
827                 if (!mapping_has_pages(mapping)) {
828                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
829                         break;
830                 }
831
832                 cond_resched();
833
834                 page = find_get_page(mapping, i);
835                 if (page == NULL)
836                         continue;
837                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
838                                i, tmpex.l_extent.start);
839                 lock_page(page);
840
841                 /* page->mapping is checked below to catch races with teardown */
842                 if (!discard && clear_page_dirty_for_io(page)) {
843                         rc = ll_call_writepage(inode, page);
844                         /* either waiting for io to complete or reacquiring
845                          * the lock that the failed writepage released */
846                         lock_page(page);
847                         wait_on_page_writeback(page);
848                         if (rc != 0) {
849                                 CERROR("writepage inode %lu(%p) of page %p "
850                                        "failed: %d\n", inode->i_ino, inode,
851                                        page, rc);
852                                 if (rc == -ENOSPC)
853                                         set_bit(AS_ENOSPC, &mapping->flags);
854                                 else
855                                         set_bit(AS_EIO, &mapping->flags);
856                         }
857                 }
858
859                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
860                 /* check to see if another DLM lock covers this page b=2765 */
861                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
862                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
863                                       LDLM_FL_TEST_LOCK,
864                                       &lock->l_resource->lr_name, LDLM_EXTENT,
865                                       &tmpex, LCK_PR | LCK_PW, &lockh);
866
867                 if (rc2 <= 0 && page->mapping != NULL) {
868                         struct ll_async_page *llap = llap_cast_private(page);
869                         /* checking again to account for writeback's
870                          * lock_page() */
871                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
872                         if (llap)
873                                 ll_ra_accounting(llap, mapping);
874                         ll_truncate_complete_page(page);
875                 }
876                 unlock_page(page);
877                 page_cache_release(page);
878         }
879         LASSERTF(tmpex.l_extent.start <=
880                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
881                   lock->l_policy_data.l_extent.end + 1),
882                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
883                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
884                  start, i, end);
885         EXIT;
886 }
887
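/*
 * Blocking/cancel AST for OST extent locks.  On LDLM_CB_BLOCKING the
 * lock is cancelled; on LDLM_CB_CANCELING the covered pages are flushed
 * or discarded and the stripe's known minimum size (KMS) is shrunk to
 * exclude the cancelled extent.
 */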
888 static int ll_extent_lock_callback(struct ldlm_lock *lock,
889                                    struct ldlm_lock_desc *new, void *data,
890                                    int flag)
891 {
892         struct lustre_handle lockh = { 0 };
893         int rc;
894         ENTRY;
895
896         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
897                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
898                 LBUG();
899         }
900
901         switch (flag) {
902         case LDLM_CB_BLOCKING:
903                 ldlm_lock2handle(lock, &lockh);
904                 rc = ldlm_cli_cancel(&lockh);
905                 if (rc != ELDLM_OK)
906                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
907                 break;
908         case LDLM_CB_CANCELING: {
909                 struct inode *inode;
910                 struct ll_inode_info *lli;
911                 struct lov_stripe_md *lsm;
912                 int stripe;
913                 __u64 kms;
914
915                 /* This lock wasn't granted, don't try to evict pages */
916                 if (lock->l_req_mode != lock->l_granted_mode)
917                         RETURN(0);
918
919                 inode = ll_inode_from_lock(lock);
920                 if (inode == NULL)
921                         RETURN(0);
922                 lli = ll_i2info(inode);
923                 if (lli == NULL)
924                         goto iput;
925                 if (lli->lli_smd == NULL)
926                         goto iput;
927                 lsm = lli->lli_smd;
928
929                 stripe = ll_lock_to_stripe_offset(inode, lock);
930                 if (stripe < 0)
931                         goto iput;
932
933                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
934
935                 lov_stripe_lock(lsm);
936                 lock_res_and_lock(lock);
937                 kms = ldlm_extent_shift_kms(lock,
938                                             lsm->lsm_oinfo[stripe]->loi_kms);
939
940                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
941                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
942                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
943                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
944                 unlock_res_and_lock(lock);
945                 lov_stripe_unlock(lsm);
946         iput:
947                 iput(inode);
948                 break;
949         }
950         default:
951                 LBUG();
952         }
953
954         RETURN(0);
955 }
956
957 #if 0
958 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
959 {
960         /* XXX ALLOCATE - 160 bytes */
961         struct inode *inode = ll_inode_from_lock(lock);
962         struct ll_inode_info *lli = ll_i2info(inode);
963         struct lustre_handle lockh = { 0 };
964         struct ost_lvb *lvb;
965         int stripe;
966         ENTRY;
967
968         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
969                      LDLM_FL_BLOCK_CONV)) {
970                 LBUG(); /* not expecting any blocked async locks yet */
971                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
972                            "lock, returning");
973                 ldlm_lock_dump(D_OTHER, lock, 0);
974                 ldlm_reprocess_all(lock->l_resource);
975                 RETURN(0);
976         }
977
978         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
979
980         stripe = ll_lock_to_stripe_offset(inode, lock);
981         if (stripe < 0)
982                 goto iput;
983
984         if (lock->l_lvb_len) {
985                 struct lov_stripe_md *lsm = lli->lli_smd;
986                 __u64 kms;
987                 lvb = lock->l_lvb_data;
988                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
989
990                 lock_res_and_lock(lock);
991                 ll_inode_size_lock(inode, 1);
992                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
993                 kms = ldlm_extent_shift_kms(NULL, kms);
994                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
995                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
996                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
997                 lsm->lsm_oinfo[stripe].loi_kms = kms;
998                 ll_inode_size_unlock(inode, 1);
999                 unlock_res_and_lock(lock);
1000         }
1001
1002 iput:
1003         iput(inode);
1004         wake_up(&lock->l_waitq);
1005
1006         ldlm_lock2handle(lock, &lockh);
1007         ldlm_lock_decref(&lockh, LCK_PR);
1008         RETURN(0);
1009 }
1010 #endif
1011
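/*
 * Glimpse AST: the server is asking this client for its view of the
 * object.  Reply with an LVB containing the known minimum size (KMS) of
 * the stripe this lock covers plus the cached inode timestamps.
 */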
1012 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1013 {
1014         struct ptlrpc_request *req = reqp;
1015         struct inode *inode = ll_inode_from_lock(lock);
1016         struct ll_inode_info *lli;
1017         struct lov_stripe_md *lsm;
1018         struct ost_lvb *lvb;
1019         int rc, stripe;
1020         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1021         ENTRY;
1022
1023         if (inode == NULL)
1024                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1025         lli = ll_i2info(inode);
1026         if (lli == NULL)
1027                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1028         lsm = lli->lli_smd;
1029         if (lsm == NULL)
1030                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1031
1032         /* First, find out which stripe index this lock corresponds to. */
1033         stripe = ll_lock_to_stripe_offset(inode, lock);
1034         if (stripe < 0)
1035                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1036
1037         rc = lustre_pack_reply(req, 2, size, NULL);
1038         if (rc)
1039                 GOTO(iput, rc);
1040
1041         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1042         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1043         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1044         lvb->lvb_atime = LTIME_S(inode->i_atime);
1045         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1046
1047         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1048                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1049                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1050                    lvb->lvb_atime, lvb->lvb_ctime);
1051  iput:
1052         iput(inode);
1053
1054  out:
1055         /* These errors are normal races, so we don't want to fill the console
1056          * with messages by calling ptlrpc_error() */
1057         if (rc == -ELDLM_NO_LOCK_DATA)
1058                 lustre_pack_reply(req, 1, NULL, NULL);
1059
1060         req->rq_status = rc;
1061         return rc;
1062 }
1063
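/*
 * Merge the OST lock value block data (size, blocks, a/m/ctime) into
 * the inode under ll_inode_size_lock().
 */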
1064 static void ll_merge_lvb(struct inode *inode)
1065 {
1066         struct ll_inode_info *lli = ll_i2info(inode);
1067         struct ll_sb_info *sbi = ll_i2sbi(inode);
1068         struct ost_lvb lvb;
1069         ENTRY;
1070
1071         ll_inode_size_lock(inode, 1);
1072         inode_init_lvb(inode, &lvb);
1073         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1074         i_size_write(inode, lvb.lvb_size);
1075         inode->i_blocks = lvb.lvb_blocks;
1076         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1077         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1078         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1079         ll_inode_size_unlock(inode, 1);
1080         EXIT;
1081 }
1082
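/*
 * Try to determine the file size from locks we already cache locally:
 * match an existing PR extent lock over [0, EOF], merge its LVB into
 * the inode and drop the matched reference.  Returns -ENODATA if no
 * such lock is cached.
 */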
1083 int ll_local_size(struct inode *inode)
1084 {
1085         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1086         struct ll_inode_info *lli = ll_i2info(inode);
1087         struct ll_sb_info *sbi = ll_i2sbi(inode);
1088         struct lustre_handle lockh = { 0 };
1089         int flags = 0;
1090         int rc;
1091         ENTRY;
1092
1093         if (lli->lli_smd->lsm_stripe_count == 0)
1094                 RETURN(0);
1095
1096         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1097                        &policy, LCK_PR, &flags, inode, &lockh);
1098         if (rc < 0)
1099                 RETURN(rc);
1100         else if (rc == 0)
1101                 RETURN(-ENODATA);
1102
1103         ll_merge_lvb(inode);
1104         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1105         RETURN(0);
1106 }
1107
1108 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1109                      lstat_t *st)
1110 {
1111         struct lustre_handle lockh = { 0 };
1112         struct ldlm_enqueue_info einfo = { 0 };
1113         struct obd_info oinfo = { { { 0 } } };
1114         struct ost_lvb lvb;
1115         int rc;
1116
1117         ENTRY;
1118
1119         einfo.ei_type = LDLM_EXTENT;
1120         einfo.ei_mode = LCK_PR;
1121         einfo.ei_cb_bl = ll_extent_lock_callback;
1122         einfo.ei_cb_cp = ldlm_completion_ast;
1123         einfo.ei_cb_gl = ll_glimpse_callback;
1124         einfo.ei_cbdata = NULL;
1125
1126         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1127         oinfo.oi_lockh = &lockh;
1128         oinfo.oi_md = lsm;
1129         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1130
1131         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1132         if (rc == -ENOENT)
1133                 RETURN(rc);
1134         if (rc != 0) {
1135                 CERROR("obd_enqueue returned rc %d, "
1136                        "returning -EIO\n", rc);
1137                 RETURN(rc > 0 ? -EIO : rc);
1138         }
1139
1140         lov_stripe_lock(lsm);
1141         memset(&lvb, 0, sizeof(lvb));
1142         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1143         st->st_size = lvb.lvb_size;
1144         st->st_blocks = lvb.lvb_blocks;
1145         st->st_mtime = lvb.lvb_mtime;
1146         st->st_atime = lvb.lvb_atime;
1147         st->st_ctime = lvb.lvb_ctime;
1148         lov_stripe_unlock(lsm);
1149
1150         RETURN(rc);
1151 }
1152
1153 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1154  * file (because it prefers KMS over RSS when larger) */
1155 int ll_glimpse_size(struct inode *inode, int ast_flags)
1156 {
1157         struct ll_inode_info *lli = ll_i2info(inode);
1158         struct ll_sb_info *sbi = ll_i2sbi(inode);
1159         struct lustre_handle lockh = { 0 };
1160         struct ldlm_enqueue_info einfo = { 0 };
1161         struct obd_info oinfo = { { { 0 } } };
1162         int rc;
1163         ENTRY;
1164
1165         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1166                 RETURN(0);
1167
1168         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1169
1170         if (!lli->lli_smd) {
1171                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1172                 RETURN(0);
1173         }
1174
1175         /* NOTE: this looks like a DLM lock request, but it may not be one.
1176          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse
1177          *       request that won't revoke any conflicting DLM locks held.
1178          *       Instead, ll_glimpse_callback() will be called on each client
1179          *       holding a DLM lock against this file, and the resulting size
1180          *       will be returned for each stripe.  The DLM lock on [0, EOF]
1181          *       is acquired only if there were no conflicting locks. */
1182         einfo.ei_type = LDLM_EXTENT;
1183         einfo.ei_mode = LCK_PR;
1184         einfo.ei_cb_bl = ll_extent_lock_callback;
1185         einfo.ei_cb_cp = ldlm_completion_ast;
1186         einfo.ei_cb_gl = ll_glimpse_callback;
1187         einfo.ei_cbdata = inode;
1188
1189         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1190         oinfo.oi_lockh = &lockh;
1191         oinfo.oi_md = lli->lli_smd;
1192         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1193
1194         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1195         if (rc == -ENOENT)
1196                 RETURN(rc);
1197         if (rc != 0) {
1198                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1199                 RETURN(rc > 0 ? -EIO : rc);
1200         }
1201
1202         ll_merge_lvb(inode);
1203
1204         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1205                i_size_read(inode), inode->i_blocks);
1206
1207         RETURN(rc);
1208 }
1209
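/*
 * Take an extent lock of the given @mode covering *@policy on @lsm and
 * merge the resulting LVB into the inode; for a [0, EOF] lock this also
 * refreshes i_size under ll_inode_size_lock().
 */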
1210 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1211                    struct lov_stripe_md *lsm, int mode,
1212                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1213                    int ast_flags)
1214 {
1215         struct ll_sb_info *sbi = ll_i2sbi(inode);
1216         struct ost_lvb lvb;
1217         struct ldlm_enqueue_info einfo = { 0 };
1218         struct obd_info oinfo = { { { 0 } } };
1219         int rc;
1220         ENTRY;
1221
1222         LASSERT(!lustre_handle_is_used(lockh));
1223         LASSERT(lsm != NULL);
1224
1225         /* don't drop the mmapped file to LRU */
1226         if (mapping_mapped(inode->i_mapping))
1227                 ast_flags |= LDLM_FL_NO_LRU;
1228
1229         /* XXX phil: can we do this?  won't it screw the file size up? */
1230         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1231             (sbi->ll_flags & LL_SBI_NOLCK))
1232                 RETURN(0);
1233
1234         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1235                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1236
1237         einfo.ei_type = LDLM_EXTENT;
1238         einfo.ei_mode = mode;
1239         einfo.ei_cb_bl = ll_extent_lock_callback;
1240         einfo.ei_cb_cp = ldlm_completion_ast;
1241         einfo.ei_cb_gl = ll_glimpse_callback;
1242         einfo.ei_cbdata = inode;
1243
1244         oinfo.oi_policy = *policy;
1245         oinfo.oi_lockh = lockh;
1246         oinfo.oi_md = lsm;
1247         oinfo.oi_flags = ast_flags;
1248
1249         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1250         *policy = oinfo.oi_policy;
1251         if (rc > 0)
1252                 rc = -EIO;
1253
1254         ll_inode_size_lock(inode, 1);
1255         inode_init_lvb(inode, &lvb);
1256         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1257
1258         if (policy->l_extent.start == 0 &&
1259             policy->l_extent.end == OBD_OBJECT_EOF) {
1260                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1261                  * the kms under both a DLM lock and the
1262                  * ll_inode_size_lock().  If we don't get the
1263                  * ll_inode_size_lock() here we can match the DLM lock and
1264                  * reset i_size from the kms before the truncating path has
1265                  * updated the kms.  generic_file_write can then trust the
1266                  * stale i_size when doing appending writes and effectively
1267                  * cancel the result of the truncate.  Getting the
1268                  * ll_inode_size_lock() after the enqueue maintains the DLM
1269                  * -> ll_inode_size_lock() acquiring order. */
1270                 i_size_write(inode, lvb.lvb_size);
1271                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1272                        inode->i_ino, i_size_read(inode));
1273         }
1274
1275         if (rc == 0) {
1276                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1277                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1278                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1279         }
1280         ll_inode_size_unlock(inode, 1);
1281
1282         RETURN(rc);
1283 }
1284
1285 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1286                      struct lov_stripe_md *lsm, int mode,
1287                      struct lustre_handle *lockh)
1288 {
1289         struct ll_sb_info *sbi = ll_i2sbi(inode);
1290         int rc;
1291         ENTRY;
1292
1293         /* XXX phil: can we do this?  won't it screw the file size up? */
1294         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1295             (sbi->ll_flags & LL_SBI_NOLCK))
1296                 RETURN(0);
1297
1298         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1299
1300         RETURN(rc);
1301 }
1302
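/*
 * Read path: take a PR extent lock over the region being read (split
 * into ll_max_rw_chunk sized pieces when configured), consult the
 * merged KMS to decide whether a glimpse is needed, and then hand the
 * actual copying off to the generic read path.
 */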
1303 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1304                             loff_t *ppos)
1305 {
1306         struct inode *inode = file->f_dentry->d_inode;
1307         struct ll_inode_info *lli = ll_i2info(inode);
1308         struct lov_stripe_md *lsm = lli->lli_smd;
1309         struct ll_sb_info *sbi = ll_i2sbi(inode);
1310         struct ll_lock_tree tree;
1311         struct ll_lock_tree_node *node;
1312         struct ost_lvb lvb;
1313         struct ll_ra_read bead;
1314         int rc, ra = 0;
1315         loff_t end;
1316         ssize_t retval, chunk, sum = 0;
1317
1318         __u64 kms;
1319         ENTRY;
1320         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1321                inode->i_ino, inode->i_generation, inode, count, *ppos);
1322         /* "If nbyte is 0, read() will return 0 and have no other results."
1323          *                      -- Single Unix Spec */
1324         if (count == 0)
1325                 RETURN(0);
1326
1327         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1328
1329         if (!lsm) {
1330                 /* A read on a file with no objects should return zero-filled
1331                  * buffers up to the file size (we can get non-zero sizes with
1332                  * mknod + truncate, then opening the file for read.  This is
1333                  * a common pattern in the NFS case, it seems).  Bug 6243 */
1334                 int notzeroed;
1335                 /* Since there are no objects on the OSTs, we have nothing to
1336                  * take a lock on, so we are forced to access inode->i_size
1337                  * unguarded */
1338
1339                 /* Read beyond end of file */
1340                 if (*ppos >= i_size_read(inode))
1341                         RETURN(0);
1342
1343                 if (count > i_size_read(inode) - *ppos)
1344                         count = i_size_read(inode) - *ppos;
1345                 /* Make sure to correctly adjust the file pos pointer for
1346                  * EFAULT case */
1347                 notzeroed = clear_user(buf, count);
1348                 count -= notzeroed;
1349                 *ppos += count;
1350                 if (!count)
1351                         RETURN(-EFAULT);
1352                 RETURN(count);
1353         }
1354
1355 repeat:
1356         if (sbi->ll_max_rw_chunk != 0) {
1357                 /* first, find the end of the current stripe */
1358                 end = *ppos;
1359                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1360                                 (obd_off *)&end);
1361
1362                 /* clamp the end if it extends beyond the request */
1363                 if (end > *ppos + count - 1)
1364                         end = *ppos + count - 1;
1365
1366                 /* and chunk shouldn't be too large even if striping is wide */
1367                 if (end - *ppos > sbi->ll_max_rw_chunk)
1368                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1369         } else {
1370                 end = *ppos + count - 1;
1371         }
1372
1373         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1374         if (IS_ERR(node)){
1375                 GOTO(out, retval = PTR_ERR(node));
1376         }
1377
1378         tree.lt_fd = LUSTRE_FPRIVATE(file);
1379         rc = ll_tree_lock(&tree, node, buf, count,
1380                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1381         if (rc != 0)
1382                 GOTO(out, retval = rc);
1383
1384         ll_inode_size_lock(inode, 1);
1385         /*
1386          * Consistency guarantees: following possibilities exist for the
1387          * relation between region being read and real file size at this
1388          * moment:
1389          *
1390          *  (A): the region is completely inside of the file;
1391          *
1392          *  (B-x): x bytes of region are inside of the file, the rest is
1393          *  outside;
1394          *
1395          *  (C): the region is completely outside of the file.
1396          *
1397          * This classification is stable under DLM lock acquired by
1398          * ll_tree_lock() above, because to change class, other client has to
1399          * take DLM lock conflicting with our lock. Also, any updates to
1400          * ->i_size by other threads on this client are serialized by
1401          * ll_inode_size_lock(). This guarantees that short reads are handled
1402          * correctly in the face of concurrent writes and truncates.
1403          */
1404         inode_init_lvb(inode, &lvb);
1405         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1406         kms = lvb.lvb_size;
1407         if (*ppos + count - 1 > kms) {
1408                 /* A glimpse is necessary to determine whether we return a
1409                  * short read (B) or some zeroes at the end of the buffer (C) */
1410                 ll_inode_size_unlock(inode, 1);
1411                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1412                 if (retval) {
1413                         ll_tree_unlock(&tree);
1414                         goto out;
1415                 }
1416         } else {
1417                 /* region is within kms and, hence, within real file size (A).
1418                  * We need to increase i_size to cover the read region so that
1419                  * generic_file_read() will do its job, but that doesn't mean
1420                  * the kms size is _correct_, it is only the _minimum_ size.
1421                  * If someone does a stat they will get the correct size which
1422                  * will always be >= the kms value here.  b=11081 */
1423                 if (i_size_read(inode) < kms)
1424                         i_size_write(inode, kms);
1425                 ll_inode_size_unlock(inode, 1);
1426         }
1427
1428         chunk = end - *ppos + 1;
1429         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1430                inode->i_ino, chunk, *ppos, i_size_read(inode));
1431
1432         /* turn off the kernel's read-ahead */
1433         file->f_ra.ra_pages = 0;
1434
1435         /* initialize read-ahead window once per syscall */
1436         if (ra == 0) {
1437                 ra = 1;
1438                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1439                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1440                 ll_ra_read_in(file, &bead);
1441         }
1442
1443         /* BUG: 5972 */
1444         file_accessed(file);
1445         retval = generic_file_read(file, buf, chunk, ppos);
1446         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1447
1448         ll_tree_unlock(&tree);
1449
1450         if (retval > 0) {
1451                 buf += retval;
1452                 count -= retval;
1453                 sum += retval;
1454                 if (retval == chunk && count > 0)
1455                         goto repeat;
1456         }
1457
1458  out:
1459         if (ra != 0)
1460                 ll_ra_read_ex(file, &bead);
1461         retval = (sum > 0) ? sum : retval;
1462         RETURN(retval);
1463 }
1464
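/*
 * Worked example of the chunked-read loop above (a sketch; the numbers are
 * illustrative, the real stripe geometry comes from the file's LOV EA):
 *
 *      stripe size     = 1 MB
 *      ll_max_rw_chunk = large enough not to matter
 *      request         = read 1.5 MB starting at offset 512 KB
 *
 * Pass 1: OBD_CALC_STRIPE_END clamps "end" to the last byte of the first
 * stripe (offset 1048575), so a PR lock is taken on [524288, 1048575] and
 * 512 KB are read.  Pass 2: the remaining 1 MB lies in the next stripe, so
 * a second lock on [1048576, 2097151] covers the rest.  The "repeat" label
 * thus keeps each DLM extent lock within a single stripe (or within
 * ll_max_rw_chunk) instead of locking the whole request at once.
 */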
1465 /*
1466  * Write to a file (through the page cache).
1467  */
1468 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1469                              loff_t *ppos)
1470 {
1471         struct inode *inode = file->f_dentry->d_inode;
1472         struct ll_sb_info *sbi = ll_i2sbi(inode);
1473         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1474         struct ll_lock_tree tree;
1475         struct ll_lock_tree_node *node;
1476         loff_t maxbytes = ll_file_maxbytes(inode);
1477         loff_t lock_start, lock_end, end;
1478         ssize_t retval, chunk, sum = 0;
1479         int rc;
1480         ENTRY;
1481
1482         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1483                inode->i_ino, inode->i_generation, inode, count, *ppos);
1484
1485         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1486
1487         /* POSIX, but it is surprising the VFS doesn't check this already */
1488         if (count == 0)
1489                 RETURN(0);
1490
1491         /* If the file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl was
1492          * never called on it, don't trip the assertion below (bug 2388). */
1493         if (file->f_flags & O_LOV_DELAY_CREATE &&
1494             ll_i2info(inode)->lli_smd == NULL)
1495                 RETURN(-EBADF);
1496
1497         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1498
1499         down(&ll_i2info(inode)->lli_write_sem);
1500
1501 repeat:
1502         chunk = 0; /* just to fix gcc's warning */
1503         end = *ppos + count - 1;
1504
1505         if (file->f_flags & O_APPEND) {
1506                 lock_start = 0;
1507                 lock_end = OBD_OBJECT_EOF;
1508         } else if (sbi->ll_max_rw_chunk != 0) {
1509                 /* first, find the end of the current stripe */
1510                 end = *ppos;
1511                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1512                                 (obd_off *)&end);
1513
1514                 /* clamp the end if it extends beyond the request */
1515                 if (end > *ppos + count - 1)
1516                         end = *ppos + count - 1;
1517
1518                 /* and chunk shouldn't be too large even if striping is wide */
1519                 if (end - *ppos > sbi->ll_max_rw_chunk)
1520                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1521                 lock_start = *ppos;
1522                 lock_end = end;
1523         } else {
1524                 lock_start = *ppos;
1525                 lock_end = *ppos + count - 1;
1526         }
1527         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1528
1529         if (IS_ERR(node))
1530                 GOTO(out, retval = PTR_ERR(node));
1531
1532         tree.lt_fd = LUSTRE_FPRIVATE(file);
1533         rc = ll_tree_lock(&tree, node, buf, count,
1534                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1535         if (rc != 0)
1536                 GOTO(out, retval = rc);
1537
1538         /* This is OK; generic_file_write will overwrite this under i_sem if
1539          * it races with a local truncate.  It just makes our maxbyte checking
1540          * easier.  The i_size value gets updated in ll_extent_lock() as a
1541          * consequence of the [0,EOF] extent lock we requested above. */
1542         if (file->f_flags & O_APPEND) {
1543                 *ppos = i_size_read(inode);
1544                 end = *ppos + count - 1;
1545         }
1546
1547         if (*ppos >= maxbytes) {
1548                 send_sig(SIGXFSZ, current, 0);
1549                 GOTO(out_unlock, retval = -EFBIG);
1550         }
1551         if (*ppos + count > maxbytes)
1552                 count = maxbytes - *ppos;
1553
1554         /* generic_file_write handles O_APPEND after getting i_mutex */
1555         chunk = end - *ppos + 1;
1556         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1557                inode->i_ino, chunk, *ppos);
1558         retval = generic_file_write(file, buf, chunk, ppos);
1559         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1560
1561 out_unlock:
1562         ll_tree_unlock(&tree);
1563
1564 out:
1565         if (retval > 0) {
1566                 buf += retval;
1567                 count -= retval;
1568                 sum += retval;
1569                 if (retval == chunk && count > 0)
1570                         goto repeat;
1571         }
1572
1573         up(&ll_i2info(inode)->lli_write_sem);
1574
1575         retval = (sum > 0) ? sum : retval;
1576         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1577                            retval > 0 ? retval : 0);
1578         RETURN(retval);
1579 }
1580
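/*
 * Worked example of the O_APPEND handling above (illustrative only): two
 * clients append 100 bytes each to the same file at about the same time.
 * Each write takes the [0, OBD_OBJECT_EOF] PW lock, so the enqueues are
 * serialized; whichever client is granted the lock second sees an i_size
 * that already includes the first client's 100 bytes and positions *ppos
 * there, so the appends land back to back instead of overwriting each
 * other.
 */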
1581 /*
1582  * Send file content (through pagecache) somewhere with helper
1583  */
1584 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1585                                 read_actor_t actor, void *target)
1586 {
1587         struct inode *inode = in_file->f_dentry->d_inode;
1588         struct ll_inode_info *lli = ll_i2info(inode);
1589         struct lov_stripe_md *lsm = lli->lli_smd;
1590         struct ll_lock_tree tree;
1591         struct ll_lock_tree_node *node;
1592         struct ost_lvb lvb;
1593         struct ll_ra_read bead;
1594         int rc;
1595         ssize_t retval;
1596         __u64 kms;
1597         ENTRY;
1598         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1599                inode->i_ino, inode->i_generation, inode, count, *ppos);
1600
1601         /* "If nbyte is 0, read() will return 0 and have no other results."
1602          *                      -- Single Unix Spec */
1603         if (count == 0)
1604                 RETURN(0);
1605
1606         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1607         /* turn off the kernel's read-ahead */
1608         in_file->f_ra.ra_pages = 0;
1609
1610         /* File with no objects, nothing to lock */
1611         if (!lsm)
1612                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1613
1614         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1615         if (IS_ERR(node))
1616                 RETURN(PTR_ERR(node));
1617
1618         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1619         rc = ll_tree_lock(&tree, node, NULL, count,
1620                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1621         if (rc != 0)
1622                 RETURN(rc);
1623
1624         ll_inode_size_lock(inode, 1);
1625         /*
1626          * Consistency guarantees: following possibilities exist for the
1627          * relation between region being read and real file size at this
1628          * moment:
1629          *
1630          *  (A): the region is completely inside of the file;
1631          *
1632          *  (B-x): x bytes of region are inside of the file, the rest is
1633          *  outside;
1634          *
1635          *  (C): the region is completely outside of the file.
1636          *
1637          * This classification is stable under DLM lock acquired by
1638          * ll_tree_lock() above, because to change class, other client has to
1639          * take DLM lock conflicting with our lock. Also, any updates to
1640          * ->i_size by other threads on this client are serialized by
1641          * ll_inode_size_lock(). This guarantees that short reads are handled
1642          * correctly in the face of concurrent writes and truncates.
1643          */
1644         inode_init_lvb(inode, &lvb);
1645         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1646         kms = lvb.lvb_size;
1647         if (*ppos + count - 1 > kms) {
1648                 /* A glimpse is necessary to determine whether we return a
1649                  * short read (B) or some zeroes at the end of the buffer (C) */
1650                 ll_inode_size_unlock(inode, 1);
1651                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1652                 if (retval)
1653                         goto out;
1654         } else {
1655                 /* region is within kms and, hence, within real file size (A) */
1656                 i_size_write(inode, kms);
1657                 ll_inode_size_unlock(inode, 1);
1658         }
1659
1660         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1661                inode->i_ino, count, *ppos, i_size_read(inode));
1662
1663         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1664         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1665         ll_ra_read_in(in_file, &bead);
1666         /* BUG: 5972 */
1667         file_accessed(in_file);
1668         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1669         ll_ra_read_ex(in_file, &bead);
1670
1671  out:
1672         ll_tree_unlock(&tree);
1673         RETURN(retval);
1674 }
1675
1676 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1677                                unsigned long arg)
1678 {
1679         struct ll_inode_info *lli = ll_i2info(inode);
1680         struct obd_export *exp = ll_i2dtexp(inode);
1681         struct ll_recreate_obj ucreatp;
1682         struct obd_trans_info oti = { 0 };
1683         struct obdo *oa = NULL;
1684         int lsm_size;
1685         int rc = 0;
1686         struct lov_stripe_md *lsm, *lsm2;
1687         ENTRY;
1688
1689         if (!capable (CAP_SYS_ADMIN))
1690                 RETURN(-EPERM);
1691
1692         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1693                             sizeof(struct ll_recreate_obj));
1694         if (rc) {
1695                 RETURN(-EFAULT);
1696         }
1697         OBDO_ALLOC(oa);
1698         if (oa == NULL)
1699                 RETURN(-ENOMEM);
1700
1701         down(&lli->lli_size_sem);
1702         lsm = lli->lli_smd;
1703         if (lsm == NULL)
1704                 GOTO(out, rc = -ENOENT);
1705         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1706                    (lsm->lsm_stripe_count));
1707
1708         OBD_ALLOC(lsm2, lsm_size);
1709         if (lsm2 == NULL)
1710                 GOTO(out, rc = -ENOMEM);
1711
1712         oa->o_id = ucreatp.lrc_id;
1713         oa->o_gr = ucreatp.lrc_group;
1714         oa->o_nlink = ucreatp.lrc_ost_idx;
1715         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1716         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1717         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1718                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1719
1720         oti.oti_objid = NULL;
1721         memcpy(lsm2, lsm, lsm_size);
1722         rc = obd_create(exp, oa, &lsm2, &oti);
1723
1724         OBD_FREE(lsm2, lsm_size);
1725         GOTO(out, rc);
1726 out:
1727         up(&lli->lli_size_sem);
1728         OBDO_FREE(oa);
1729         return rc;
1730 }
1731
1732 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1733                              int flags, struct lov_user_md *lum, int lum_size)
1734 {
1735         struct ll_inode_info *lli = ll_i2info(inode);
1736         struct lov_stripe_md *lsm;
1737         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1738         int rc = 0;
1739         ENTRY;
1740
1741         down(&lli->lli_size_sem);
1742         lsm = lli->lli_smd;
1743         if (lsm) {
1744                 up(&lli->lli_size_sem);
1745                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1746                        inode->i_ino);
1747                 RETURN(-EEXIST);
1748         }
1749
1750         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1751         if (rc)
1752                 GOTO(out, rc);
1753         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1754                 GOTO(out_req_free, rc = -ENOENT);
1755         rc = oit.d.lustre.it_status;
1756         if (rc < 0)
1757                 GOTO(out_req_free, rc);
1758
1759         ll_release_openhandle(file->f_dentry, &oit);
1760
1761  out:
1762         up(&lli->lli_size_sem);
1763         ll_intent_release(&oit);
1764         RETURN(rc);
1765 out_req_free:
1766         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1767         goto out;
1768 }
1769
1770 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1771                              struct lov_mds_md **lmmp, int *lmm_size, 
1772                              struct ptlrpc_request **request)
1773 {
1774         struct ll_sb_info *sbi = ll_i2sbi(inode);
1775         struct mdt_body  *body;
1776         struct lov_mds_md *lmm = NULL;
1777         struct ptlrpc_request *req = NULL;
1778         struct obd_capa *oc;
1779         int rc, lmmsize;
1780
1781         rc = ll_get_max_mdsize(sbi, &lmmsize);
1782         if (rc)
1783                 RETURN(rc);
1784
1785         oc = ll_mdscapa_get(inode);
1786         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1787                              oc, filename, strlen(filename) + 1,
1788                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1789         capa_put(oc);
1790         if (rc < 0) {
1791                 CDEBUG(D_INFO, "md_getattr_name failed "
1792                        "on %s: rc %d\n", filename, rc);
1793                 GOTO(out, rc);
1794         }
1795
1796         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1797         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1798         /* swabbed by mdc_getattr_name */
1799         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
1800
1801         lmmsize = body->eadatasize;
1802
1803         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1804                         lmmsize == 0) {
1805                 GOTO(out, rc = -ENODATA);
1806         }
1807
1808         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1809         LASSERT(lmm != NULL);
1810         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
1811
1812         /*
1813          * This is coming from the MDS, so is probably in
1814          * little endian.  We convert it to host endian before
1815          * passing it to userspace.
1816          */
1817         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1818                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1819                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1820         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1821                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1822         }
1823
1824         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1825                 struct lov_stripe_md *lsm;
1826                 struct lov_user_md_join *lmj;
1827                 int lmj_size, i, aindex = 0;
1828
1829                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1830                 if (rc < 0)
1831                         GOTO(out, rc = -ENOMEM);
1832                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1833                 if (rc)
1834                         GOTO(out_free_memmd, rc);
1835
1836                 lmj_size = sizeof(struct lov_user_md_join) +
1837                            lsm->lsm_stripe_count *
1838                            sizeof(struct lov_user_ost_data_join);
1839                 OBD_ALLOC(lmj, lmj_size);
1840                 if (!lmj)
1841                         GOTO(out_free_memmd, rc = -ENOMEM);
1842
1843                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1844                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1845                         struct lov_extent *lex =
1846                                 &lsm->lsm_array->lai_ext_array[aindex];
1847
1848                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1849                                 aindex ++;
1850                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1851                                         LPU64" len %d\n", aindex, i,
1852                                         lex->le_start, (int)lex->le_len);
1853                         lmj->lmm_objects[i].l_extent_start =
1854                                 lex->le_start;
1855
1856                         if ((int)lex->le_len == -1)
1857                                 lmj->lmm_objects[i].l_extent_end = -1;
1858                         else
1859                                 lmj->lmm_objects[i].l_extent_end =
1860                                         lex->le_start + lex->le_len;
1861                         lmj->lmm_objects[i].l_object_id =
1862                                 lsm->lsm_oinfo[i]->loi_id;
1863                         lmj->lmm_objects[i].l_object_gr =
1864                                 lsm->lsm_oinfo[i]->loi_gr;
1865                         lmj->lmm_objects[i].l_ost_gen =
1866                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1867                         lmj->lmm_objects[i].l_ost_idx =
1868                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1869                 }
1870                 lmm = (struct lov_mds_md *)lmj;
1871                 lmmsize = lmj_size;
1872 out_free_memmd:
1873                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1874         }
1875 out:
1876         *lmmp = lmm;
1877         *lmm_size = lmmsize;
1878         *request = req;
1879         return rc;
1880 }
1881
1882 static int ll_lov_setea(struct inode *inode, struct file *file,
1883                             unsigned long arg)
1884 {
1885         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1886         struct lov_user_md  *lump;
1887         int lum_size = sizeof(struct lov_user_md) +
1888                        sizeof(struct lov_user_ost_data);
1889         int rc;
1890         ENTRY;
1891
1892         if (!capable (CAP_SYS_ADMIN))
1893                 RETURN(-EPERM);
1894
1895         OBD_ALLOC(lump, lum_size);
1896         if (lump == NULL) {
1897                 RETURN(-ENOMEM);
1898         }
1899         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1900         if (rc) {
1901                 OBD_FREE(lump, lum_size);
1902                 RETURN(-EFAULT);
1903         }
1904
1905         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1906
1907         OBD_FREE(lump, lum_size);
1908         RETURN(rc);
1909 }
1910
1911 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1912                             unsigned long arg)
1913 {
1914         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1915         int rc;
1916         int flags = FMODE_WRITE;
1917         ENTRY;
1918
1919         /* Bug 1152: copy properly when this is no longer true */
1920         LASSERT(sizeof(lum) == sizeof(*lump));
1921         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1922         rc = copy_from_user(&lum, lump, sizeof(lum));
1923         if (rc)
1924                 RETURN(-EFAULT);
1925
1926         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1927         if (rc == 0) {
1928                  put_user(0, &lump->lmm_stripe_count);
1929                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1930                                     0, ll_i2info(inode)->lli_smd, lump);
1931         }
1932         RETURN(rc);
1933 }
1934
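/*
 * A minimal userspace sketch of driving LL_IOC_LOV_SETSTRIPE as handled
 * above (assumes <lustre/lustre_user.h> provides struct lov_user_md,
 * LOV_USER_MAGIC, O_LOV_DELAY_CREATE and the ioctl number; error handling
 * is abbreviated):
 *
 *      #include <fcntl.h>
 *      #include <errno.h>
 *      #include <sys/ioctl.h>
 *      #include <lustre/lustre_user.h>
 *
 *      int create_striped(const char *path, int stripe_count)
 *      {
 *              struct lov_user_md lum = { 0 };
 *              int fd = open(path, O_CREAT | O_WRONLY | O_LOV_DELAY_CREATE,
 *                            0644);
 *
 *              if (fd < 0)
 *                      return -errno;
 *              lum.lmm_magic = LOV_USER_MAGIC;
 *              lum.lmm_stripe_count = stripe_count;
 *              lum.lmm_stripe_offset = -1;    // let the MDS pick the OSTs
 *              if (ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum) < 0) {
 *                      close(fd);
 *                      return -errno;
 *              }
 *              return fd;
 *      }
 */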
1935 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1936 {
1937         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1938
1939         if (!lsm)
1940                 RETURN(-ENODATA);
1941
1942         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1943                             (void *)arg);
1944 }
1945
1946 static int ll_get_grouplock(struct inode *inode, struct file *file,
1947                             unsigned long arg)
1948 {
1949         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1950         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1951                                                     .end = OBD_OBJECT_EOF}};
1952         struct lustre_handle lockh = { 0 };
1953         struct ll_inode_info *lli = ll_i2info(inode);
1954         struct lov_stripe_md *lsm = lli->lli_smd;
1955         int flags = 0, rc;
1956         ENTRY;
1957
1958         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1959                 RETURN(-EINVAL);
1960         }
1961
1962         policy.l_extent.gid = arg;
1963         if (file->f_flags & O_NONBLOCK)
1964                 flags = LDLM_FL_BLOCK_NOWAIT;
1965
1966         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1967         if (rc)
1968                 RETURN(rc);
1969
1970         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1971         fd->fd_gid = arg;
1972         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1973
1974         RETURN(0);
1975 }
1976
1977 static int ll_put_grouplock(struct inode *inode, struct file *file,
1978                             unsigned long arg)
1979 {
1980         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1981         struct ll_inode_info *lli = ll_i2info(inode);
1982         struct lov_stripe_md *lsm = lli->lli_smd;
1983         int rc;
1984         ENTRY;
1985
1986         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1987                 /* Ugh, it's already unlocked. */
1988                 RETURN(-EINVAL);
1989         }
1990
1991         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1992                 RETURN(-EINVAL);
1993
1994         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1995
1996         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1997         if (rc)
1998                 RETURN(rc);
1999
2000         fd->fd_gid = 0;
2001         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2002
2003         RETURN(0);
2004 }
2005
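/*
 * A minimal userspace sketch of the group-lock ioctls handled above and
 * dispatched from ll_file_ioctl() below (assumes <lustre/lustre_user.h>
 * provides LL_IOC_GROUP_LOCK / LL_IOC_GROUP_UNLOCK; the gid value only has
 * to be agreed on by all cooperating processes):
 *
 *      #include <errno.h>
 *      #include <sys/ioctl.h>
 *      #include <lustre/lustre_user.h>
 *
 *      int with_group_lock(int fd, int gid)
 *      {
 *              if (ioctl(fd, LL_IOC_GROUP_LOCK, gid) < 0)
 *                      return -errno;
 *              // ... cooperating I/O, bypassing per-extent locking ...
 *              if (ioctl(fd, LL_IOC_GROUP_UNLOCK, gid) < 0)
 *                      return -errno;
 *              return 0;
 *      }
 */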
2006 static int join_sanity_check(struct inode *head, struct inode *tail)
2007 {
2008         ENTRY;
2009         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2010                 CERROR("server does not support join\n");
2011                 RETURN(-EINVAL);
2012         }
2013         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2014                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2015                        head->i_ino, tail->i_ino);
2016                 RETURN(-EINVAL);
2017         }
2018         if (head->i_ino == tail->i_ino) {
2019                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2020                 RETURN(-EINVAL);
2021         }
2022         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2023                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
2024                 RETURN(-EINVAL);
2025         }
2026         RETURN(0);
2027 }
2028
2029 static int join_file(struct inode *head_inode, struct file *head_filp,
2030                      struct file *tail_filp)
2031 {
2032         struct dentry *tail_dentry = tail_filp->f_dentry;
2033         struct lookup_intent oit = {.it_op = IT_OPEN,
2034                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2035         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2036                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2037
2038         struct lustre_handle lockh;
2039         struct md_op_data *op_data;
2040         int    rc;
2041         loff_t data;
2042         ENTRY;
2043
2044         tail_dentry = tail_filp->f_dentry;
2045
2046         data = i_size_read(head_inode);
2047         op_data = ll_prep_md_op_data(NULL, head_inode,
2048                                      tail_dentry->d_parent->d_inode,
2049                                      tail_dentry->d_name.name,
2050                                      tail_dentry->d_name.len, 0,
2051                                      LUSTRE_OPC_ANY, &data);
2052         if (IS_ERR(op_data))
2053                 RETURN(PTR_ERR(op_data));
2054
2055         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2056                          op_data, &lockh, NULL, 0, 0);
2057
2058         ll_finish_md_op_data(op_data);
2059         if (rc < 0)
2060                 GOTO(out, rc);
2061
2062         rc = oit.d.lustre.it_status;
2063
2064         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2065                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2066                 ptlrpc_req_finished((struct ptlrpc_request *)
2067                                     oit.d.lustre.it_data);
2068                 GOTO(out, rc);
2069         }
2070
2071         if (oit.d.lustre.it_lock_mode) { /* If we got a lock, release it
2072                                            * right away */
2073                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2074                 oit.d.lustre.it_lock_mode = 0;
2075         }
2076         ll_release_openhandle(head_filp->f_dentry, &oit);
2077 out:
2078         ll_intent_release(&oit);
2079         RETURN(rc);
2080 }
2081
2082 static int ll_file_join(struct inode *head, struct file *filp,
2083                         char *filename_tail)
2084 {
2085         struct inode *tail = NULL, *first = NULL, *second = NULL;
2086         struct dentry *tail_dentry;
2087         struct file *tail_filp, *first_filp, *second_filp;
2088         struct ll_lock_tree first_tree, second_tree;
2089         struct ll_lock_tree_node *first_node, *second_node;
2090         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2091         int rc = 0, cleanup_phase = 0;
2092         ENTRY;
2093
2094         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2095                head->i_ino, head->i_generation, head, filename_tail);
2096
2097         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2098         if (IS_ERR(tail_filp)) {
2099                 CERROR("cannot open tail file %s\n", filename_tail);
2100                 rc = PTR_ERR(tail_filp);
2101                 GOTO(cleanup, rc);
2102         }
2103         tail = igrab(tail_filp->f_dentry->d_inode);
2104
2105         tlli = ll_i2info(tail);
2106         tail_dentry = tail_filp->f_dentry;
2107         LASSERT(tail_dentry);
2108         cleanup_phase = 1;
2109
2110         /* reorder the inodes for lock ordering */
2111         first = head->i_ino > tail->i_ino ? head : tail;
2112         second = head->i_ino > tail->i_ino ? tail : head;
2113         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2114         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2115
2116         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2117                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2118         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2119         if (IS_ERR(first_node)){
2120                 rc = PTR_ERR(first_node);
2121                 GOTO(cleanup, rc);
2122         }
2123         first_tree.lt_fd = first_filp->private_data;
2124         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2125         if (rc != 0)
2126                 GOTO(cleanup, rc);
2127         cleanup_phase = 2;
2128
2129         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2130         if (IS_ERR(second_node)){
2131                 rc = PTR_ERR(second_node);
2132                 GOTO(cleanup, rc);
2133         }
2134         second_tree.lt_fd = second_filp->private_data;
2135         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2136         if (rc != 0)
2137                 GOTO(cleanup, rc);
2138         cleanup_phase = 3;
2139
2140         rc = join_sanity_check(head, tail);
2141         if (rc)
2142                 GOTO(cleanup, rc);
2143
2144         rc = join_file(head, filp, tail_filp);
2145         if (rc)
2146                 GOTO(cleanup, rc);
2147 cleanup:
2148         switch (cleanup_phase) {
2149         case 3:
2150                 ll_tree_unlock(&second_tree);
2151                 obd_cancel_unused(ll_i2dtexp(second),
2152                                   ll_i2info(second)->lli_smd, 0, NULL);
2153         case 2:
2154                 ll_tree_unlock(&first_tree);
2155                 obd_cancel_unused(ll_i2dtexp(first),
2156                                   ll_i2info(first)->lli_smd, 0, NULL);
2157         case 1:
2158                 filp_close(tail_filp, 0);
2159                 if (tail)
2160                         iput(tail);
2161                 if (head && rc == 0) {
2162                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2163                                        &hlli->lli_smd);
2164                         hlli->lli_smd = NULL;
2165                 }
2166         case 0:
2167                 break;
2168         default:
2169                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2170                 LBUG();
2171         }
2172         RETURN(rc);
2173 }
2174
2175 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2176 {
2177         struct inode *inode = dentry->d_inode;
2178         struct obd_client_handle *och;
2179         int rc;
2180         ENTRY;
2181
2182         LASSERT(inode);
2183
2184         /* Root ? Do nothing. */
2185         if (dentry->d_inode->i_sb->s_root == dentry)
2186                 RETURN(0);
2187
2188         /* No open handle to close? Move away */
2189         if (!it_disposition(it, DISP_OPEN_OPEN))
2190                 RETURN(0);
2191
2192         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2193
2194         OBD_ALLOC(och, sizeof(*och));
2195         if (!och)
2196                 GOTO(out, rc = -ENOMEM);
2197
2198         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2199                     ll_i2info(inode), it, och);
2200
2201         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2202                                        inode, och);
2203  out:
2204         /* this one is in place of ll_file_open */
2205         ptlrpc_req_finished(it->d.lustre.it_data);
2206         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2207         RETURN(rc);
2208 }
2209
2210 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2211                   unsigned long arg)
2212 {
2213         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2214         int flags;
2215         ENTRY;
2216
2217         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2218                inode->i_generation, inode, cmd);
2219         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2220
2221         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2222         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2223                 RETURN(-ENOTTY);
2224
2225         switch(cmd) {
2226         case LL_IOC_GETFLAGS:
2227                 /* Get the current value of the file flags */
2228                 return put_user(fd->fd_flags, (int *)arg);
2229         case LL_IOC_SETFLAGS:
2230         case LL_IOC_CLRFLAGS:
2231                 /* Set or clear specific file flags */
2232                 /* XXX This probably needs checks to ensure the flags are
2233                  *     not abused, and to handle any flag side effects.
2234                  */
2235                 if (get_user(flags, (int *) arg))
2236                         RETURN(-EFAULT);
2237
2238                 if (cmd == LL_IOC_SETFLAGS) {
2239                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2240                             !(file->f_flags & O_DIRECT)) {
2241                                 CERROR("%s: unable to disable locking on "
2242                                        "non-O_DIRECT file\n", current->comm);
2243                                 RETURN(-EINVAL);
2244                         }
2245
2246                         fd->fd_flags |= flags;
2247                 } else {
2248                         fd->fd_flags &= ~flags;
2249                 }
2250                 RETURN(0);
2251         case LL_IOC_LOV_SETSTRIPE:
2252                 RETURN(ll_lov_setstripe(inode, file, arg));
2253         case LL_IOC_LOV_SETEA:
2254                 RETURN(ll_lov_setea(inode, file, arg));
2255         case LL_IOC_LOV_GETSTRIPE:
2256                 RETURN(ll_lov_getstripe(inode, arg));
2257         case LL_IOC_RECREATE_OBJ:
2258                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2259         case EXT3_IOC_GETFLAGS:
2260         case EXT3_IOC_SETFLAGS:
2261                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2262         case EXT3_IOC_GETVERSION_OLD:
2263         case EXT3_IOC_GETVERSION:
2264                 RETURN(put_user(inode->i_generation, (int *)arg));
2265         case LL_IOC_JOIN: {
2266                 char *ftail;
2267                 int rc;
2268
2269                 ftail = getname((const char *)arg);
2270                 if (IS_ERR(ftail))
2271                         RETURN(PTR_ERR(ftail));
2272                 rc = ll_file_join(inode, file, ftail);
2273                 putname(ftail);
2274                 RETURN(rc);
2275         }
2276         case LL_IOC_GROUP_LOCK:
2277                 RETURN(ll_get_grouplock(inode, file, arg));
2278         case LL_IOC_GROUP_UNLOCK:
2279                 RETURN(ll_put_grouplock(inode, file, arg));
2280         case IOC_OBD_STATFS:
2281                 RETURN(ll_obd_statfs(inode, (void *)arg));
2282
2283         /* We need to special case any other ioctls we want to handle,
2284          * to send them to the MDS/OST as appropriate and to properly
2285          * network encode the arg field.
2286         case EXT3_IOC_SETVERSION_OLD:
2287         case EXT3_IOC_SETVERSION:
2288         */
2289         case LL_IOC_FLUSHCTX:
2290                 RETURN(ll_flush_ctx(inode));
2291         case LL_IOC_GETFACL: {
2292                 struct rmtacl_ioctl_data ioc;
2293
2294                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2295                         RETURN(-EFAULT);
2296
2297                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2298         }
2299         case LL_IOC_SETFACL: {
2300                 struct rmtacl_ioctl_data ioc;
2301
2302                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2303                         RETURN(-EFAULT);
2304
2305                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2306         }
2307         default: {
2308                 int err;
2309
2310                 if (LLIOC_STOP == 
2311                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2312                         RETURN(err);
2313
2314                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2315                                      (void *)arg));
2316         }
2317         }
2318 }
2319
2320 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2321 {
2322         struct inode *inode = file->f_dentry->d_inode;
2323         struct ll_inode_info *lli = ll_i2info(inode);
2324         struct lov_stripe_md *lsm = lli->lli_smd;
2325         loff_t retval;
2326         ENTRY;
2327         retval = offset + ((origin == 2) ? i_size_read(inode) :
2328                            (origin == 1) ? file->f_pos : 0);
2329         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2330                inode->i_ino, inode->i_generation, inode, retval, retval,
2331                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2332         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2333
2334         if (origin == 2) { /* SEEK_END */
2335                 int nonblock = 0, rc;
2336
2337                 if (file->f_flags & O_NONBLOCK)
2338                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2339
2340                 if (lsm != NULL) {
2341                         rc = ll_glimpse_size(inode, nonblock);
2342                         if (rc != 0)
2343                                 RETURN(rc);
2344                 }
2345
2346                 ll_inode_size_lock(inode, 0);
2347                 offset += i_size_read(inode);
2348                 ll_inode_size_unlock(inode, 0);
2349         } else if (origin == 1) { /* SEEK_CUR */
2350                 offset += file->f_pos;
2351         }
2352
2353         retval = -EINVAL;
2354         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2355                 if (offset != file->f_pos) {
2356                         file->f_pos = offset;
2357 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2358                         file->f_reada = 0;
2359                         file->f_version = ++event;
2360 #endif
2361                 }
2362                 retval = offset;
2363         }
2364         
2365         RETURN(retval);
2366 }
2367
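/*
 * Note on the SEEK_END branch above: the size used for the seek comes from
 * ll_glimpse_size(), i.e. a value refreshed from the OSTs, not just the
 * locally cached i_size.  A plain userspace call is enough to trigger it
 * (standard POSIX, shown only for illustration):
 *
 *      off_t end = lseek(fd, 0, SEEK_END);     // glimpses the file size
 */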
2368 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2369 {
2370         struct inode *inode = dentry->d_inode;
2371         struct ll_inode_info *lli = ll_i2info(inode);
2372         struct lov_stripe_md *lsm = lli->lli_smd;
2373         struct ptlrpc_request *req;
2374         struct obd_capa *oc;
2375         int rc, err;
2376         ENTRY;
2377         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2378                inode->i_generation, inode);
2379         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2380
2381         /* fsync's caller has already called _fdata{sync,write}; we want
2382          * that IO to finish before calling the osc and mdc sync methods */
2383         rc = filemap_fdatawait(inode->i_mapping);
2384
2385         /* catch async errors that were recorded back when async writeback
2386          * failed for pages in this mapping. */
2387         err = lli->lli_async_rc;
2388         lli->lli_async_rc = 0;
2389         if (rc == 0)
2390                 rc = err;
2391         if (lsm) {
2392                 err = lov_test_and_clear_async_rc(lsm);
2393                 if (rc == 0)
2394                         rc = err;
2395         }
2396
2397         oc = ll_mdscapa_get(inode);
2398         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2399                       &req);
2400         capa_put(oc);
2401         if (!rc)
2402                 rc = err;
2403         if (!err)
2404                 ptlrpc_req_finished(req);
2405
2406         if (data && lsm) {
2407                 struct obdo *oa;
2408                 
2409                 OBDO_ALLOC(oa);
2410                 if (!oa)
2411                         RETURN(rc ? rc : -ENOMEM);
2412
2413                 oa->o_id = lsm->lsm_object_id;
2414                 oa->o_gr = lsm->lsm_object_gr;
2415                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2416                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2417                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2418                                            OBD_MD_FLGROUP);
2419
2420                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2421                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2422                                0, OBD_OBJECT_EOF, oc);
2423                 capa_put(oc);
2424                 if (!rc)
2425                         rc = err;
2426                 OBDO_FREE(oa);
2427         }
2428
2429         RETURN(rc);
2430 }
2431
2432 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2433 {
2434         struct inode *inode = file->f_dentry->d_inode;
2435         struct ll_sb_info *sbi = ll_i2sbi(inode);
2436         struct ldlm_res_id res_id =
2437                 { .name = { fid_seq(ll_inode2fid(inode)),
2438                             fid_oid(ll_inode2fid(inode)),
2439                             fid_ver(ll_inode2fid(inode)),
2440                             LDLM_FLOCK} };
2441         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2442                 ldlm_flock_completion_ast, NULL, file_lock };
2443         struct lustre_handle lockh = {0};
2444         ldlm_policy_data_t flock;
2445         int flags = 0;
2446         int rc;
2447         ENTRY;
2448
2449         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2450                inode->i_ino, file_lock);
2451
2452         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2453  
2454         if (file_lock->fl_flags & FL_FLOCK) {
2455                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2456                 /* set missing params for flock() calls */
2457                 file_lock->fl_end = OFFSET_MAX;
2458                 file_lock->fl_pid = current->tgid;
2459         }
2460         flock.l_flock.pid = file_lock->fl_pid;
2461         flock.l_flock.start = file_lock->fl_start;
2462         flock.l_flock.end = file_lock->fl_end;
2463
2464         switch (file_lock->fl_type) {
2465         case F_RDLCK:
2466                 einfo.ei_mode = LCK_PR;
2467                 break;
2468         case F_UNLCK:
2469                 /* An unlock request may or may not have any relation to
2470                  * existing locks so we may not be able to pass a lock handle
2471                  * via a normal ldlm_lock_cancel() request. The request may even
2472                  * unlock a byte range in the middle of an existing lock. In
2473                  * order to process an unlock request we need all of the same
2474                  * information that is given with a normal read or write record
2475                  * lock request. To avoid creating another ldlm unlock (cancel)
2476                  * message we'll treat a LCK_NL flock request as an unlock. */
2477                 einfo.ei_mode = LCK_NL;
2478                 break;
2479         case F_WRLCK:
2480                 einfo.ei_mode = LCK_PW;
2481                 break;
2482         default:
2483                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2484                 LBUG();
2485         }
2486
2487         switch (cmd) {
2488         case F_SETLKW:
2489 #ifdef F_SETLKW64
2490         case F_SETLKW64:
2491 #endif
2492                 flags = 0;
2493                 break;
2494         case F_SETLK:
2495 #ifdef F_SETLK64
2496         case F_SETLK64:
2497 #endif
2498                 flags = LDLM_FL_BLOCK_NOWAIT;
2499                 break;
2500         case F_GETLK:
2501 #ifdef F_GETLK64
2502         case F_GETLK64:
2503 #endif
2504                 flags = LDLM_FL_TEST_LOCK;
2505                 /* Save the old mode so that if the mode in the lock changes we
2506                  * can decrement the appropriate reader or writer refcount. */
2507                 file_lock->fl_type = einfo.ei_mode;
2508                 break;
2509         default:
2510                 CERROR("unknown fcntl lock command: %d\n", cmd);
2511                 LBUG();
2512         }
2513
2514         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2515                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2516                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2517
2518         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2519                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2520         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2521                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2522 #ifdef HAVE_F_OP_FLOCK
2523         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2524             !(flags & LDLM_FL_TEST_LOCK))
2525                 posix_lock_file_wait(file, file_lock);
2526 #endif
2527
2528         RETURN(rc);
2529 }
2530
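/*
 * A minimal userspace sketch of what reaches ll_file_flock() above: a
 * blocking write lock on the first 4 KB of a file via fcntl() (standard
 * POSIX; F_WRLCK maps to an LCK_PW LDLM_FLOCK enqueue, F_RDLCK to LCK_PR
 * and F_UNLCK to LCK_NL, as described in the switch above):
 *
 *      #include <fcntl.h>
 *      #include <unistd.h>
 *      #include <errno.h>
 *
 *      int lock_first_4k(int fd)
 *      {
 *              struct flock fl = {
 *                      .l_type   = F_WRLCK,
 *                      .l_whence = SEEK_SET,
 *                      .l_start  = 0,
 *                      .l_len    = 4096,
 *              };
 *
 *              return fcntl(fd, F_SETLKW, &fl) < 0 ? -errno : 0;
 *      }
 */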
2531 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2532 {
2533         ENTRY;
2534
2535         RETURN(-ENOSYS);
2536 }
2537
2538 int ll_have_md_lock(struct inode *inode, __u64 bits)
2539 {
2540         struct lustre_handle lockh;
2541         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2542         struct lu_fid *fid;
2543         int flags;
2544         ENTRY;
2545
2546         if (!inode)
2547                RETURN(0);
2548
2549         fid = &ll_i2info(inode)->lli_fid;
2550         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2551
2552         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2553         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2554                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2555                 RETURN(1);
2556         }
2557         RETURN(0);
2558 }
2559
2560 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2561                             struct lustre_handle *lockh)
2562 {
2563         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2564         struct lu_fid *fid;
2565         ldlm_mode_t rc;
2566         int flags;
2567         ENTRY;
2568
2569         fid = &ll_i2info(inode)->lli_fid;
2570         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2571
2572         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2573         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2574                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2575         RETURN(rc);
2576 }
2577
2578 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2579         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2580                               * and return success */
2581                 inode->i_nlink = 0;
2582                 /* This path cannot be hit for regular files except in
2583                  * the case of obscure races, so there is no need to
2584                  * validate size. */
2585                 if (!S_ISREG(inode->i_mode) &&
2586                     !S_ISDIR(inode->i_mode))
2587                         return 0;
2588         }
2589
2590         if (rc) {
2591                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2592                 return -abs(rc);
2593
2594         }
2595
2596         return 0;
2597 }
2598
2599 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2600 {
2601         struct inode *inode = dentry->d_inode;
2602         struct ptlrpc_request *req = NULL;
2603         struct ll_sb_info *sbi;
2604         struct obd_export *exp;
2605         int rc;
2606         ENTRY;
2607
2608         if (!inode) {
2609                 CERROR("REPORT THIS LINE TO PETER\n");
2610                 RETURN(0);
2611         }
2612         sbi = ll_i2sbi(inode);
2613
2614         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2615                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2616
2617         exp = ll_i2mdexp(inode);
2618
2619         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2620                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2621                 struct md_op_data *op_data;
2622
2623                 /* Call getattr by fid, so do not provide name at all. */
2624                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2625                                              dentry->d_inode, NULL, 0, 0,
2626                                              LUSTRE_OPC_ANY, NULL);
2627                 if (IS_ERR(op_data))
2628                         RETURN(PTR_ERR(op_data));
2629
2630                 oit.it_flags |= O_CHECK_STALE;
2631                 rc = md_intent_lock(exp, op_data, NULL, 0,
2632                                     /* we are not interested in name
2633                                        based lookup */
2634                                     &oit, 0, &req,
2635                                     ll_md_blocking_ast, 0);
2636                 ll_finish_md_op_data(op_data);
2637                 oit.it_flags &= ~O_CHECK_STALE;
2638                 if (rc < 0) {
2639                         rc = ll_inode_revalidate_fini(inode, rc);
2640                         GOTO (out, rc);
2641                 }
2642
2643                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2644                 if (rc != 0) {
2645                         ll_intent_release(&oit);
2646                         GOTO(out, rc);
2647                 }
2648
2649                 /* Unlinked? Unhash dentry, so it is not picked up later by
2650                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2651                    here to preserve get_cwd functionality on 2.6.
2652                    Bug 10503 */
2653                 if (!dentry->d_inode->i_nlink) {
2654                         spin_lock(&dcache_lock);
2655                         ll_drop_dentry(dentry);
2656                         spin_unlock(&dcache_lock);
2657                 }
2658
2659                 ll_lookup_finish_locks(&oit, dentry);
2660         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) {
2661                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2662                 obd_valid valid = OBD_MD_FLGETATTR;
2663                 struct obd_capa *oc;
2664                 int ealen = 0;
2665
2666                 if (S_ISREG(inode->i_mode)) {
2667                         rc = ll_get_max_mdsize(sbi, &ealen);
2668                         if (rc)
2669                                 RETURN(rc);
2670                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2671                 }
2672                 /* When OBD_CONNECT_ATTRFID is not supported, we may not find
2673                  * a capa for this inode, because we only keep capas of dirs
2674                  * fresh. */
2675                 oc = ll_mdscapa_get(inode);
2676                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2677                                 ealen, &req);
2678                 capa_put(oc);
2679                 if (rc) {
2680                         rc = ll_inode_revalidate_fini(inode, rc);
2681                         RETURN(rc);
2682                 }
2683
2684                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2685                                    NULL);
2686                 if (rc)
2687                         GOTO(out, rc);
2688         }
2689
2690         /* if object not yet allocated, don't validate size */
2691         if (ll_i2info(inode)->lli_smd == NULL)
2692                 GOTO(out, rc = 0);
2693
2694         /* ll_glimpse_size will prefer locally cached writes if they extend
2695          * the file */
2696         rc = ll_glimpse_size(inode, 0);
2697         EXIT;
2698 out:
2699         ptlrpc_req_finished(req);
2700         return rc;
2701 }
2702
2703 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2704                   struct lookup_intent *it, struct kstat *stat)
2705 {
2706         struct inode *inode = de->d_inode;
2707         int res = 0;
2708
2709         res = ll_inode_revalidate_it(de, it);
2710         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2711
2712         if (res)
2713                 return res;
2714
2715         stat->dev = inode->i_sb->s_dev;
2716         stat->ino = inode->i_ino;
2717         stat->mode = inode->i_mode;
2718         stat->nlink = inode->i_nlink;
2719         stat->uid = inode->i_uid;
2720         stat->gid = inode->i_gid;
2721         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2722         stat->atime = inode->i_atime;
2723         stat->mtime = inode->i_mtime;
2724         stat->ctime = inode->i_ctime;
2725 #ifdef HAVE_INODE_BLKSIZE
2726         stat->blksize = inode->i_blksize;
2727 #else
2728         stat->blksize = 1 << inode->i_blkbits;
2729 #endif
2730
2731         ll_inode_size_lock(inode, 0);
2732         stat->size = i_size_read(inode);
2733         stat->blocks = inode->i_blocks;
2734         ll_inode_size_unlock(inode, 0);
2735
2736         return 0;
2737 }

2738 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2739 {
2740         struct lookup_intent it = { .it_op = IT_GETATTR };
2741
2742         return ll_getattr_it(mnt, de, &it, stat);
2743 }
2744
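/*
 * ACL hook used by ll_inode_permission() below: on kernels >= 2.6.10 it is
 * passed to generic_permission() as the check_acl callback, otherwise the
 * open-coded fallback calls it directly.  Returns the POSIX ACL verdict for
 * @mask, or -EAGAIN when no ACL is cached so the caller falls back to the
 * ordinary mode-bit checks.
 */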
2745 static
2746 int lustre_check_acl(struct inode *inode, int mask)
2747 {
2748 #ifdef CONFIG_FS_POSIX_ACL
2749         struct ll_inode_info *lli = ll_i2info(inode);
2750         struct posix_acl *acl;
2751         int rc;
2752         ENTRY;
2753
2754         spin_lock(&lli->lli_lock);
2755         acl = posix_acl_dup(lli->lli_posix_acl);
2756         spin_unlock(&lli->lli_lock);
2757
2758         if (!acl)
2759                 RETURN(-EAGAIN);
2760
2761         rc = posix_acl_permission(inode, acl, mask);
2762         posix_acl_release(acl);
2763
2764         RETURN(rc);
2765 #else
2766         return -EAGAIN;
2767 #endif
2768 }
2769
2770 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2771 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2772 {
2773         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2774                inode->i_ino, inode->i_generation, inode, mask);
2775         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2776                 return lustre_check_remote_perm(inode, mask);
2777
2778         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2779         return generic_permission(inode, mask, lustre_check_acl);
2780 }
2781 #else
2782 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2783 {
2784         int mode = inode->i_mode;
2785         int rc;
2786
2787         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2788                inode->i_ino, inode->i_generation, inode, mask);
2789
2790         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2791                 return lustre_check_remote_perm(inode, mask);
2792
2793         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2794
2795         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2796             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2797                 return -EROFS;
2798         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2799                 return -EACCES;
2800         if (current->fsuid == inode->i_uid) {
2801                 mode >>= 6;
2802         } else if (1) {
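                /* This branch is taken for every non-owner: the constant
                 * condition stands in for a filesystem-specific "ACLs
                 * enabled" test, and whether an ACL actually exists is left
                 * to lustre_check_acl() and the cached lli_posix_acl.  Since
                 * the ACL_MASK entry is mirrored in the group bits of i_mode,
                 * an ACL can never grant more than those bits allow, so skip
                 * the ACL lookup when they do not cover the request. */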
2803                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2804                         goto check_groups;
2805                 rc = lustre_check_acl(inode, mask);
2806                 if (rc == -EAGAIN)
2807                         goto check_groups;
2808                 if (rc == -EACCES)
2809                         goto check_capabilities;
2810                 return rc;
2811         } else {
2812 check_groups:
2813                 if (in_group_p(inode->i_gid))
2814                         mode >>= 3;
2815         }
2816         if ((mode & mask & S_IRWXO) == mask)
2817                 return 0;
2818
2819 check_capabilities:
2820         if (!(mask & MAY_EXEC) ||
2821             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2822                 if (capable(CAP_DAC_OVERRIDE))
2823                         return 0;
2824
2825         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2826             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2827                 return 0;
2828
2829         return -EACCES;
2830 }
2831 #endif
2832
2833 /* -o localflock - flock locks are coherent only within this client */
2834 struct file_operations ll_file_operations = {
2835         .read           = ll_file_read,
2836         .write          = ll_file_write,
2837         .ioctl          = ll_file_ioctl,
2838         .open           = ll_file_open,
2839         .release        = ll_file_release,
2840         .mmap           = ll_file_mmap,
2841         .llseek         = ll_file_seek,
2842         .sendfile       = ll_file_sendfile,
2843         .fsync          = ll_fsync,
2844 };
2845
2846 struct file_operations ll_file_operations_flock = {
2847         .read           = ll_file_read,
2848         .write          = ll_file_write,
2849         .ioctl          = ll_file_ioctl,
2850         .open           = ll_file_open,
2851         .release        = ll_file_release,
2852         .mmap           = ll_file_mmap,
2853         .llseek         = ll_file_seek,
2854         .sendfile       = ll_file_sendfile,
2855         .fsync          = ll_fsync,
2856 #ifdef HAVE_F_OP_FLOCK
2857         .flock          = ll_file_flock,
2858 #endif
2859         .lock           = ll_file_flock
2860 };
2861
2862 /* For -o noflock - flock and fcntl lock calls return -ENOSYS */
2863 struct file_operations ll_file_operations_noflock = {
2864         .read           = ll_file_read,
2865         .write          = ll_file_write,
2866         .ioctl          = ll_file_ioctl,
2867         .open           = ll_file_open,
2868         .release        = ll_file_release,
2869         .mmap           = ll_file_mmap,
2870         .llseek         = ll_file_seek,
2871         .sendfile       = ll_file_sendfile,
2872         .fsync          = ll_fsync,
2873 #ifdef HAVE_F_OP_FLOCK
2874         .flock          = ll_file_noflock,
2875 #endif
2876         .lock           = ll_file_noflock
2877 };
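
/* Exactly one of the three file_operations tables above is chosen at mount
 * time, depending on whether flock, localflock or neither was requested,
 * and is then installed for regular files. */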
2878
2879 struct inode_operations ll_file_inode_operations = {
2880 #ifdef HAVE_VFS_INTENT_PATCHES
2881         .setattr_raw    = ll_setattr_raw,
2882 #endif
2883         .setattr        = ll_setattr,
2884         .truncate       = ll_truncate,
2885         .getattr        = ll_getattr,
2886         .permission     = ll_inode_permission,
2887         .setxattr       = ll_setxattr,
2888         .getxattr       = ll_getxattr,
2889         .listxattr      = ll_listxattr,
2890         .removexattr    = ll_removexattr,
2891 };
2892
2893 /* dynamic ioctl number support routines */
2894 static struct llioc_ctl_data {
2895         struct rw_semaphore ioc_sem;
2896         struct list_head    ioc_head;
2897 } llioc = { 
2898         __RWSEM_INITIALIZER(llioc.ioc_sem), 
2899         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2900 };
2901
2902
2903 struct llioc_data {
2904         struct list_head        iocd_list;
2905         unsigned int            iocd_size;
2906         llioc_callback_t        iocd_cb;
2907         unsigned int            iocd_count;
2908         unsigned int            iocd_cmd[0];
2909 };
2910
2911 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2912 {
2913         unsigned int size;
2914         struct llioc_data *in_data = NULL;
2915         ENTRY;
2916
2917         if (cb == NULL || cmd == NULL ||
2918             count > LLIOC_MAX_CMD || count < 0)
2919                 RETURN(NULL);
2920
2921         size = sizeof(*in_data) + count * sizeof(unsigned int);
2922         OBD_ALLOC(in_data, size);
2923         if (in_data == NULL)
2924                 RETURN(NULL);
2925
2926         memset(in_data, 0, sizeof(*in_data));
2927         in_data->iocd_size = size;
2928         in_data->iocd_cb = cb;
2929         in_data->iocd_count = count;
2930         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2931
2932         down_write(&llioc.ioc_sem);
2933         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2934         up_write(&llioc.ioc_sem);
2935
2936         RETURN(in_data);
2937 }
2938
2939 void ll_iocontrol_unregister(void *magic)
2940 {
2941         struct llioc_data *tmp;
2942
2943         if (magic == NULL)
2944                 return;
2945
2946         down_write(&llioc.ioc_sem);
2947         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2948                 if (tmp == magic) {
2949                         unsigned int size = tmp->iocd_size;
2950
2951                         list_del(&tmp->iocd_list);
2952                         up_write(&llioc.ioc_sem);
2953
2954                         OBD_FREE(tmp, size);
2955                         return;
2956                 }
2957         }
2958         up_write(&llioc.ioc_sem);
2959
2960         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2961 }
2962
2963 EXPORT_SYMBOL(ll_iocontrol_register);
2964 EXPORT_SYMBOL(ll_iocontrol_unregister);
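
/*
 * Registration sketch for the interface above.  The command number
 * (MY_IOC_FOO), the callback (my_ioc_cb) and the cookie variable are
 * hypothetical and only illustrate the calling convention: the callback
 * stores its result through *rcp and returns LLIOC_STOP once it has
 * handled the command, or LLIOC_CONT to let iteration continue.
 *
 *      static enum llioc_iter my_ioc_cb(struct inode *inode, struct file *file,
 *                                       unsigned int cmd, unsigned long arg,
 *                                       void *magic, int *rcp)
 *      {
 *              *rcp = 0;
 *              return LLIOC_STOP;
 *      }
 *
 *      static unsigned int my_cmds[] = { MY_IOC_FOO };
 *      void *cookie = ll_iocontrol_register(my_ioc_cb, 1, my_cmds);
 *      ...
 *      ll_iocontrol_unregister(cookie);
 */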
2965
2966 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2967                         unsigned int cmd, unsigned long arg, int *rcp)
2968 {
2969         enum llioc_iter ret = LLIOC_CONT;
2970         struct llioc_data *data;
2971         int rc = -EINVAL, i;
2972
2973         down_read(&llioc.ioc_sem);
2974         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2975                 for (i = 0; i < data->iocd_count; i++) {
2976                         if (cmd != data->iocd_cmd[i]) 
2977                                 continue;
2978
2979                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2980                         break;
2981                 }
2982
2983                 if (ret == LLIOC_STOP)
2984                         break;
2985         }
2986         up_read(&llioc.ioc_sem);
2987
2988         if (rcp)
2989                 *rcp = rc;
2990         return ret;
2991 }
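
/*
 * Dispatch sketch (illustrative, not a verbatim copy of any caller): an
 * ioctl handler that wants to give the registered callbacks first refusal
 * on an unrecognized command would do roughly the following, falling back
 * to its own handling only when no callback claimed the command:
 *
 *      int rc;
 *
 *      if (ll_iocontrol_call(inode, file, cmd, arg, &rc) == LLIOC_STOP)
 *              return rc;
 *      ... built-in handling of cmd ...
 */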