Whamcloud - gitweb
land b_colibri_devel on HEAD:
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
85
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in case of LMV, is this correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * here we check if this is forced umount. If so this is called on
110          * canceling "open lock" and we do not call md_close() in this case, as
111          * it will not be successful, as import is already deactivated.
112          */
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* MDS has instructed us to obtain Size-on-MDS attribute from
131                  * OSTs and send setattr to back to MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154       
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och=*och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody have freed this och
203                       already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have good enough OPEN lock on the file and if
228            we can skip talking to MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this returns an error code, fput() the caller does not, so we need
273  * to make every effort to clean up all of our state here.  Also, applications
274  * rarely check close errors and even if an error is returned they will not
275  * re-try the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289 #ifdef CONFIG_FS_POSIX_ACL
290         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
291             inode == inode->i_sb->s_root->d_inode) {
292                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
293
294                 LASSERT(fd != NULL);
295                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
296                         fd->fd_flags &= ~LL_FILE_RMTACL;
297                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
298                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
299                 }
300         }
301 #endif
302
303         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
304         fd = LUSTRE_FPRIVATE(file);
305         LASSERT(fd != NULL);
306
307         /* don't do anything for / */
308         if (inode->i_sb->s_root == file->f_dentry) {
309                 LUSTRE_FPRIVATE(file) = NULL;
310                 ll_file_data_put(fd);
311                 RETURN(0);
312         }
313         
314         if (lsm)
315                 lov_test_and_clear_async_rc(lsm);
316         lli->lli_async_rc = 0;
317
318         rc = ll_md_close(sbi->ll_md_exp, inode, file);
319         RETURN(rc);
320 }
321
322 static int ll_intent_file_open(struct file *file, void *lmm,
323                                int lmmsize, struct lookup_intent *itp)
324 {
325         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
326         struct dentry *parent = file->f_dentry->d_parent;
327         const char *name = file->f_dentry->d_name.name;
328         const int len = file->f_dentry->d_name.len;
329         struct md_op_data *op_data;
330         struct ptlrpc_request *req;
331         int rc;
332
333         if (!parent)
334                 RETURN(-ENOENT);
335
336         /* Usually we come here only for NFSD, and we want open lock.
337            But we can also get here with pre 2.6.15 patchless kernels, and in
338            that case that lock is also ok */
339         /* We can also get here if there was cached open handle in revalidate_it
340          * but it disappeared while we were getting from there to ll_file_open.
341          * But this means this file was closed and immediatelly opened which
342          * makes a good candidate for using OPEN lock */
343         /* If lmmsize & lmm are not 0, we are just setting stripe info
344          * parameters. No need for the open lock */
345         if (!lmm && !lmmsize)
346                 itp->it_flags |= MDS_OPEN_LOCK;
347
348         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
349                                       file->f_dentry->d_inode, name, len,
350                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
351         if (IS_ERR(op_data))
352                 RETURN(PTR_ERR(op_data));
353
354         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
355                             0 /*unused */, &req, ll_md_blocking_ast, 0);
356         ll_finish_md_op_data(op_data);
357         if (rc == -ESTALE) {
358                 /* reason for keep own exit path - don`t flood log
359                 * with messages with -ESTALE errors.
360                 */
361                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
362                      it_open_error(DISP_OPEN_OPEN, itp))
363                         GOTO(out, rc);
364                 ll_release_openhandle(file->f_dentry, itp);
365                 GOTO(out_stale, rc);
366         }
367
368         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
369                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
370                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
371                 GOTO(out, rc);
372         }
373
374         if (itp->d.lustre.it_lock_mode)
375                 md_set_lock_data(sbi->ll_md_exp,
376                                  &itp->d.lustre.it_lock_handle, 
377                                  file->f_dentry->d_inode);
378
379         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
380                            NULL);
381 out:
382         ptlrpc_req_finished(itp->d.lustre.it_data);
383
384 out_stale:
385         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
386         ll_intent_drop_lock(itp);
387
388         RETURN(rc);
389 }
390
391 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
392                        struct lookup_intent *it, struct obd_client_handle *och)
393 {
394         struct ptlrpc_request *req = it->d.lustre.it_data;
395         struct mdt_body *body;
396
397         LASSERT(och);
398
399         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
400         /* reply already checked out */
401         LASSERT(body != NULL);
402         /* and swabbed in md_enqueue */
403         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
404
405         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
406         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
407         och->och_fid = lli->lli_fid;
408         och->och_flags = it->it_flags;
409         lli->lli_ioepoch = body->ioepoch;
410
411         return md_set_open_replay_data(md_exp, och, req);
412 }
413
414 int ll_local_open(struct file *file, struct lookup_intent *it,
415                   struct ll_file_data *fd, struct obd_client_handle *och)
416 {
417         struct inode *inode = file->f_dentry->d_inode;
418         struct ll_inode_info *lli = ll_i2info(inode);
419         ENTRY;
420
421         LASSERT(!LUSTRE_FPRIVATE(file));
422
423         LASSERT(fd != NULL);
424
425         if (och) {
426                 struct ptlrpc_request *req = it->d.lustre.it_data;
427                 struct mdt_body *body;
428                 int rc;
429
430                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
431                 if (rc)
432                         RETURN(rc);
433
434                 body = lustre_msg_buf(req->rq_repmsg,
435                                       DLM_REPLY_REC_OFF, sizeof(*body));
436
437                 if ((it->it_flags & FMODE_WRITE) &&
438                     (body->valid & OBD_MD_FLSIZE))
439                 {
440                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
441                                lli->lli_ioepoch, PFID(&lli->lli_fid));
442                 }
443         }
444
445         LUSTRE_FPRIVATE(file) = fd;
446         ll_readahead_init(inode, &fd->fd_ras);
447         fd->fd_omode = it->it_flags;
448         RETURN(0);
449 }
450
451 /* Open a file, and (for the very first open) create objects on the OSTs at
452  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
453  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
454  * lli_open_sem to ensure no other process will create objects, send the
455  * stripe MD to the MDS, or try to destroy the objects if that fails.
456  *
457  * If we already have the stripe MD locally then we don't request it in
458  * md_open(), by passing a lmm_size = 0.
459  *
460  * It is up to the application to ensure no other processes open this file
461  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
462  * used.  We might be able to avoid races of that sort by getting lli_open_sem
463  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
464  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
465  */
466 int ll_file_open(struct inode *inode, struct file *file)
467 {
468         struct ll_inode_info *lli = ll_i2info(inode);
469         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
470                                           .it_flags = file->f_flags };
471         struct lov_stripe_md *lsm;
472         struct ptlrpc_request *req = NULL;
473         struct obd_client_handle **och_p;
474         __u64 *och_usecount;
475         struct ll_file_data *fd;
476         int rc = 0;
477         ENTRY;
478
479         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
480                inode->i_generation, inode, file->f_flags);
481
482 #ifdef HAVE_VFS_INTENT_PATCHES
483         it = file->f_it;
484 #else
485         it = file->private_data; /* XXX: compat macro */
486         file->private_data = NULL; /* prevent ll_local_open assertion */
487 #endif
488
489         fd = ll_file_data_get();
490         if (fd == NULL)
491                 RETURN(-ENOMEM);
492
493         /* don't do anything for / */
494         if (inode->i_sb->s_root == file->f_dentry) {
495                 LUSTRE_FPRIVATE(file) = fd;
496                 RETURN(0);
497         }
498
499         if (!it || !it->d.lustre.it_disposition) {
500                 /* Convert f_flags into access mode. We cannot use file->f_mode,
501                  * because everything but O_ACCMODE mask was stripped from
502                  * there */
503                 if ((oit.it_flags + 1) & O_ACCMODE)
504                         oit.it_flags++;
505                 if (file->f_flags & O_TRUNC)
506                         oit.it_flags |= FMODE_WRITE;
507
508                 /* kernel only call f_op->open in dentry_open.  filp_open calls
509                  * dentry_open after call to open_namei that checks permissions.
510                  * Only nfsd_open call dentry_open directly without checking
511                  * permissions and because of that this code below is safe. */
512                 if (oit.it_flags & FMODE_WRITE)
513                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
514
515                 /* We do not want O_EXCL here, presumably we opened the file
516                  * already? XXX - NFS implications? */
517                 oit.it_flags &= ~O_EXCL;
518
519                 it = &oit;
520         }
521
522 restart:
523         /* Let's see if we have file open on MDS already. */
524         if (it->it_flags & FMODE_WRITE) {
525                 och_p = &lli->lli_mds_write_och;
526                 och_usecount = &lli->lli_open_fd_write_count;
527         } else if (it->it_flags & FMODE_EXEC) {
528                 och_p = &lli->lli_mds_exec_och;
529                 och_usecount = &lli->lli_open_fd_exec_count;
530          } else {
531                 och_p = &lli->lli_mds_read_och;
532                 och_usecount = &lli->lli_open_fd_read_count;
533         }
534         
535         down(&lli->lli_och_sem);
536         if (*och_p) { /* Open handle is present */
537                 if (it_disposition(it, DISP_OPEN_OPEN)) {
538                         /* Well, there's extra open request that we do not need,
539                            let's close it somehow. This will decref request. */
540                         rc = it_open_error(DISP_OPEN_OPEN, it);
541                         if (rc) {
542                                 ll_file_data_put(fd);
543                                 GOTO(out_och_free, rc);
544                         }       
545                         ll_release_openhandle(file->f_dentry, it);
546                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
547                                              LPROC_LL_OPEN);
548                 }
549                 (*och_usecount)++;
550
551                 rc = ll_local_open(file, it, fd, NULL);
552                 if (rc) {
553                         up(&lli->lli_och_sem);
554                         ll_file_data_put(fd);
555                         RETURN(rc);
556                 }
557         } else {
558                 LASSERT(*och_usecount == 0);
559                 if (!it->d.lustre.it_disposition) {
560                         /* We cannot just request lock handle now, new ELC code
561                            means that one of other OPEN locks for this file
562                            could be cancelled, and since blocking ast handler
563                            would attempt to grab och_sem as well, that would
564                            result in a deadlock */
565                         up(&lli->lli_och_sem);
566                         it->it_flags |= O_CHECK_STALE;
567                         rc = ll_intent_file_open(file, NULL, 0, it);
568                         it->it_flags &= ~O_CHECK_STALE;
569                         if (rc) {
570                                 ll_file_data_put(fd);
571                                 GOTO(out_openerr, rc);
572                         }
573
574                         /* Got some error? Release the request */
575                         if (it->d.lustre.it_status < 0) {
576                                 req = it->d.lustre.it_data;
577                                 ptlrpc_req_finished(req);
578                         }
579                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
580                                          &it->d.lustre.it_lock_handle,
581                                          file->f_dentry->d_inode);
582                         goto restart;
583                 }
584                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
585                 if (!*och_p) {
586                         ll_file_data_put(fd);
587                         GOTO(out_och_free, rc = -ENOMEM);
588                 }
589                 (*och_usecount)++;
590                 req = it->d.lustre.it_data;
591
592                 /* md_intent_lock() didn't get a request ref if there was an
593                  * open error, so don't do cleanup on the request here
594                  * (bug 3430) */
595                 /* XXX (green): Should not we bail out on any error here, not
596                  * just open error? */
597                 rc = it_open_error(DISP_OPEN_OPEN, it);
598                 if (rc) {
599                         ll_file_data_put(fd);
600                         GOTO(out_och_free, rc);
601                 }
602
603                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
604                 rc = ll_local_open(file, it, fd, *och_p);
605                 if (rc) {
606                         up(&lli->lli_och_sem);
607                         ll_file_data_put(fd);
608                         GOTO(out_och_free, rc);
609                 }
610         }
611         up(&lli->lli_och_sem);
612
613         /* Must do this outside lli_och_sem lock to prevent deadlock where
614            different kind of OPEN lock for this same inode gets cancelled
615            by ldlm_cancel_lru */
616         if (!S_ISREG(inode->i_mode))
617                 GOTO(out, rc);
618
619         ll_capa_open(inode);
620
621         lsm = lli->lli_smd;
622         if (lsm == NULL) {
623                 if (file->f_flags & O_LOV_DELAY_CREATE ||
624                     !(file->f_mode & FMODE_WRITE)) {
625                         CDEBUG(D_INODE, "object creation was delayed\n");
626                         GOTO(out, rc);
627                 }
628         }
629         file->f_flags &= ~O_LOV_DELAY_CREATE;
630         GOTO(out, rc);
631 out:
632         ptlrpc_req_finished(req);
633         if (req)
634                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
635 out_och_free:
636         if (rc) {
637                 if (*och_p) {
638                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
639                         *och_p = NULL; /* OBD_FREE writes some magic there */
640                         (*och_usecount)--;
641                 }
642                 up(&lli->lli_och_sem);
643 out_openerr: ;/* Looks weierd, eh? Just wait for statahead code to insert
644                 a statement here <-- remove this comment after statahead
645                 landing */
646         }
647
648         return rc;
649 }
650
651 /* Fills the obdo with the attributes for the inode defined by lsm */
652 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
653 {
654         struct ptlrpc_request_set *set;
655         struct ll_inode_info *lli = ll_i2info(inode);
656         struct lov_stripe_md *lsm = lli->lli_smd;
657
658         struct obd_info oinfo = { { { 0 } } };
659         int rc;
660         ENTRY;
661
662         LASSERT(lsm != NULL);
663
664         oinfo.oi_md = lsm;
665         oinfo.oi_oa = obdo;
666         oinfo.oi_oa->o_id = lsm->lsm_object_id;
667         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
668         oinfo.oi_oa->o_mode = S_IFREG;
669         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
670                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
671                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
672                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
673                                OBD_MD_FLGROUP;
674         oinfo.oi_capa = ll_mdscapa_get(inode);
675
676         set = ptlrpc_prep_set();
677         if (set == NULL) {
678                 CERROR("can't allocate ptlrpc set\n");
679                 rc = -ENOMEM;
680         } else {
681                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
682                 if (rc == 0)
683                         rc = ptlrpc_set_wait(set);
684                 ptlrpc_set_destroy(set);
685         }
686         capa_put(oinfo.oi_capa);
687         if (rc)
688                 RETURN(rc);
689
690         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
691                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
692                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
693
694         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
695         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
696                lli->lli_smd->lsm_object_id, i_size_read(inode),
697                inode->i_blocks, inode->i_blksize);
698         RETURN(0);
699 }
700
701 static inline void ll_remove_suid(struct inode *inode)
702 {
703         unsigned int mode;
704
705         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
706         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
707
708         /* was any of the uid bits set? */
709         mode &= inode->i_mode;
710         if (mode && !capable(CAP_FSETID)) {
711                 inode->i_mode &= ~mode;
712                 // XXX careful here - we cannot change the size
713         }
714 }
715
716 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
717 {
718         struct ll_inode_info *lli = ll_i2info(inode);
719         struct lov_stripe_md *lsm = lli->lli_smd;
720         struct obd_export *exp = ll_i2dtexp(inode);
721         struct {
722                 char name[16];
723                 struct ldlm_lock *lock;
724                 struct lov_stripe_md *lsm;
725         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
726         __u32 stripe, vallen = sizeof(stripe);
727         int rc;
728         ENTRY;
729
730         if (lsm->lsm_stripe_count == 1)
731                 GOTO(check, stripe = 0);
732
733         /* get our offset in the lov */
734         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
735         if (rc != 0) {
736                 CERROR("obd_get_info: rc = %d\n", rc);
737                 RETURN(rc);
738         }
739         LASSERT(stripe < lsm->lsm_stripe_count);
740
741 check:
742         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
743             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
744                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
745                            lsm->lsm_oinfo[stripe]->loi_id,
746                            lsm->lsm_oinfo[stripe]->loi_gr);
747                 RETURN(-ELDLM_NO_LOCK_DATA);
748         }
749
750         RETURN(stripe);
751 }
752
753 /* Flush the page cache for an extent as it is canceled.  When we're on an LOV,
754  * we get a lock cancellation for each stripe, so we have to map the obd's
755  * region back onto the stripes in the file that it held.
756  *
757  * No one can dirty the extent until we've finished our work and they can
758  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
759  * but other kernel actors could have pages locked.
760  *
 * inode:  file whose cached pages are walked (via inode->i_mapping).
 * lsm:    striping descriptor; supplies the stripe size and stripe count.
 * lock:   lock being canceled; its policy extent is the object-relative
 *         range and LDLM_FL_DISCARD_DATA in l_flags suppresses writeback.
 * stripe: index of the stripe that the lock covers.
 *
761  * Called with the DLM lock held. */
762 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
763                               struct ldlm_lock *lock, __u32 stripe)
764 {
765         ldlm_policy_data_t tmpex;
766         unsigned long start, end, count, skip, i, j;
767         struct page *page;
768         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
769         struct lustre_handle lockh;
770         struct address_space *mapping = inode->i_mapping;
771
772         ENTRY;
        /* work on a private copy of the extent; it is advanced page by page
         * in the eviction loop below */
773         tmpex = lock->l_policy_data;
774         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
775                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
776                i_size_read(inode));
777
778         /* our locks are page granular thanks to osc_enqueue, we invalidate the
779          * whole page. */
780         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
781             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
782                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
783                            CFS_PAGE_SIZE);
784         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
785         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
786
        /* Translate the lock's object-relative page range [start, end] into
         * file page indices.  With more than one stripe, "count" is the
         * number of pages in one stripe-sized chunk and "skip" is the number
         * of pages that belong to the other stripes between two consecutive
         * chunks of this stripe.  For a single stripe count stays ~0 and skip
         * stays 0, so the walks below never jump. */
787         count = ~0;
788         skip = 0;
789         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
790         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
791         if (lsm->lsm_stripe_count > 1) {
792                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
793                 skip = (lsm->lsm_stripe_count - 1) * count;
794                 start += start/count * skip + stripe * count;
795                 if (end != ~0)
796                         end += end/count * skip + stripe * count;
797         }
        /* A translated end that is smaller than the object-relative end is
         * replaced by "no upper bound".  NOTE(review): presumably this catches
         * wrap-around of the unsigned arithmetic above -- confirm. */
798         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
799                 end = ~0;
800
        /* never walk beyond the last page covered by the current i_size */
801         i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
802             CFS_PAGE_SHIFT : 0;
803         if (i < end)
804                 end = i;
805
806         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
807                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
808                count, skip, end, discard ? " (DISCARDING)" : "");
809
810         /* walk through the vmas on the inode and tear down mmaped pages that
811          * intersect with the lock.  this stops immediately if there are no
812          * mmap()ed regions of the file.  This is not efficient at all and
813          * should be short lived. We'll associate mmap()ed pages with the lock
814          * and will be able to find them directly */
815         for (i = start; i <= end; i += (j + skip)) {
                /* j: pages from i up to the end of this stripe chunk, capped
                 * so that the run does not pass "end" */
816                 j = min(count - (i % count), end - i + 1);
817                 LASSERT(j > 0);
818                 LASSERT(mapping);
819                 if (ll_teardown_mmaps(mapping,
820                                       (__u64)i << CFS_PAGE_SHIFT,
821                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
822                         break;
823         }
824
825         /* this is the simplistic implementation of page eviction at
826          * cancelation.  It is careful to get races with other page
827          * lockers handled correctly.  fixes from bug 20 will make it
828          * more efficient by associating locks with pages and with
829          * batching writeback under the lock explicitly. */
        /* i is the file page index, j the position inside the current stripe
         * chunk, and tmpex.l_extent.start the matching object-relative byte
         * offset; all three advance together once per iteration (including
         * iterations that "continue"). */
830         for (i = start, j = start % count; i <= end;
831              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
                /* end of this stripe chunk: jump over the other stripes */
832                 if (j == count) {
833                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
834                         i += skip;
835                         j = 0;
836                         if (i > end)
837                                 break;
838                 }
                /* the per-page cursor must still lie inside the lock extent */
839                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
840                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
841                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
842                          start, i, end);
843
                /* stop early once the mapping holds no pages at all */
844                 if (!mapping_has_pages(mapping)) {
845                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
846                         break;
847                 }
848
849                 cond_resched();
850
                /* pages that are not cached need no work */
851                 page = find_get_page(mapping, i);
852                 if (page == NULL)
853                         continue;
854                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
855                                i, tmpex.l_extent.start);
856                 lock_page(page);
857
858                 /* page->mapping to check with racing against teardown */
                /* unless the lock asks for data to be discarded, write a
                 * dirty page back first; a failure is recorded in the mapping
                 * flags (AS_ENOSPC for -ENOSPC, AS_EIO otherwise) */
859                 if (!discard && clear_page_dirty_for_io(page)) {
860                         rc = ll_call_writepage(inode, page);
861                         /* either waiting for io to complete or reacquiring
862                          * the lock that the failed writepage released */
863                         lock_page(page);
864                         wait_on_page_writeback(page);
865                         if (rc != 0) {
866                                 CERROR("writepage inode %lu(%p) of page %p "
867                                        "failed: %d\n", inode->i_ino, inode,
868                                        page, rc);
869                                 if (rc == -ENOSPC)
870                                         set_bit(AS_ENOSPC, &mapping->flags);
871                                 else
872                                         set_bit(AS_EIO, &mapping->flags);
873                         }
874                 }
875
                /* narrow tmpex to exactly this page for the match below */
876                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
877                 /* check to see if another DLM lock covers this page b=2765 */
878                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
879                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
880                                       LDLM_FL_TEST_LOCK,
881                                       &lock->l_resource->lr_name, LDLM_EXTENT,
882                                       &tmpex, LCK_PR | LCK_PW, &lockh);
883
                /* nothing matched (rc2 <= 0) and the page is still attached
                 * to a mapping: truncate it out of the cache */
884                 if (rc2 <= 0 && page->mapping != NULL) {
885                         struct ll_async_page *llap = llap_cast_private(page);
886                         /* checking again to account for writeback's
887                          * lock_page() */
888                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
889                         if (llap)
890                                 ll_ra_accounting(llap, mapping);
891                         ll_truncate_complete_page(page);
892                 }
                /* drop the page lock and the reference from find_get_page() */
893                 unlock_page(page);
894                 page_cache_release(page);
895         }
        /* sanity check: the cursor may end at most one byte past the lock's
         * extent; an extent ending at ~0ULL is compared without the +1 */
896         LASSERTF(tmpex.l_extent.start <=
897                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
898                   lock->l_policy_data.l_extent.end + 1),
899                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
900                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
901                  start, i, end);
902         EXIT;
903 }
904
905 static int ll_extent_lock_callback(struct ldlm_lock *lock,
906                                    struct ldlm_lock_desc *new, void *data,
907                                    int flag)
908 {
909         struct lustre_handle lockh = { 0 };
910         int rc;
911         ENTRY;
912
913         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
914                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
915                 LBUG();
916         }
917
918         switch (flag) {
919         case LDLM_CB_BLOCKING:
920                 ldlm_lock2handle(lock, &lockh);
921                 rc = ldlm_cli_cancel(&lockh);
922                 if (rc != ELDLM_OK)
923                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
924                 break;
925         case LDLM_CB_CANCELING: {
926                 struct inode *inode;
927                 struct ll_inode_info *lli;
928                 struct lov_stripe_md *lsm;
929                 int stripe;
930                 __u64 kms;
931
932                 /* This lock wasn't granted, don't try to evict pages */
933                 if (lock->l_req_mode != lock->l_granted_mode)
934                         RETURN(0);
935
936                 inode = ll_inode_from_lock(lock);
937                 if (inode == NULL)
938                         RETURN(0);
939                 lli = ll_i2info(inode);
940                 if (lli == NULL)
941                         goto iput;
942                 if (lli->lli_smd == NULL)
943                         goto iput;
944                 lsm = lli->lli_smd;
945
946                 stripe = ll_lock_to_stripe_offset(inode, lock);
947                 if (stripe < 0)
948                         goto iput;
949
950                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
951
952                 lov_stripe_lock(lsm);
953                 lock_res_and_lock(lock);
954                 kms = ldlm_extent_shift_kms(lock,
955                                             lsm->lsm_oinfo[stripe]->loi_kms);
956
957                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
958                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
959                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
960                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
961                 unlock_res_and_lock(lock);
962                 lov_stripe_unlock(lsm);
963         iput:
964                 iput(inode);
965                 break;
966         }
967         default:
968                 LBUG();
969         }
970
971         RETURN(0);
972 }
973
974 #if 0
/* Disabled (compiled out) completion AST for asynchronous enqueues.  For a
 * granted/glimpsed lock it copies the size from the lock's LVB into the
 * matching stripe (loi_rss) and raises that stripe's kms, then wakes up
 * waiters on the lock and drops a PR reference.
 *
 * NOTE(review): this looks stale relative to the live code in this file: it
 * indexes lsm_oinfo[stripe] with '.' where the live code uses '->', and it
 * uses the inode from ll_inode_from_lock() without the NULL check that
 * ll_extent_lock_callback() performs.  Confirm before re-enabling. */
975 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
976 {
977         /* XXX ALLOCATE - 160 bytes */
978         struct inode *inode = ll_inode_from_lock(lock);
979         struct ll_inode_info *lli = ll_i2info(inode);
980         struct lustre_handle lockh = { 0 };
981         struct ost_lvb *lvb;
982         int stripe;
983         ENTRY;
984
985         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
986                      LDLM_FL_BLOCK_CONV)) {
987                 LBUG(); /* not expecting any blocked async locks yet */
988                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
989                            "lock, returning");
990                 ldlm_lock_dump(D_OTHER, lock, 0);
991                 ldlm_reprocess_all(lock->l_resource);
992                 RETURN(0);
993         }
994
995         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
996
997         stripe = ll_lock_to_stripe_offset(inode, lock);
998         if (stripe < 0)
999                 goto iput;
1000
        /* the lock carries an LVB: take size information from it */
1001         if (lock->l_lvb_len) {
1002                 struct lov_stripe_md *lsm = lli->lli_smd;
1003                 __u64 kms;
1004                 lvb = lock->l_lvb_data;
1005                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
1006
1007                 lock_res_and_lock(lock);
1008                 ll_inode_size_lock(inode, 1);
                /* kms may only grow here: larger of current kms and LVB size */
1009                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
1010                 kms = ldlm_extent_shift_kms(NULL, kms);
1011                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
1012                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
1013                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
1014                 lsm->lsm_oinfo[stripe].loi_kms = kms;
1015                 ll_inode_size_unlock(inode, 1);
1016                 unlock_res_and_lock(lock);
1017         }
1018
1019 iput:
1020         iput(inode);
1021         wake_up(&lock->l_waitq);
1022
1023         ldlm_lock2handle(lock, &lockh);
1024         ldlm_lock_decref(&lockh, LCK_PR);
1025         RETURN(0);
1026 }
1027 #endif
1028
1029 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1030 {
1031         struct ptlrpc_request *req = reqp;
1032         struct inode *inode = ll_inode_from_lock(lock);
1033         struct ll_inode_info *lli;
1034         struct lov_stripe_md *lsm;
1035         struct ost_lvb *lvb;
1036         int rc, stripe;
1037         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1038         ENTRY;
1039
1040         if (inode == NULL)
1041                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1042         lli = ll_i2info(inode);
1043         if (lli == NULL)
1044                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1045         lsm = lli->lli_smd;
1046         if (lsm == NULL)
1047                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1048
1049         /* First, find out which stripe index this lock corresponds to. */
1050         stripe = ll_lock_to_stripe_offset(inode, lock);
1051         if (stripe < 0)
1052                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1053
1054         rc = lustre_pack_reply(req, 2, size, NULL);
1055         if (rc)
1056                 GOTO(iput, rc);
1057
1058         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1059         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1060         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1061         lvb->lvb_atime = LTIME_S(inode->i_atime);
1062         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1063
1064         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1065                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1066                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1067                    lvb->lvb_atime, lvb->lvb_ctime);
1068  iput:
1069         iput(inode);
1070
1071  out:
1072         /* These errors are normal races, so we don't want to fill the console
1073          * with messages by calling ptlrpc_error() */
1074         if (rc == -ELDLM_NO_LOCK_DATA)
1075                 lustre_pack_reply(req, 1, NULL, NULL);
1076
1077         req->rq_status = rc;
1078         return rc;
1079 }
1080
1081 static int ll_merge_lvb(struct inode *inode)
1082 {
1083         struct ll_inode_info *lli = ll_i2info(inode);
1084         struct ll_sb_info *sbi = ll_i2sbi(inode);
1085         struct ost_lvb lvb;
1086         int rc;
1087
1088         ENTRY;
1089
1090         ll_inode_size_lock(inode, 1);
1091         inode_init_lvb(inode, &lvb);
1092         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1093         i_size_write(inode, lvb.lvb_size);
1094         inode->i_blocks = lvb.lvb_blocks;
1095
1096         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1097         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1098         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1099         ll_inode_size_unlock(inode, 1);
1100
1101         RETURN(rc);
1102 }
1103
1104 int ll_local_size(struct inode *inode)
1105 {
1106         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1107         struct ll_inode_info *lli = ll_i2info(inode);
1108         struct ll_sb_info *sbi = ll_i2sbi(inode);
1109         struct lustre_handle lockh = { 0 };
1110         int flags = 0;
1111         int rc;
1112         ENTRY;
1113
1114         if (lli->lli_smd->lsm_stripe_count == 0)
1115                 RETURN(0);
1116
1117         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1118                        &policy, LCK_PR, &flags, inode, &lockh);
1119         if (rc < 0)
1120                 RETURN(rc);
1121         else if (rc == 0)
1122                 RETURN(-ENODATA);
1123
1124         rc = ll_merge_lvb(inode);
1125         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1126         RETURN(rc);
1127 }
1128
1129 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1130                      lstat_t *st)
1131 {
1132         struct lustre_handle lockh = { 0 };
1133         struct ldlm_enqueue_info einfo = { 0 };
1134         struct obd_info oinfo = { { { 0 } } };
1135         struct ost_lvb lvb;
1136         int rc;
1137
1138         ENTRY;
1139
1140         einfo.ei_type = LDLM_EXTENT;
1141         einfo.ei_mode = LCK_PR;
1142         einfo.ei_cb_bl = ll_extent_lock_callback;
1143         einfo.ei_cb_cp = ldlm_completion_ast;
1144         einfo.ei_cb_gl = ll_glimpse_callback;
1145         einfo.ei_cbdata = NULL;
1146
1147         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1148         oinfo.oi_lockh = &lockh;
1149         oinfo.oi_md = lsm;
1150         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1151
1152         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1153         if (rc == -ENOENT)
1154                 RETURN(rc);
1155         if (rc != 0) {
1156                 CERROR("obd_enqueue returned rc %d, "
1157                        "returning -EIO\n", rc);
1158                 RETURN(rc > 0 ? -EIO : rc);
1159         }
1160
1161         lov_stripe_lock(lsm);
1162         memset(&lvb, 0, sizeof(lvb));
1163         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1164         st->st_size = lvb.lvb_size;
1165         st->st_blocks = lvb.lvb_blocks;
1166         st->st_mtime = lvb.lvb_mtime;
1167         st->st_atime = lvb.lvb_atime;
1168         st->st_ctime = lvb.lvb_ctime;
1169         lov_stripe_unlock(lsm);
1170
1171         RETURN(rc);
1172 }
1173
1174 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1175  * file (because it prefers KMS over RSS when larger) */
1176 int ll_glimpse_size(struct inode *inode, int ast_flags)
1177 {
1178         struct ll_inode_info *lli = ll_i2info(inode);
1179         struct ll_sb_info *sbi = ll_i2sbi(inode);
1180         struct lustre_handle lockh = { 0 };
1181         struct ldlm_enqueue_info einfo = { 0 };
1182         struct obd_info oinfo = { { { 0 } } };
1183         int rc;
1184         ENTRY;
1185
1186         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1187                 RETURN(0);
1188
1189         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1190
1191         if (!lli->lli_smd) {
1192                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1193                 RETURN(0);
1194         }
1195
1196         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1197          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1198          *       won't revoke any conflicting DLM locks held. Instead,
1199          *       ll_glimpse_callback() will be called on each client
1200          *       holding a DLM lock against this file, and resulting size
1201          *       will be returned for each stripe. DLM lock on [0, EOF] is
1202          *       acquired only if there were no conflicting locks. */
1203         einfo.ei_type = LDLM_EXTENT;
1204         einfo.ei_mode = LCK_PR;
1205         einfo.ei_cb_bl = ll_extent_lock_callback;
1206         einfo.ei_cb_cp = ldlm_completion_ast;
1207         einfo.ei_cb_gl = ll_glimpse_callback;
1208         einfo.ei_cbdata = inode;
1209
1210         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1211         oinfo.oi_lockh = &lockh;
1212         oinfo.oi_md = lli->lli_smd;
1213         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1214
1215         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1216         if (rc == -ENOENT)
1217                 RETURN(rc);
1218         if (rc != 0) {
1219                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1220                 RETURN(rc > 0 ? -EIO : rc);
1221         }
1222
1223         rc = ll_merge_lvb(inode);
1224
1225         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1226                i_size_read(inode), inode->i_blocks);
1227
1228         RETURN(rc);
1229 }
1230
1231 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1232                    struct lov_stripe_md *lsm, int mode,
1233                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1234                    int ast_flags)
1235 {
1236         struct ll_sb_info *sbi = ll_i2sbi(inode);
1237         struct ost_lvb lvb;
1238         struct ldlm_enqueue_info einfo = { 0 };
1239         struct obd_info oinfo = { { { 0 } } };
1240         int rc;
1241         ENTRY;
1242
1243         LASSERT(!lustre_handle_is_used(lockh));
1244         LASSERT(lsm != NULL);
1245
1246         /* don't drop the mmapped file to LRU */
1247         if (mapping_mapped(inode->i_mapping))
1248                 ast_flags |= LDLM_FL_NO_LRU;
1249
1250         /* XXX phil: can we do this?  won't it screw the file size up? */
1251         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1252             (sbi->ll_flags & LL_SBI_NOLCK))
1253                 RETURN(0);
1254
1255         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1256                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1257
1258         einfo.ei_type = LDLM_EXTENT;
1259         einfo.ei_mode = mode;
1260         einfo.ei_cb_bl = ll_extent_lock_callback;
1261         einfo.ei_cb_cp = ldlm_completion_ast;
1262         einfo.ei_cb_gl = ll_glimpse_callback;
1263         einfo.ei_cbdata = inode;
1264
1265         oinfo.oi_policy = *policy;
1266         oinfo.oi_lockh = lockh;
1267         oinfo.oi_md = lsm;
1268         oinfo.oi_flags = ast_flags;
1269
1270         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1271         *policy = oinfo.oi_policy;
1272         if (rc > 0)
1273                 rc = -EIO;
1274
1275         ll_inode_size_lock(inode, 1);
1276         inode_init_lvb(inode, &lvb);
1277         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1278
1279         if (policy->l_extent.start == 0 &&
1280             policy->l_extent.end == OBD_OBJECT_EOF) {
1281                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1282                  * the kms under both a DLM lock and the
1283                  * ll_inode_size_lock().  If we don't get the
1284                  * ll_inode_size_lock() here we can match the DLM lock and
1285                  * reset i_size from the kms before the truncating path has
1286                  * updated the kms.  generic_file_write can then trust the
1287                  * stale i_size when doing appending writes and effectively
1288                  * cancel the result of the truncate.  Getting the
1289                  * ll_inode_size_lock() after the enqueue maintains the DLM
1290                  * -> ll_inode_size_lock() acquiring order. */
1291                 i_size_write(inode, lvb.lvb_size);
1292                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1293                        inode->i_ino, i_size_read(inode));
1294         }
1295
1296         if (rc == 0) {
1297                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1298                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1299                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1300         }
1301         ll_inode_size_unlock(inode, 1);
1302
1303         RETURN(rc);
1304 }
1305
1306 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1307                      struct lov_stripe_md *lsm, int mode,
1308                      struct lustre_handle *lockh)
1309 {
1310         struct ll_sb_info *sbi = ll_i2sbi(inode);
1311         int rc;
1312         ENTRY;
1313
1314         /* XXX phil: can we do this?  won't it screw the file size up? */
1315         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1316             (sbi->ll_flags & LL_SBI_NOLCK))
1317                 RETURN(0);
1318
1319         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1320
1321         RETURN(rc);
1322 }
1323
/*
 * Read from a file (through the page cache).
 *
 * A file without a striping descriptor is served as zero-filled data up to
 * i_size.  Otherwise the request is handled in chunks: each chunk is covered
 * by a PR lock taken through the lock tree, i_size is brought up to date
 * (raised to the merged kms, or refreshed by a glimpse when the request
 * reaches beyond the kms), and the data is read with generic_file_read().
 *
 * Returns the total number of bytes read if any were read; otherwise 0 or
 * the negative error of the step that failed.
 */
1324 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1325                             loff_t *ppos)
1326 {
1327         struct inode *inode = file->f_dentry->d_inode;
1328         struct ll_inode_info *lli = ll_i2info(inode);
1329         struct lov_stripe_md *lsm = lli->lli_smd;
1330         struct ll_sb_info *sbi = ll_i2sbi(inode);
1331         struct ll_lock_tree tree;
1332         struct ll_lock_tree_node *node;
1333         struct ost_lvb lvb;
1334         struct ll_ra_read bead;
1335         int rc, ra = 0;
1336         loff_t end;
1337         ssize_t retval, chunk, sum = 0;
1338
1339         __u64 kms;
1340         ENTRY;
1341         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1342                inode->i_ino, inode->i_generation, inode, count, *ppos);
1343         /* "If nbyte is 0, read() will return 0 and have no other results."
1344          *                      -- Single Unix Spec */
1345         if (count == 0)
1346                 RETURN(0);
1347
1348         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1349
1350         if (!lsm) {
1351                 /* Read on file with no objects should return zero-filled
1352                  * buffers up to file size (we can get non-zero sizes with
1353                  * mknod + truncate, then opening file for read. This is a
1354                  * common pattern in NFS case, it seems). Bug 6243 */
1355                 int notzeroed;
1356                 /* Since there are no objects on OSTs, we have nothing to get
1357                  * lock on and so we are forced to access inode->i_size
1358                  * unguarded */
1359
1360                 /* Read beyond end of file */
1361                 if (*ppos >= i_size_read(inode))
1362                         RETURN(0);
1363
                /* clamp the request to the bytes left before i_size */
1364                 if (count > i_size_read(inode) - *ppos)
1365                         count = i_size_read(inode) - *ppos;
1366                 /* Make sure to correctly adjust the file pos pointer for
1367                  * EFAULT case */
1368                 notzeroed = clear_user(buf, count);
1369                 count -= notzeroed;
1370                 *ppos += count;
                /* nothing could be zeroed in the user buffer at all */
1371                 if (!count)
1372                         RETURN(-EFAULT);
1373                 RETURN(count);
1374         }
1375
        /* One pass per chunk: control comes back here as long as the previous
         * chunk was read in full and bytes remain (see the end of the
         * function). */
1376 repeat:
        /* Choose "end", the last byte of this chunk.  With ll_max_rw_chunk
         * set, a chunk stops at the end of the current stripe, at the end of
         * the request, or after ll_max_rw_chunk bytes, whichever comes
         * first; otherwise it covers the whole remaining request. */
1377         if (sbi->ll_max_rw_chunk != 0) {
1378                 /* first, let's know the end of the current stripe */
1379                 end = *ppos;
1380                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1381                                 (obd_off *)&end);
1382
1383                 /* correct, the end is beyond the request */
1384                 if (end > *ppos + count - 1)
1385                         end = *ppos + count - 1;
1386
1387                 /* and chunk shouldn't be too large even if striping is wide */
1388                 if (end - *ppos > sbi->ll_max_rw_chunk)
1389                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1390         } else {
1391                 end = *ppos + count - 1;
1392         }
1393
        /* build a PR lock-tree node for [*ppos, end] and lock it; O_NONBLOCK
         * files ask for LDLM_FL_BLOCK_NOWAIT */
1394         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1395         if (IS_ERR(node)){
1396                 GOTO(out, retval = PTR_ERR(node));
1397         }
1398
1399         tree.lt_fd = LUSTRE_FPRIVATE(file);
1400         rc = ll_tree_lock(&tree, node, buf, count,
1401                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1402         if (rc != 0)
1403                 GOTO(out, retval = rc);
1404
1405         ll_inode_size_lock(inode, 1);
1406         /*
1407          * Consistency guarantees: following possibilities exist for the
1408          * relation between region being read and real file size at this
1409          * moment:
1410          *
1411          *  (A): the region is completely inside of the file;
1412          *
1413          *  (B-x): x bytes of region are inside of the file, the rest is
1414          *  outside;
1415          *
1416          *  (C): the region is completely outside of the file.
1417          *
1418          * This classification is stable under DLM lock acquired by
1419          * ll_tree_lock() above, because to change class, other client has to
1420          * take DLM lock conflicting with our lock. Also, any updates to
1421          * ->i_size by other threads on this client are serialized by
1422          * ll_inode_size_lock(). This guarantees that short reads are handled
1423          * correctly in the face of concurrent writes and truncates.
1424          */
1425         inode_init_lvb(inode, &lvb);
1426         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1427         kms = lvb.lvb_size;
1428         if (*ppos + count - 1 > kms) {
1429                 /* A glimpse is necessary to determine whether we return a
1430                  * short read (B) or some zeroes at the end of the buffer (C) */
1431                 ll_inode_size_unlock(inode, 1);
1432                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                /* glimpse failed: release the chunk lock before bailing out */
1433                 if (retval) {
1434                         ll_tree_unlock(&tree);
1435                         goto out;
1436                 }
1437         } else {
1438                 /* region is within kms and, hence, within real file size (A).
1439                  * We need to increase i_size to cover the read region so that
1440                  * generic_file_read() will do its job, but that doesn't mean
1441                  * the kms size is _correct_, it is only the _minimum_ size.
1442                  * If someone does a stat they will get the correct size which
1443                  * will always be >= the kms value here.  b=11081 */
1444                 if (i_size_read(inode) < kms)
1445                         i_size_write(inode, kms);
1446                 ll_inode_size_unlock(inode, 1);
1447         }
1448
        /* number of bytes covered by the locked range [*ppos, end] */
1449         chunk = end - *ppos + 1;
1450         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1451                inode->i_ino, chunk, *ppos, i_size_read(inode));
1452
1453         /* turn off the kernel's read-ahead */
1454         file->f_ra.ra_pages = 0;
1455
1456         /* initialize read-ahead window once per syscall */
1457         if (ra == 0) {
1458                 ra = 1;
1459                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1460                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1461                 ll_ra_read_in(file, &bead);
1462         }
1463
1464         /* BUG: 5972 */
1465         file_accessed(file);
1466         retval = generic_file_read(file, buf, chunk, ppos);
        /* NOTE(review): this runs once per chunk but is passed "count" (the
         * bytes still outstanding for the whole request) rather than "chunk";
         * confirm that this is what the statistics are meant to record. */
1467         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1468
1469         ll_tree_unlock(&tree);
1470
        /* account for what was read; go round again only if this chunk was
         * read in full and part of the request is still outstanding */
1471         if (retval > 0) {
1472                 buf += retval;
1473                 count -= retval;
1474                 sum += retval;
1475                 if (retval == chunk && count > 0)
1476                         goto repeat;
1477         }
1478
1479  out:
        /* close the read-ahead window only if it was opened above */
1480         if (ra != 0)
1481                 ll_ra_read_ex(file, &bead);
        /* bytes already delivered take precedence over a later result */
1482         retval = (sum > 0) ? sum : retval;
1483         RETURN(retval);
1484 }
1485
1486 /*
1487  * Write to a file (through the page cache).
1488  */
1489 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1490                              loff_t *ppos)
1491 {
1492         struct inode *inode = file->f_dentry->d_inode;
1493         struct ll_sb_info *sbi = ll_i2sbi(inode);
1494         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1495         struct ll_lock_tree tree;
1496         struct ll_lock_tree_node *node;
1497         loff_t maxbytes = ll_file_maxbytes(inode);
1498         loff_t lock_start, lock_end, end;
1499         ssize_t retval, chunk, sum = 0;
1500         int rc;
1501         ENTRY;
1502
1503         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1504                inode->i_ino, inode->i_generation, inode, count, *ppos);
1505
1506         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1507
1508         /* POSIX, but surprised the VFS doesn't check this already */
1509         if (count == 0)
1510                 RETURN(0);
1511
1512         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1513          * called on the file, don't fail the below assertion (bug 2388). */
1514         if (file->f_flags & O_LOV_DELAY_CREATE &&
1515             ll_i2info(inode)->lli_smd == NULL)
1516                 RETURN(-EBADF);
1517
1518         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1519
1520         down(&ll_i2info(inode)->lli_write_sem);
1521
1522 repeat:
1523         chunk = 0; /* just to fix gcc's warning */
1524         end = *ppos + count - 1;
1525
1526         if (file->f_flags & O_APPEND) {
1527                 lock_start = 0;
1528                 lock_end = OBD_OBJECT_EOF;
1529         } else if (sbi->ll_max_rw_chunk != 0) {
1530                 /* first, let's know the end of the current stripe */
1531                 end = *ppos;
1532                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1533                                 (obd_off *)&end);
1534
1535                 /* correct, the end is beyond the request */
1536                 if (end > *ppos + count - 1)
1537                         end = *ppos + count - 1;
1538
1539                 /* and chunk shouldn't be too large even if striping is wide */
1540                 if (end - *ppos > sbi->ll_max_rw_chunk)
1541                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1542                 lock_start = *ppos;
1543                 lock_end = end;
1544         } else {
1545                 lock_start = *ppos;
1546                 lock_end = *ppos + count - 1;
1547         }
1548         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1549
1550         if (IS_ERR(node))
1551                 GOTO(out, retval = PTR_ERR(node));
1552
1553         tree.lt_fd = LUSTRE_FPRIVATE(file);
1554         rc = ll_tree_lock(&tree, node, buf, count,
1555                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1556         if (rc != 0)
1557                 GOTO(out, retval = rc);
1558
1559         /* This is ok, g_f_w will overwrite this under i_sem if it races
1560          * with a local truncate, it just makes our maxbyte checking easier.
1561          * The i_size value gets updated in ll_extent_lock() as a consequence
1562          * of the [0,EOF] extent lock we requested above. */
1563         if (file->f_flags & O_APPEND) {
1564                 *ppos = i_size_read(inode);
1565                 end = *ppos + count - 1;
1566         }
1567
1568         if (*ppos >= maxbytes) {
1569                 send_sig(SIGXFSZ, current, 0);
1570                 GOTO(out_unlock, retval = -EFBIG);
1571         }
1572         if (*ppos + count > maxbytes)
1573                 count = maxbytes - *ppos;
1574
1575         /* generic_file_write handles O_APPEND after getting i_mutex */
1576         chunk = end - *ppos + 1;
1577         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1578                inode->i_ino, chunk, *ppos);
1579         retval = generic_file_write(file, buf, chunk, ppos);
1580         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1581
1582 out_unlock:
1583         ll_tree_unlock(&tree);
1584
1585 out:
1586         if (retval > 0) {
1587                 buf += retval;
1588                 count -= retval;
1589                 sum += retval;
1590                 if (retval == chunk && count > 0)
1591                         goto repeat;
1592         }
1593
1594         up(&ll_i2info(inode)->lli_write_sem);
1595
1596         retval = (sum > 0) ? sum : retval;
1597         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1598                            retval > 0 ? retval : 0);
1599         RETURN(retval);
1600 }
1601
1602 /*
1603  * Send file content (through pagecache) somewhere with helper
1604  */
1605 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1606                                 read_actor_t actor, void *target)
1607 {
1608         struct inode *inode = in_file->f_dentry->d_inode;
1609         struct ll_inode_info *lli = ll_i2info(inode);
1610         struct lov_stripe_md *lsm = lli->lli_smd;
1611         struct ll_lock_tree tree;
1612         struct ll_lock_tree_node *node;
1613         struct ost_lvb lvb;
1614         struct ll_ra_read bead;
1615         int rc;
1616         ssize_t retval;
1617         __u64 kms;
1618         ENTRY;
1619         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1620                inode->i_ino, inode->i_generation, inode, count, *ppos);
1621
1622         /* "If nbyte is 0, read() will return 0 and have no other results."
1623          *                      -- Single Unix Spec */
1624         if (count == 0)
1625                 RETURN(0);
1626
1627         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1628         /* turn off the kernel's read-ahead */
1629         in_file->f_ra.ra_pages = 0;
1630
1631         /* File with no objects, nothing to lock */
1632         if (!lsm)
1633                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1634
1635         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1636         if (IS_ERR(node))
1637                 RETURN(PTR_ERR(node));
1638
1639         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1640         rc = ll_tree_lock(&tree, node, NULL, count,
1641                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1642         if (rc != 0)
1643                 RETURN(rc);
1644
1645         ll_inode_size_lock(inode, 1);
1646         /*
1647          * Consistency guarantees: following possibilities exist for the
1648          * relation between region being read and real file size at this
1649          * moment:
1650          *
1651          *  (A): the region is completely inside of the file;
1652          *
1653          *  (B-x): x bytes of region are inside of the file, the rest is
1654          *  outside;
1655          *
1656          *  (C): the region is completely outside of the file.
1657          *
1658          * This classification is stable under DLM lock acquired by
1659          * ll_tree_lock() above, because to change class, other client has to
1660          * take DLM lock conflicting with our lock. Also, any updates to
1661          * ->i_size by other threads on this client are serialized by
1662          * ll_inode_size_lock(). This guarantees that short reads are handled
1663          * correctly in the face of concurrent writes and truncates.
1664          */
1665         inode_init_lvb(inode, &lvb);
1666         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1667         kms = lvb.lvb_size;
1668         if (*ppos + count - 1 > kms) {
1669                 /* A glimpse is necessary to determine whether we return a
1670                  * short read (B) or some zeroes at the end of the buffer (C) */
1671                 ll_inode_size_unlock(inode, 1);
1672                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1673                 if (retval)
1674                         goto out;
1675         } else {
1676                 /* region is within kms and, hence, within real file size (A) */
1677                 i_size_write(inode, kms);
1678                 ll_inode_size_unlock(inode, 1);
1679         }
1680
1681         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1682                inode->i_ino, count, *ppos, i_size_read(inode));
1683
1684         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1685         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1686         ll_ra_read_in(in_file, &bead);
1687         /* BUG: 5972 */
1688         file_accessed(in_file);
1689         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1690         ll_ra_read_ex(in_file, &bead);
1691
1692  out:
1693         ll_tree_unlock(&tree);
1694         RETURN(retval);
1695 }
1696
1697 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1698                                unsigned long arg)
1699 {
1700         struct ll_inode_info *lli = ll_i2info(inode);
1701         struct obd_export *exp = ll_i2dtexp(inode);
1702         struct ll_recreate_obj ucreatp;
1703         struct obd_trans_info oti = { 0 };
1704         struct obdo *oa = NULL;
1705         int lsm_size;
1706         int rc = 0;
1707         struct lov_stripe_md *lsm, *lsm2;
1708         ENTRY;
1709
1710         if (!capable (CAP_SYS_ADMIN))
1711                 RETURN(-EPERM);
1712
1713         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1714                             sizeof(struct ll_recreate_obj));
1715         if (rc) {
1716                 RETURN(-EFAULT);
1717         }
1718         OBDO_ALLOC(oa);
1719         if (oa == NULL)
1720                 RETURN(-ENOMEM);
1721
1722         down(&lli->lli_size_sem);
1723         lsm = lli->lli_smd;
1724         if (lsm == NULL)
1725                 GOTO(out, rc = -ENOENT);
1726         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1727                    (lsm->lsm_stripe_count));
1728
1729         OBD_ALLOC(lsm2, lsm_size);
1730         if (lsm2 == NULL)
1731                 GOTO(out, rc = -ENOMEM);
1732
1733         oa->o_id = ucreatp.lrc_id;
1734         oa->o_gr = ucreatp.lrc_group;
1735         oa->o_nlink = ucreatp.lrc_ost_idx;
1736         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1737         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1738         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1739                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1740
1741         memcpy(lsm2, lsm, lsm_size);
1742         rc = obd_create(exp, oa, &lsm2, &oti);
1743
1744         OBD_FREE(lsm2, lsm_size);
1745         GOTO(out, rc);
1746 out:
1747         up(&lli->lli_size_sem);
1748         OBDO_FREE(oa);
1749         return rc;
1750 }
1751
1752 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1753                              int flags, struct lov_user_md *lum, int lum_size)
1754 {
1755         struct ll_inode_info *lli = ll_i2info(inode);
1756         struct lov_stripe_md *lsm;
1757         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1758         int rc = 0;
1759         ENTRY;
1760
1761         down(&lli->lli_size_sem);
1762         lsm = lli->lli_smd;
1763         if (lsm) {
1764                 up(&lli->lli_size_sem);
1765                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1766                        inode->i_ino);
1767                 RETURN(-EEXIST);
1768         }
1769
1770         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1771         if (rc)
1772                 GOTO(out, rc);
1773         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1774                 GOTO(out_req_free, rc = -ENOENT);
1775         rc = oit.d.lustre.it_status;
1776         if (rc < 0)
1777                 GOTO(out_req_free, rc);
1778
1779         ll_release_openhandle(file->f_dentry, &oit);
1780
1781  out:
1782         up(&lli->lli_size_sem);
1783         ll_intent_release(&oit);
1784         RETURN(rc);
1785 out_req_free:
1786         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1787         goto out;
1788 }
1789
1790 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1791                              struct lov_mds_md **lmmp, int *lmm_size, 
1792                              struct ptlrpc_request **request)
1793 {
1794         struct ll_sb_info *sbi = ll_i2sbi(inode);
1795         struct mdt_body  *body;
1796         struct lov_mds_md *lmm = NULL;
1797         struct ptlrpc_request *req = NULL;
1798         struct obd_capa *oc;
1799         int rc, lmmsize;
1800
1801         rc = ll_get_max_mdsize(sbi, &lmmsize);
1802         if (rc)
1803                 RETURN(rc);
1804
1805         oc = ll_mdscapa_get(inode);
1806         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1807                              oc, filename, strlen(filename) + 1,
1808                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
1809                              ll_i2suppgid(inode), &req);
1810         capa_put(oc);
1811         if (rc < 0) {
1812                 CDEBUG(D_INFO, "md_getattr_name failed "
1813                        "on %s: rc %d\n", filename, rc);
1814                 GOTO(out, rc);
1815         }
1816
1817         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1818         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1819         /* swabbed by mdc_getattr_name */
1820         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
1821
1822         lmmsize = body->eadatasize;
1823
1824         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1825                         lmmsize == 0) {
1826                 GOTO(out, rc = -ENODATA);
1827         }
1828
1829         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1830         LASSERT(lmm != NULL);
1831         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
1832
1833         /*
1834          * This is coming from the MDS, so is probably in
1835          * little endian.  We convert it to host endian before
1836          * passing it to userspace.
1837          */
1838         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1839                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1840                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1841         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1842                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1843         }
1844
1845         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1846                 struct lov_stripe_md *lsm;
1847                 struct lov_user_md_join *lmj;
1848                 int lmj_size, i, aindex = 0;
1849
1850                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1851                 if (rc < 0)
1852                         GOTO(out, rc = -ENOMEM);
1853                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1854                 if (rc)
1855                         GOTO(out_free_memmd, rc);
1856
1857                 lmj_size = sizeof(struct lov_user_md_join) +
1858                            lsm->lsm_stripe_count *
1859                            sizeof(struct lov_user_ost_data_join);
1860                 OBD_ALLOC(lmj, lmj_size);
1861                 if (!lmj)
1862                         GOTO(out_free_memmd, rc = -ENOMEM);
1863
1864                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1865                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1866                         struct lov_extent *lex =
1867                                 &lsm->lsm_array->lai_ext_array[aindex];
1868
1869                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1870                                 aindex ++;
1871                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1872                                         LPU64" len %d\n", aindex, i,
1873                                         lex->le_start, (int)lex->le_len);
1874                         lmj->lmm_objects[i].l_extent_start =
1875                                 lex->le_start;
1876
1877                         if ((int)lex->le_len == -1)
1878                                 lmj->lmm_objects[i].l_extent_end = -1;
1879                         else
1880                                 lmj->lmm_objects[i].l_extent_end =
1881                                         lex->le_start + lex->le_len;
1882                         lmj->lmm_objects[i].l_object_id =
1883                                 lsm->lsm_oinfo[i]->loi_id;
1884                         lmj->lmm_objects[i].l_object_gr =
1885                                 lsm->lsm_oinfo[i]->loi_gr;
1886                         lmj->lmm_objects[i].l_ost_gen =
1887                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1888                         lmj->lmm_objects[i].l_ost_idx =
1889                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1890                 }
1891                 lmm = (struct lov_mds_md *)lmj;
1892                 lmmsize = lmj_size;
1893 out_free_memmd:
1894                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1895         }
1896 out:
1897         *lmmp = lmm;
1898         *lmm_size = lmmsize;
1899         *request = req;
1900         return rc;
1901 }
1902
1903 static int ll_lov_setea(struct inode *inode, struct file *file,
1904                             unsigned long arg)
1905 {
1906         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1907         struct lov_user_md  *lump;
1908         int lum_size = sizeof(struct lov_user_md) +
1909                        sizeof(struct lov_user_ost_data);
1910         int rc;
1911         ENTRY;
1912
1913         if (!capable (CAP_SYS_ADMIN))
1914                 RETURN(-EPERM);
1915
1916         OBD_ALLOC(lump, lum_size);
1917         if (lump == NULL) {
1918                 RETURN(-ENOMEM);
1919         }
1920         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1921         if (rc) {
1922                 OBD_FREE(lump, lum_size);
1923                 RETURN(-EFAULT);
1924         }
1925
1926         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1927
1928         OBD_FREE(lump, lum_size);
1929         RETURN(rc);
1930 }
1931
1932 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1933                             unsigned long arg)
1934 {
1935         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1936         int rc;
1937         int flags = FMODE_WRITE;
1938         ENTRY;
1939
1940         /* Bug 1152: copy properly when this is no longer true */
1941         LASSERT(sizeof(lum) == sizeof(*lump));
1942         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1943         rc = copy_from_user(&lum, lump, sizeof(lum));
1944         if (rc)
1945                 RETURN(-EFAULT);
1946
1947         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1948         if (rc == 0) {
1949                  put_user(0, &lump->lmm_stripe_count);
1950                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1951                                     0, ll_i2info(inode)->lli_smd, lump);
1952         }
1953         RETURN(rc);
1954 }
1955
1956 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1957 {
1958         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1959
1960         if (!lsm)
1961                 RETURN(-ENODATA);
1962
1963         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1964                             (void *)arg);
1965 }
1966
1967 static int ll_get_grouplock(struct inode *inode, struct file *file,
1968                             unsigned long arg)
1969 {
1970         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1971         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1972                                                     .end = OBD_OBJECT_EOF}};
1973         struct lustre_handle lockh = { 0 };
1974         struct ll_inode_info *lli = ll_i2info(inode);
1975         struct lov_stripe_md *lsm = lli->lli_smd;
1976         int flags = 0, rc;
1977         ENTRY;
1978
1979         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1980                 RETURN(-EINVAL);
1981         }
1982
1983         policy.l_extent.gid = arg;
1984         if (file->f_flags & O_NONBLOCK)
1985                 flags = LDLM_FL_BLOCK_NOWAIT;
1986
1987         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1988         if (rc)
1989                 RETURN(rc);
1990
1991         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1992         fd->fd_gid = arg;
1993         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1994
1995         RETURN(0);
1996 }
1997
1998 static int ll_put_grouplock(struct inode *inode, struct file *file,
1999                             unsigned long arg)
2000 {
2001         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2002         struct ll_inode_info *lli = ll_i2info(inode);
2003         struct lov_stripe_md *lsm = lli->lli_smd;
2004         int rc;
2005         ENTRY;
2006
2007         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2008                 /* Ugh, it's already unlocked. */
2009                 RETURN(-EINVAL);
2010         }
2011
2012         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2013                 RETURN(-EINVAL);
2014
2015         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2016
2017         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2018         if (rc)
2019                 RETURN(rc);
2020
2021         fd->fd_gid = 0;
2022         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2023
2024         RETURN(0);
2025 }
2026
2027 static int join_sanity_check(struct inode *head, struct inode *tail)
2028 {
2029         ENTRY;
2030         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2031                 CERROR("server do not support join \n");
2032                 RETURN(-EINVAL);
2033         }
2034         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2035                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2036                        head->i_ino, tail->i_ino);
2037                 RETURN(-EINVAL);
2038         }
2039         if (head->i_ino == tail->i_ino) {
2040                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2041                 RETURN(-EINVAL);
2042         }
2043         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2044                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2045                 RETURN(-EINVAL);
2046         }
2047         RETURN(0);
2048 }
2049
2050 static int join_file(struct inode *head_inode, struct file *head_filp,
2051                      struct file *tail_filp)
2052 {
2053         struct dentry *tail_dentry = tail_filp->f_dentry;
2054         struct lookup_intent oit = {.it_op = IT_OPEN,
2055                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2056         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2057                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2058
2059         struct lustre_handle lockh;
2060         struct md_op_data *op_data;
2061         int    rc;
2062         loff_t data;
2063         ENTRY;
2064
2065         tail_dentry = tail_filp->f_dentry;
2066
2067         data = i_size_read(head_inode);
2068         op_data = ll_prep_md_op_data(NULL, head_inode,
2069                                      tail_dentry->d_parent->d_inode,
2070                                      tail_dentry->d_name.name,
2071                                      tail_dentry->d_name.len, 0,
2072                                      LUSTRE_OPC_ANY, &data);
2073         if (IS_ERR(op_data))
2074                 RETURN(PTR_ERR(op_data));
2075
2076         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2077                          op_data, &lockh, NULL, 0, 0);
2078
2079         ll_finish_md_op_data(op_data);
2080         if (rc < 0)
2081                 GOTO(out, rc);
2082
2083         rc = oit.d.lustre.it_status;
2084
2085         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2086                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2087                 ptlrpc_req_finished((struct ptlrpc_request *)
2088                                     oit.d.lustre.it_data);
2089                 GOTO(out, rc);
2090         }
2091
2092         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2093                                            * away */
2094                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2095                 oit.d.lustre.it_lock_mode = 0;
2096         }
2097         ll_release_openhandle(head_filp->f_dentry, &oit);
2098 out:
2099         ll_intent_release(&oit);
2100         RETURN(rc);
2101 }
2102
2103 static int ll_file_join(struct inode *head, struct file *filp,
2104                         char *filename_tail)
2105 {
2106         struct inode *tail = NULL, *first = NULL, *second = NULL;
2107         struct dentry *tail_dentry;
2108         struct file *tail_filp, *first_filp, *second_filp;
2109         struct ll_lock_tree first_tree, second_tree;
2110         struct ll_lock_tree_node *first_node, *second_node;
2111         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2112         int rc = 0, cleanup_phase = 0;
2113         ENTRY;
2114
2115         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2116                head->i_ino, head->i_generation, head, filename_tail);
2117
2118         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2119         if (IS_ERR(tail_filp)) {
2120                 CERROR("Can not open tail file %s", filename_tail);
2121                 rc = PTR_ERR(tail_filp);
2122                 GOTO(cleanup, rc);
2123         }
2124         tail = igrab(tail_filp->f_dentry->d_inode);
2125
2126         tlli = ll_i2info(tail);
2127         tail_dentry = tail_filp->f_dentry;
2128         LASSERT(tail_dentry);
2129         cleanup_phase = 1;
2130
2131         /*reorder the inode for lock sequence*/
2132         first = head->i_ino > tail->i_ino ? head : tail;
2133         second = head->i_ino > tail->i_ino ? tail : head;
2134         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2135         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2136
2137         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2138                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2139         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2140         if (IS_ERR(first_node)){
2141                 rc = PTR_ERR(first_node);
2142                 GOTO(cleanup, rc);
2143         }
2144         first_tree.lt_fd = first_filp->private_data;
2145         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2146         if (rc != 0)
2147                 GOTO(cleanup, rc);
2148         cleanup_phase = 2;
2149
2150         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2151         if (IS_ERR(second_node)){
2152                 rc = PTR_ERR(second_node);
2153                 GOTO(cleanup, rc);
2154         }
2155         second_tree.lt_fd = second_filp->private_data;
2156         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2157         if (rc != 0)
2158                 GOTO(cleanup, rc);
2159         cleanup_phase = 3;
2160
2161         rc = join_sanity_check(head, tail);
2162         if (rc)
2163                 GOTO(cleanup, rc);
2164
2165         rc = join_file(head, filp, tail_filp);
2166         if (rc)
2167                 GOTO(cleanup, rc);
2168 cleanup:
2169         switch (cleanup_phase) {
2170         case 3:
2171                 ll_tree_unlock(&second_tree);
2172                 obd_cancel_unused(ll_i2dtexp(second),
2173                                   ll_i2info(second)->lli_smd, 0, NULL);
2174         case 2:
2175                 ll_tree_unlock(&first_tree);
2176                 obd_cancel_unused(ll_i2dtexp(first),
2177                                   ll_i2info(first)->lli_smd, 0, NULL);
2178         case 1:
2179                 filp_close(tail_filp, 0);
2180                 if (tail)
2181                         iput(tail);
2182                 if (head && rc == 0) {
2183                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2184                                        &hlli->lli_smd);
2185                         hlli->lli_smd = NULL;
2186                 }
2187         case 0:
2188                 break;
2189         default:
2190                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2191                 LBUG();
2192         }
2193         RETURN(rc);
2194 }
2195
2196 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2197 {
2198         struct inode *inode = dentry->d_inode;
2199         struct obd_client_handle *och;
2200         int rc;
2201         ENTRY;
2202
2203         LASSERT(inode);
2204
2205         /* Root ? Do nothing. */
2206         if (dentry->d_inode->i_sb->s_root == dentry)
2207                 RETURN(0);
2208
2209         /* No open handle to close? Move away */
2210         if (!it_disposition(it, DISP_OPEN_OPEN))
2211                 RETURN(0);
2212
2213         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2214
2215         OBD_ALLOC(och, sizeof(*och));
2216         if (!och)
2217                 GOTO(out, rc = -ENOMEM);
2218
2219         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2220                     ll_i2info(inode), it, och);
2221
2222         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2223                                        inode, och);
2224  out:
2225         /* this one is in place of ll_file_open */
2226         ptlrpc_req_finished(it->d.lustre.it_data);
2227         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2228         RETURN(rc);
2229 }
2230
2231 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2232                   unsigned long arg)
2233 {
2234         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2235         int flags;
2236         ENTRY;
2237
2238         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2239                inode->i_generation, inode, cmd);
2240         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2241
2242         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2243         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2244                 RETURN(-ENOTTY);
2245
2246         switch(cmd) {
2247         case LL_IOC_GETFLAGS:
2248                 /* Get the current value of the file flags */
2249                 return put_user(fd->fd_flags, (int *)arg);
2250         case LL_IOC_SETFLAGS:
2251         case LL_IOC_CLRFLAGS:
2252                 /* Set or clear specific file flags */
2253                 /* XXX This probably needs checks to ensure the flags are
2254                  *     not abused, and to handle any flag side effects.
2255                  */
2256                 if (get_user(flags, (int *) arg))
2257                         RETURN(-EFAULT);
2258
2259                 if (cmd == LL_IOC_SETFLAGS) {
2260                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2261                             !(file->f_flags & O_DIRECT)) {
2262                                 CERROR("%s: unable to disable locking on "
2263                                        "non-O_DIRECT file\n", current->comm);
2264                                 RETURN(-EINVAL);
2265                         }
2266
2267                         fd->fd_flags |= flags;
2268                 } else {
2269                         fd->fd_flags &= ~flags;
2270                 }
2271                 RETURN(0);
2272         case LL_IOC_LOV_SETSTRIPE:
2273                 RETURN(ll_lov_setstripe(inode, file, arg));
2274         case LL_IOC_LOV_SETEA:
2275                 RETURN(ll_lov_setea(inode, file, arg));
2276         case LL_IOC_LOV_GETSTRIPE:
2277                 RETURN(ll_lov_getstripe(inode, arg));
2278         case LL_IOC_RECREATE_OBJ:
2279                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2280         case EXT3_IOC_GETFLAGS:
2281         case EXT3_IOC_SETFLAGS:
2282                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2283         case EXT3_IOC_GETVERSION_OLD:
2284         case EXT3_IOC_GETVERSION:
2285                 RETURN(put_user(inode->i_generation, (int *)arg));
2286         case LL_IOC_JOIN: {
2287                 char *ftail;
2288                 int rc;
2289
2290                 ftail = getname((const char *)arg);
2291                 if (IS_ERR(ftail))
2292                         RETURN(PTR_ERR(ftail));
2293                 rc = ll_file_join(inode, file, ftail);
2294                 putname(ftail);
2295                 RETURN(rc);
2296         }
2297         case LL_IOC_GROUP_LOCK:
2298                 RETURN(ll_get_grouplock(inode, file, arg));
2299         case LL_IOC_GROUP_UNLOCK:
2300                 RETURN(ll_put_grouplock(inode, file, arg));
2301         case IOC_OBD_STATFS:
2302                 RETURN(ll_obd_statfs(inode, (void *)arg));
2303
2304         /* We need to special case any other ioctls we want to handle,
2305          * to send them to the MDS/OST as appropriate and to properly
2306          * network encode the arg field.
2307         case EXT3_IOC_SETVERSION_OLD:
2308         case EXT3_IOC_SETVERSION:
2309         */
2310         case LL_IOC_FLUSHCTX:
2311                 RETURN(ll_flush_ctx(inode));
2312         default: {
2313                 int err;
2314
2315                 if (LLIOC_STOP == 
2316                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2317                         RETURN(err);
2318
2319                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2320                                      (void *)arg));
2321         }
2322         }
2323 }
2324
2325 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2326 {
2327         struct inode *inode = file->f_dentry->d_inode;
2328         struct ll_inode_info *lli = ll_i2info(inode);
2329         struct lov_stripe_md *lsm = lli->lli_smd;
2330         loff_t retval;
2331         ENTRY;
2332         retval = offset + ((origin == 2) ? i_size_read(inode) :
2333                            (origin == 1) ? file->f_pos : 0);
2334         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2335                inode->i_ino, inode->i_generation, inode, retval, retval,
2336                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2337         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2338
2339         if (origin == 2) { /* SEEK_END */
2340                 int nonblock = 0, rc;
2341
2342                 if (file->f_flags & O_NONBLOCK)
2343                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2344
2345                 if (lsm != NULL) {
2346                         rc = ll_glimpse_size(inode, nonblock);
2347                         if (rc != 0)
2348                                 RETURN(rc);
2349                 }
2350
2351                 ll_inode_size_lock(inode, 0);
2352                 offset += i_size_read(inode);
2353                 ll_inode_size_unlock(inode, 0);
2354         } else if (origin == 1) { /* SEEK_CUR */
2355                 offset += file->f_pos;
2356         }
2357
2358         retval = -EINVAL;
2359         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2360                 if (offset != file->f_pos) {
2361                         file->f_pos = offset;
2362 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2363                         file->f_reada = 0;
2364                         file->f_version = ++event;
2365 #endif
2366                 }
2367                 retval = offset;
2368         }
2369         
2370         RETURN(retval);
2371 }
2372
2373 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2374 {
2375         struct inode *inode = dentry->d_inode;
2376         struct ll_inode_info *lli = ll_i2info(inode);
2377         struct lov_stripe_md *lsm = lli->lli_smd;
2378         struct ptlrpc_request *req;
2379         struct obd_capa *oc;
2380         int rc, err;
2381         ENTRY;
2382         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2383                inode->i_generation, inode);
2384         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2385
2386         /* fsync's caller has already called _fdata{sync,write}, we want
2387          * that IO to finish before calling the osc and mdc sync methods */
2388         rc = filemap_fdatawait(inode->i_mapping);
2389
2390         /* catch async errors that were recorded back when async writeback
2391          * failed for pages in this mapping. */
2392         err = lli->lli_async_rc;
2393         lli->lli_async_rc = 0;
2394         if (rc == 0)
2395                 rc = err;
2396         if (lsm) {
2397                 err = lov_test_and_clear_async_rc(lsm);
2398                 if (rc == 0)
2399                         rc = err;
2400         }
2401
2402         oc = ll_mdscapa_get(inode);
2403         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2404                       &req);
2405         capa_put(oc);
2406         if (!rc)
2407                 rc = err;
2408         if (!err)
2409                 ptlrpc_req_finished(req);
2410
2411         if (data && lsm) {
2412                 struct obdo *oa;
2413                 
2414                 OBDO_ALLOC(oa);
2415                 if (!oa)
2416                         RETURN(rc ? rc : -ENOMEM);
2417
2418                 oa->o_id = lsm->lsm_object_id;
2419                 oa->o_gr = lsm->lsm_object_gr;
2420                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2421                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2422                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2423                                            OBD_MD_FLGROUP);
2424
2425                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2426                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2427                                0, OBD_OBJECT_EOF, oc);
2428                 capa_put(oc);
2429                 if (!rc)
2430                         rc = err;
2431                 OBDO_FREE(oa);
2432         }
2433
2434         RETURN(rc);
2435 }
2436
2437 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2438 {
2439         struct inode *inode = file->f_dentry->d_inode;
2440         struct ll_sb_info *sbi = ll_i2sbi(inode);
2441         struct ldlm_res_id res_id =
2442                 { .name = { fid_seq(ll_inode2fid(inode)),
2443                             fid_oid(ll_inode2fid(inode)),
2444                             fid_ver(ll_inode2fid(inode)),
2445                             LDLM_FLOCK} };
2446         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2447                 ldlm_flock_completion_ast, NULL, file_lock };
2448         struct lustre_handle lockh = {0};
2449         ldlm_policy_data_t flock;
2450         int flags = 0;
2451         int rc;
2452         ENTRY;
2453
2454         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2455                inode->i_ino, file_lock);
2456
2457         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2458  
2459         if (file_lock->fl_flags & FL_FLOCK) {
2460                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2461                 /* set missing params for flock() calls */
2462                 file_lock->fl_end = OFFSET_MAX;
2463                 file_lock->fl_pid = current->tgid;
2464         }
2465         flock.l_flock.pid = file_lock->fl_pid;
2466         flock.l_flock.start = file_lock->fl_start;
2467         flock.l_flock.end = file_lock->fl_end;
2468
2469         switch (file_lock->fl_type) {
2470         case F_RDLCK:
2471                 einfo.ei_mode = LCK_PR;
2472                 break;
2473         case F_UNLCK:
2474                 /* An unlock request may or may not have any relation to
2475                  * existing locks so we may not be able to pass a lock handle
2476                  * via a normal ldlm_lock_cancel() request. The request may even
2477                  * unlock a byte range in the middle of an existing lock. In
2478                  * order to process an unlock request we need all of the same
2479                  * information that is given with a normal read or write record
2480                  * lock request. To avoid creating another ldlm unlock (cancel)
2481                  * message we'll treat a LCK_NL flock request as an unlock. */
2482                 einfo.ei_mode = LCK_NL;
2483                 break;
2484         case F_WRLCK:
2485                 einfo.ei_mode = LCK_PW;
2486                 break;
2487         default:
2488                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2489                 LBUG();
2490         }
2491
2492         switch (cmd) {
2493         case F_SETLKW:
2494 #ifdef F_SETLKW64
2495         case F_SETLKW64:
2496 #endif
2497                 flags = 0;
2498                 break;
2499         case F_SETLK:
2500 #ifdef F_SETLK64
2501         case F_SETLK64:
2502 #endif
2503                 flags = LDLM_FL_BLOCK_NOWAIT;
2504                 break;
2505         case F_GETLK:
2506 #ifdef F_GETLK64
2507         case F_GETLK64:
2508 #endif
2509                 flags = LDLM_FL_TEST_LOCK;
2510                 /* Save the old mode so that if the mode in the lock changes we
2511                  * can decrement the appropriate reader or writer refcount. */
2512                 file_lock->fl_type = einfo.ei_mode;
2513                 break;
2514         default:
2515                 CERROR("unknown fcntl lock command: %d\n", cmd);
2516                 LBUG();
2517         }
2518
2519         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2520                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2521                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2522
2523         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2524                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2525         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2526                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2527 #ifdef HAVE_F_OP_FLOCK
2528         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2529             !(flags & LDLM_FL_TEST_LOCK))
2530                 posix_lock_file_wait(file, file_lock);
2531 #endif
2532
2533         RETURN(rc);
2534 }
2535
2536 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2537 {
2538         ENTRY;
2539
2540         RETURN(-ENOSYS);
2541 }
2542
2543 int ll_have_md_lock(struct inode *inode, __u64 bits)
2544 {
2545         struct lustre_handle lockh;
2546         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2547         struct lu_fid *fid;
2548         int flags;
2549         ENTRY;
2550
2551         if (!inode)
2552                RETURN(0);
2553
2554         fid = &ll_i2info(inode)->lli_fid;
2555         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2556
2557         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2558         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2559                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2560                 RETURN(1);
2561         }
2562         RETURN(0);
2563 }
2564
2565 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2566                             struct lustre_handle *lockh)
2567 {
2568         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2569         struct lu_fid *fid;
2570         ldlm_mode_t rc;
2571         int flags;
2572         ENTRY;
2573
2574         fid = &ll_i2info(inode)->lli_fid;
2575         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2576
2577         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2578         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2579                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2580         RETURN(rc);
2581 }
2582
2583 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2584         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2585                               * and return success */
2586                 inode->i_nlink = 0;
2587                 /* This path cannot be hit for regular files unless in
2588                  * case of obscure races, so no need to to validate
2589                  * size. */
2590                 if (!S_ISREG(inode->i_mode) &&
2591                     !S_ISDIR(inode->i_mode))
2592                         return 0;
2593         }
2594
2595         if (rc) {
2596                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2597                 return -abs(rc);
2598
2599         }
2600
2601         return 0;
2602 }
2603
2604 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2605 {
2606         struct inode *inode = dentry->d_inode;
2607         struct ptlrpc_request *req = NULL;
2608         struct ll_sb_info *sbi;
2609         struct obd_export *exp;
2610         int rc;
2611         ENTRY;
2612
2613         if (!inode) {
2614                 CERROR("REPORT THIS LINE TO PETER\n");
2615                 RETURN(0);
2616         }
2617         sbi = ll_i2sbi(inode);
2618
2619         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2620                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2621
2622         exp = ll_i2mdexp(inode);
2623
2624         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2625                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2626                 struct md_op_data *op_data;
2627
2628                 /* Call getattr by fid, so do not provide name at all. */
2629                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2630                                              dentry->d_inode, NULL, 0, 0,
2631                                              LUSTRE_OPC_ANY, NULL);
2632                 if (IS_ERR(op_data))
2633                         RETURN(PTR_ERR(op_data));
2634
2635                 oit.it_flags |= O_CHECK_STALE;
2636                 rc = md_intent_lock(exp, op_data, NULL, 0,
2637                                     /* we are not interested in name
2638                                        based lookup */
2639                                     &oit, 0, &req,
2640                                     ll_md_blocking_ast, 0);
2641                 ll_finish_md_op_data(op_data);
2642                 oit.it_flags &= ~O_CHECK_STALE;
2643                 if (rc < 0) {
2644                         rc = ll_inode_revalidate_fini(inode, rc);
2645                         GOTO (out, rc);
2646                 }
2647
2648                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2649                 if (rc != 0) {
2650                         ll_intent_release(&oit);
2651                         GOTO(out, rc);
2652                 }
2653
2654                 /* Unlinked? Unhash dentry, so it is not picked up later by
2655                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2656                    here to preserve get_cwd functionality on 2.6.
2657                    Bug 10503 */
2658                 if (!dentry->d_inode->i_nlink) {
2659                         spin_lock(&dcache_lock);
2660                         ll_drop_dentry(dentry);
2661                         spin_unlock(&dcache_lock);
2662                 }
2663
2664                 ll_lookup_finish_locks(&oit, dentry);
2665         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) {
2666                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2667                 obd_valid valid = OBD_MD_FLGETATTR;
2668                 struct obd_capa *oc;
2669                 int ealen = 0;
2670
2671                 if (S_ISREG(inode->i_mode)) {
2672                         rc = ll_get_max_mdsize(sbi, &ealen);
2673                         if (rc)
2674                                 RETURN(rc);
2675                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2676                 }
2677                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2678                  * capa for this inode. Because we only keep capas of dirs
2679                  * fresh. */
2680                 oc = ll_mdscapa_get(inode);
2681                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2682                                 ealen, &req);
2683                 capa_put(oc);
2684                 if (rc) {
2685                         rc = ll_inode_revalidate_fini(inode, rc);
2686                         RETURN(rc);
2687                 }
2688
2689                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2690                                    NULL);
2691                 if (rc)
2692                         GOTO(out, rc);
2693         }
2694
2695         /* if object not yet allocated, don't validate size */
2696         if (ll_i2info(inode)->lli_smd == NULL)
2697                 GOTO(out, rc = 0);
2698
2699         /* ll_glimpse_size will prefer locally cached writes if they extend
2700          * the file */
2701         rc = ll_glimpse_size(inode, 0);
2702         EXIT;
2703 out:
2704         ptlrpc_req_finished(req);
2705         return rc;
2706 }
2707
2708 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2709                   struct lookup_intent *it, struct kstat *stat)
2710 {
2711         struct inode *inode = de->d_inode;
2712         int res = 0;
2713
2714         res = ll_inode_revalidate_it(de, it);
2715         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2716
2717         if (res)
2718                 return res;
2719
2720         stat->dev = inode->i_sb->s_dev;
2721         stat->ino = inode->i_ino;
2722         stat->mode = inode->i_mode;
2723         stat->nlink = inode->i_nlink;
2724         stat->uid = inode->i_uid;
2725         stat->gid = inode->i_gid;
2726         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2727         stat->atime = inode->i_atime;
2728         stat->mtime = inode->i_mtime;
2729         stat->ctime = inode->i_ctime;
2730 #ifdef HAVE_INODE_BLKSIZE
2731         stat->blksize = inode->i_blksize;
2732 #else
2733         stat->blksize = 1 << inode->i_blkbits;
2734 #endif
2735
2736         ll_inode_size_lock(inode, 0);
2737         stat->size = i_size_read(inode);
2738         stat->blocks = inode->i_blocks;
2739         ll_inode_size_unlock(inode, 0);
2740
2741         return 0;
2742 }
2743 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2744 {
2745         struct lookup_intent it = { .it_op = IT_GETATTR };
2746
2747         return ll_getattr_it(mnt, de, &it, stat);
2748 }
2749
2750 static
2751 int lustre_check_acl(struct inode *inode, int mask)
2752 {
2753 #ifdef CONFIG_FS_POSIX_ACL
2754         struct ll_inode_info *lli = ll_i2info(inode);
2755         struct posix_acl *acl;
2756         int rc;
2757         ENTRY;
2758
2759         spin_lock(&lli->lli_lock);
2760         acl = posix_acl_dup(lli->lli_posix_acl);
2761         spin_unlock(&lli->lli_lock);
2762
2763         if (!acl)
2764                 RETURN(-EAGAIN);
2765
2766         rc = posix_acl_permission(inode, acl, mask);
2767         posix_acl_release(acl);
2768
2769         RETURN(rc);
2770 #else
2771         return -EAGAIN;
2772 #endif
2773 }
2774
2775 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2776 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2777 {
2778         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2779                inode->i_ino, inode->i_generation, inode, mask);
2780         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2781                 return lustre_check_remote_perm(inode, mask);
2782         
2783         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2784         return generic_permission(inode, mask, lustre_check_acl);
2785 }
2786 #else
2787 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2788 {
2789         int mode = inode->i_mode;
2790         int rc;
2791
2792         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2793                inode->i_ino, inode->i_generation, inode, mask);
2794
2795         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2796                 return lustre_check_remote_perm(inode, mask);
2797
2798         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2799
2800         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2801             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2802                 return -EROFS;
2803         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2804                 return -EACCES;
2805         if (current->fsuid == inode->i_uid) {
2806                 mode >>= 6;
2807         } else if (1) {
2808                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2809                         goto check_groups;
2810                 rc = lustre_check_acl(inode, mask);
2811                 if (rc == -EAGAIN)
2812                         goto check_groups;
2813                 if (rc == -EACCES)
2814                         goto check_capabilities;
2815                 return rc;
2816         } else {
2817 check_groups:
2818                 if (in_group_p(inode->i_gid))
2819                         mode >>= 3;
2820         }
2821         if ((mode & mask & S_IRWXO) == mask)
2822                 return 0;
2823
2824 check_capabilities:
2825         if (!(mask & MAY_EXEC) ||
2826             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2827                 if (capable(CAP_DAC_OVERRIDE))
2828                         return 0;
2829
2830         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2831             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2832                 return 0;
2833         
2834         return -EACCES;
2835 }
2836 #endif
2837
2838 /* -o localflock - only provides locally consistent flock locks */
2839 struct file_operations ll_file_operations = {
2840         .read           = ll_file_read,
2841         .write          = ll_file_write,
2842         .ioctl          = ll_file_ioctl,
2843         .open           = ll_file_open,
2844         .release        = ll_file_release,
2845         .mmap           = ll_file_mmap,
2846         .llseek         = ll_file_seek,
2847         .sendfile       = ll_file_sendfile,
2848         .fsync          = ll_fsync,
2849 };
2850
2851 struct file_operations ll_file_operations_flock = {
2852         .read           = ll_file_read,
2853         .write          = ll_file_write,
2854         .ioctl          = ll_file_ioctl,
2855         .open           = ll_file_open,
2856         .release        = ll_file_release,
2857         .mmap           = ll_file_mmap,
2858         .llseek         = ll_file_seek,
2859         .sendfile       = ll_file_sendfile,
2860         .fsync          = ll_fsync,
2861 #ifdef HAVE_F_OP_FLOCK
2862         .flock          = ll_file_flock,
2863 #endif
2864         .lock           = ll_file_flock
2865 };
2866
2867 /* These are for -o noflock - to return ENOSYS on flock calls */
2868 struct file_operations ll_file_operations_noflock = {
2869         .read           = ll_file_read,
2870         .write          = ll_file_write,
2871         .ioctl          = ll_file_ioctl,
2872         .open           = ll_file_open,
2873         .release        = ll_file_release,
2874         .mmap           = ll_file_mmap,
2875         .llseek         = ll_file_seek,
2876         .sendfile       = ll_file_sendfile,
2877         .fsync          = ll_fsync,
2878 #ifdef HAVE_F_OP_FLOCK
2879         .flock          = ll_file_noflock,
2880 #endif
2881         .lock           = ll_file_noflock
2882 };
2883
2884 struct inode_operations ll_file_inode_operations = {
2885 #ifdef HAVE_VFS_INTENT_PATCHES
2886         .setattr_raw    = ll_setattr_raw,
2887 #endif
2888         .setattr        = ll_setattr,
2889         .truncate       = ll_truncate,
2890         .getattr        = ll_getattr,
2891         .permission     = ll_inode_permission,
2892         .setxattr       = ll_setxattr,
2893         .getxattr       = ll_getxattr,
2894         .listxattr      = ll_listxattr,
2895         .removexattr    = ll_removexattr,
2896 };
2897
2898 /* dynamic ioctl number support routins */
2899 static struct llioc_ctl_data {
2900         struct rw_semaphore ioc_sem;
2901         struct list_head    ioc_head;
2902 } llioc = { 
2903         __RWSEM_INITIALIZER(llioc.ioc_sem), 
2904         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2905 };
2906
2907
2908 struct llioc_data {
2909         struct list_head        iocd_list;
2910         unsigned int            iocd_size;
2911         llioc_callback_t        iocd_cb;
2912         unsigned int            iocd_count;
2913         unsigned int            iocd_cmd[0];
2914 };
2915
2916 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2917 {
2918         unsigned int size;
2919         struct llioc_data *in_data = NULL;
2920         ENTRY;
2921
2922         if (cb == NULL || cmd == NULL ||
2923             count > LLIOC_MAX_CMD || count < 0)
2924                 RETURN(NULL);
2925
2926         size = sizeof(*in_data) + count * sizeof(unsigned int);
2927         OBD_ALLOC(in_data, size);
2928         if (in_data == NULL)
2929                 RETURN(NULL);
2930
2931         memset(in_data, 0, sizeof(*in_data));
2932         in_data->iocd_size = size;
2933         in_data->iocd_cb = cb;
2934         in_data->iocd_count = count;
2935         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2936
2937         down_write(&llioc.ioc_sem);
2938         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2939         up_write(&llioc.ioc_sem);
2940
2941         RETURN(in_data);
2942 }
2943
2944 void ll_iocontrol_unregister(void *magic)
2945 {
2946         struct llioc_data *tmp;
2947
2948         if (magic == NULL)
2949                 return;
2950
2951         down_write(&llioc.ioc_sem);
2952         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2953                 if (tmp == magic) {
2954                         unsigned int size = tmp->iocd_size;
2955
2956                         list_del(&tmp->iocd_list);
2957                         up_write(&llioc.ioc_sem);
2958
2959                         OBD_FREE(tmp, size);
2960                         return;
2961                 }
2962         }
2963         up_write(&llioc.ioc_sem);
2964
2965         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2966 }
2967
2968 EXPORT_SYMBOL(ll_iocontrol_register);
2969 EXPORT_SYMBOL(ll_iocontrol_unregister);
2970
2971 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2972                         unsigned int cmd, unsigned long arg, int *rcp)
2973 {
2974         enum llioc_iter ret = LLIOC_CONT;
2975         struct llioc_data *data;
2976         int rc = -EINVAL, i;
2977
2978         down_read(&llioc.ioc_sem);
2979         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2980                 for (i = 0; i < data->iocd_count; i++) {
2981                         if (cmd != data->iocd_cmd[i]) 
2982                                 continue;
2983
2984                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2985                         break;
2986                 }
2987
2988                 if (ret == LLIOC_STOP)
2989                         break;
2990         }
2991         up_read(&llioc.ioc_sem);
2992
2993         if (rcp)
2994                 *rcp = rc;
2995         return ret;
2996 }