Whamcloud - gitweb
b=11270
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
/* Pack the current state of @inode (fid, mode, timestamps, size, blocks,
 * flags, IO epoch) plus the open file handle @fh into @op_data so it can
 * be carried to the MDS in a close-style request.  Also takes an MDS
 * capability reference in op_capa1; the consumer of @op_data is
 * responsible for releasing it. */
void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
                          struct lustre_handle *fh)
{
        op_data->op_fid1 = ll_i2info(inode)->lli_fid;
        op_data->op_attr.ia_mode = inode->i_mode;
        op_data->op_attr.ia_atime = inode->i_atime;
        op_data->op_attr.ia_mtime = inode->i_mtime;
        op_data->op_attr.ia_ctime = inode->i_ctime;
        op_data->op_attr.ia_size = i_size_read(inode);
        op_data->op_attr_blocks = inode->i_blocks;
        /* ia_attr_flags lives in the Lustre-extended iattr, hence the cast. */
        ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
        op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
        memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
        op_data->op_capa1 = ll_mdscapa_get(inode);
}
63
/* Prepare @op_data for closing open handle @och on @inode.
 * Mode and all three timestamps are always marked valid.  For handles
 * opened for write on a regular file, size/blocks are only sent when the
 * MDS does not support Size-on-MDS; otherwise ll_epoch_close() performs
 * the IO-epoch bookkeeping instead.
 * NOTE(review): @och is passed by address to ll_epoch_close() —
 * presumably so it can be cleared/replaced there; confirm against
 * ll_epoch_close() before relying on @och afterwards. */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
                             struct obd_client_handle *och)
{
        ENTRY;

        op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
                                 ATTR_MTIME_SET | ATTR_CTIME_SET;

        /* Read/exec opens carry no size information to the MDS. */
        if (!(och->och_flags & FMODE_WRITE))
                goto out;

        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
            !S_ISREG(inode->i_mode))
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
        else
                ll_epoch_close(inode, op_data, &och, 0);

out:
        ll_pack_inode2opdata(inode, op_data, &och->och_fh);
        EXIT;
}
85
/* Send the MDS close RPC for open handle @och on @inode and tear down its
 * open-replay state.
 *
 * md_close() returning -EAGAIN means the MDS wants a Size-on-MDS update
 * first; that update is performed inline here.  If, on a SOM-capable
 * server, the IO epoch was not closed for a write handle, the inode is
 * queued for DONE_WRITING and @och must stay alive until that completes;
 * in every other case @och is poisoned and freed here.
 *
 * Returns 0 on success (including forced umount / missing obd, which are
 * treated as successful no-op closes) or a negative errno. */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                     struct inode *inode,
                                     struct obd_client_handle *och)
{
        struct obd_export *exp = ll_i2mdexp(inode);
        struct md_op_data *op_data;
        struct ptlrpc_request *req = NULL;
        struct obd_device *obd = class_exp2obd(exp);
        int epoch_close = 1;
        int seq_end = 0, rc;
        ENTRY;

        if (obd == NULL) {
                /*
                 * XXX: in case of LMV, is this correct to access
                 * ->exp_handle?
                 */
                CERROR("Invalid MDC connection handle "LPX64"\n",
                       ll_i2mdexp(inode)->exp_handle.h_cookie);
                GOTO(out, rc = 0);
        }

        /*
         * here we check if this is forced umount. If so this is called on
         * canceling "open lock" and we do not call md_close() in this case, as
         * it will not be successful, as import is already deactivated.
         */
        if (obd->obd_force)
                GOTO(out, rc = 0);

        OBD_ALLOC_PTR(op_data);
        if (op_data == NULL)
                GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

        ll_prepare_close(inode, op_data, och);
        epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
        rc = md_close(md_exp, op_data, och->och_mod, &req);
        /* Any result other than -EAGAIN ends the replay sequence below. */
        if (rc != -EAGAIN)
                seq_end = 1;

        if (rc == -EAGAIN) {
                /* This close must have the epoch closed. */
                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
                LASSERT(epoch_close);
                /* MDS has instructed us to obtain Size-on-MDS attribute from
                 * OSTs and send setattr to back to MDS. */
                rc = ll_sizeonmds_update(inode, och->och_mod,
                                         &och->och_fh, op_data->op_ioepoch);
                if (rc) {
                        /* Update failure is logged but not propagated;
                         * the close itself is still considered done. */
                        CERROR("inode %lu mdc Size-on-MDS update failed: "
                               "rc = %d\n", inode->i_ino, rc);
                        rc = 0;
                }
        } else if (rc) {
                CERROR("inode %lu mdc close failed: rc = %d\n",
                       inode->i_ino, rc);
        }
        ll_finish_md_op_data(op_data);

        if (rc == 0) {
                rc = ll_objects_destroy(req, inode);
                if (rc)
                        CERROR("inode %lu ll_objects destroy: rc = %d\n",
                               inode->i_ino, rc);
        }

        EXIT;
out:

        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
            S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                /* Epoch still open: defer DONE_WRITING; @och is kept alive
                 * and owned by the done-writing machinery from here on. */
                ll_queue_done_writing(inode, LLIF_DONE_WRITING);
        } else {
                if (seq_end)
                        ptlrpc_close_replay_seq(req);
                md_clear_open_replay_data(md_exp, och);
                /* Free @och if it is not waiting for DONE_WRITING. */
                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
                OBD_FREE_PTR(och);
        }
        if (req) /* This is close request */
                ptlrpc_req_finished(req);
        return rc;
}
170
/* Close the MDS open handle matching open mode @flags on @inode, but only
 * when no file descriptor of that mode still uses it.  The write/exec/read
 * handle slot is selected, then detached under lli_och_sem; the actual
 * close RPC is sent outside the semaphore.
 * Returns 0 when the handle is still in use or already gone, otherwise
 * the result of ll_close_inode_openhandle(). */
int ll_md_real_close(struct inode *inode, int flags)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_client_handle **och_p;
        struct obd_client_handle *och;
        __u64 *och_usecount;
        int rc = 0;
        ENTRY;

        /* Pick the handle slot and use counter for this open mode. */
        if (flags & FMODE_WRITE) {
                och_p = &lli->lli_mds_write_och;
                och_usecount = &lli->lli_open_fd_write_count;
        } else if (flags & FMODE_EXEC) {
                och_p = &lli->lli_mds_exec_och;
                och_usecount = &lli->lli_open_fd_exec_count;
        } else {
                LASSERT(flags & FMODE_READ);
                och_p = &lli->lli_mds_read_och;
                och_usecount = &lli->lli_open_fd_read_count;
        }

        down(&lli->lli_och_sem);
        if (*och_usecount) { /* There are still users of this handle, so
                                skip freeing it. */
                up(&lli->lli_och_sem);
                RETURN(0);
        }
        /* Detach the handle while holding the semaphore so concurrent
         * closers cannot free it twice. */
        och=*och_p;
        *och_p = NULL;
        up(&lli->lli_och_sem);

        if (och) { /* There might be a race and somebody have freed this och
                      already */
                rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
                                               inode, och);
        }

        RETURN(rc);
}
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have good enough OPEN lock on the file and if
228            we can skip talking to MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this returns an error code, fput() the caller does not, so we need
273  * to make every effort to clean up all of our state here.  Also, applications
274  * rarely check close errors and even if an error is returned they will not
275  * re-try the close call.
276  */
int ll_file_release(struct inode *inode, struct file *file)
{
        struct ll_file_data *fd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        int rc;

        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
        /* Remote-client ACL state is keyed by the current pid; tear it
         * down when the root-directory handle carrying LL_FILE_RMTACL is
         * released. */
        if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
            inode == inode->i_sb->s_root->d_inode) {
                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

                LASSERT(fd != NULL);
                if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
                        fd->fd_flags &= ~LL_FILE_RMTACL;
                        rct_del(&sbi->ll_rct, cfs_curproc_pid());
                        et_search_free(&sbi->ll_et, cfs_curproc_pid());
                }
        }
#endif

        ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
        fd = LUSTRE_FPRIVATE(file);
        LASSERT(fd != NULL);

        /* don't do anything for / */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = NULL;
                ll_file_data_put(fd);
                RETURN(0);
        }

        /* Clear recorded async-write error state; the return value of
         * lov_test_and_clear_async_rc() is deliberately ignored here. */
        if (lsm)
                lov_test_and_clear_async_rc(lsm);
        lli->lli_async_rc = 0;

        rc = ll_md_close(sbi->ll_md_exp, inode, file);
        RETURN(rc);
}
321
/* Issue an MDS open intent for @file, optionally carrying striping info
 * in @lmm/@lmmsize (when both are zero an OPEN lock is requested too).
 * On success the dentry's inode is (re)initialized from the reply via
 * ll_prep_inode().  -ESTALE is handled on a separate exit path to avoid
 * flooding the log; in that case any open handle obtained is released
 * and the request reference is NOT dropped here (see out_stale).
 * Returns 0 or a negative errno. */
static int ll_intent_file_open(struct file *file, void *lmm,
                               int lmmsize, struct lookup_intent *itp)
{
        struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
        struct dentry *parent = file->f_dentry->d_parent;
        const char *name = file->f_dentry->d_name.name;
        const int len = file->f_dentry->d_name.len;
        struct md_op_data *op_data;
        struct ptlrpc_request *req;
        int rc;

        if (!parent)
                RETURN(-ENOENT);

        /* Usually we come here only for NFSD, and we want open lock.
           But we can also get here with pre 2.6.15 patchless kernels, and in
           that case that lock is also ok */
        /* We can also get here if there was cached open handle in revalidate_it
         * but it disappeared while we were getting from there to ll_file_open.
         * But this means this file was closed and immediatelly opened which
         * makes a good candidate for using OPEN lock */
        /* If lmmsize & lmm are not 0, we are just setting stripe info
         * parameters. No need for the open lock */
        if (!lmm && !lmmsize)
                itp->it_flags |= MDS_OPEN_LOCK;

        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
                                      file->f_dentry->d_inode, name, len,
                                      O_RDWR, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
                            0 /*unused */, &req, ll_md_blocking_ast, 0);
        ll_finish_md_op_data(op_data);
        if (rc == -ESTALE) {
                /* reason for keep own exit path - don`t flood log
                * with messages with -ESTALE errors.
                */
                if (!it_disposition(itp, DISP_OPEN_OPEN) ||
                     it_open_error(DISP_OPEN_OPEN, itp))
                        GOTO(out, rc);
                ll_release_openhandle(file->f_dentry, itp);
                GOTO(out_stale, rc);
        }

        if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
                GOTO(out, rc);
        }

        /* Attach the inode to the lock so blocking ASTs find it later. */
        if (itp->d.lustre.it_lock_mode)
                md_set_lock_data(sbi->ll_md_exp,
                                 &itp->d.lustre.it_lock_handle,
                                 file->f_dentry->d_inode);

        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
out:
        ptlrpc_req_finished(itp->d.lustre.it_data);

out_stale:
        it_clear_disposition(itp, DISP_ENQ_COMPLETE);
        ll_intent_drop_lock(itp);

        RETURN(rc);
}
389
390 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
391                        struct lookup_intent *it, struct obd_client_handle *och)
392 {
393         struct ptlrpc_request *req = it->d.lustre.it_data;
394         struct mdt_body *body;
395
396         LASSERT(och);
397
398         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
399         LASSERT(body != NULL);                      /* reply already checked out */
400
401         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
402         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
403         och->och_fid = lli->lli_fid;
404         och->och_flags = it->it_flags;
405         lli->lli_ioepoch = body->ioepoch;
406
407         return md_set_open_replay_data(md_exp, och, req);
408 }
409
/* Attach the per-open data @fd to @file and initialize readahead state.
 * When @och is non-NULL (first open of this mode on the inode), fill it
 * from the intent reply via ll_och_fill() and register open-replay data.
 * The file's private data must be empty on entry (asserted).
 * Returns 0 or a negative errno from ll_och_fill(). */
int ll_local_open(struct file *file, struct lookup_intent *it,
                  struct ll_file_data *fd, struct obd_client_handle *och)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        ENTRY;

        LASSERT(!LUSTRE_FPRIVATE(file));

        LASSERT(fd != NULL);

        if (och) {
                struct ptlrpc_request *req = it->d.lustre.it_data;
                struct mdt_body *body;
                int rc;

                rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
                if (rc)
                        RETURN(rc);

                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                if ((it->it_flags & FMODE_WRITE) &&
                    (body->valid & OBD_MD_FLSIZE))
                        CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
                               lli->lli_ioepoch, PFID(&lli->lli_fid));
        }

        LUSTRE_FPRIVATE(file) = fd;
        ll_readahead_init(inode, &fd->fd_ras);
        /* Remember the effective open mode for the matching close. */
        fd->fd_omode = it->it_flags;
        RETURN(0);
}
442
443 /* Open a file, and (for the very first open) create objects on the OSTs at
444  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
445  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
446  * lli_open_sem to ensure no other process will create objects, send the
447  * stripe MD to the MDS, or try to destroy the objects if that fails.
448  *
449  * If we already have the stripe MD locally then we don't request it in
450  * md_open(), by passing a lmm_size = 0.
451  *
452  * It is up to the application to ensure no other processes open this file
453  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
454  * used.  We might be able to avoid races of that sort by getting lli_open_sem
455  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
456  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
457  */
int ll_file_open(struct inode *inode, struct file *file)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lookup_intent *it, oit = { .it_op = IT_OPEN,
                                          .it_flags = file->f_flags };
        struct lov_stripe_md *lsm;
        struct ptlrpc_request *req = NULL;
        struct obd_client_handle **och_p;
        __u64 *och_usecount;
        struct ll_file_data *fd;
        int rc = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
               inode->i_generation, inode, file->f_flags);

#ifdef HAVE_VFS_INTENT_PATCHES
        it = file->f_it;
#else
        it = file->private_data; /* XXX: compat macro */
        file->private_data = NULL; /* prevent ll_local_open assertion */
#endif

        fd = ll_file_data_get();
        if (fd == NULL)
                RETURN(-ENOMEM);

        /* don't do anything for / */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = fd;
                RETURN(0);
        }

        /* No cached intent from lookup: build our own open intent. */
        if (!it || !it->d.lustre.it_disposition) {
                /* Convert f_flags into access mode. We cannot use file->f_mode,
                 * because everything but O_ACCMODE mask was stripped from
                 * there */
                if ((oit.it_flags + 1) & O_ACCMODE)
                        oit.it_flags++;
                if (file->f_flags & O_TRUNC)
                        oit.it_flags |= FMODE_WRITE;

                /* kernel only call f_op->open in dentry_open.  filp_open calls
                 * dentry_open after call to open_namei that checks permissions.
                 * Only nfsd_open call dentry_open directly without checking
                 * permissions and because of that this code below is safe. */
                if (oit.it_flags & FMODE_WRITE)
                        oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

                /* We do not want O_EXCL here, presumably we opened the file
                 * already? XXX - NFS implications? */
                oit.it_flags &= ~O_EXCL;

                it = &oit;
        }

restart:
        /* Let's see if we have file open on MDS already. */
        if (it->it_flags & FMODE_WRITE) {
                och_p = &lli->lli_mds_write_och;
                och_usecount = &lli->lli_open_fd_write_count;
        } else if (it->it_flags & FMODE_EXEC) {
                och_p = &lli->lli_mds_exec_och;
                och_usecount = &lli->lli_open_fd_exec_count;
         } else {
                och_p = &lli->lli_mds_read_och;
                och_usecount = &lli->lli_open_fd_read_count;
        }

        down(&lli->lli_och_sem);
        if (*och_p) { /* Open handle is present */
                if (it_disposition(it, DISP_OPEN_OPEN)) {
                        /* Well, there's extra open request that we do not need,
                           let's close it somehow. This will decref request. */
                        rc = it_open_error(DISP_OPEN_OPEN, it);
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_och_free, rc);
                        }
                        ll_release_openhandle(file->f_dentry, it);
                        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                             LPROC_LL_OPEN);
                }
                (*och_usecount)++;

                rc = ll_local_open(file, it, fd, NULL);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        RETURN(rc);
                }
        } else {
                LASSERT(*och_usecount == 0);
                if (!it->d.lustre.it_disposition) {
                        /* We cannot just request lock handle now, new ELC code
                           means that one of other OPEN locks for this file
                           could be cancelled, and since blocking ast handler
                           would attempt to grab och_sem as well, that would
                           result in a deadlock */
                        up(&lli->lli_och_sem);
                        it->it_flags |= O_CHECK_STALE;
                        rc = ll_intent_file_open(file, NULL, 0, it);
                        it->it_flags &= ~O_CHECK_STALE;
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_openerr, rc);
                        }

                        /* Got some error? Release the request */
                        if (it->d.lustre.it_status < 0) {
                                req = it->d.lustre.it_data;
                                ptlrpc_req_finished(req);
                        }
                        md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
                                         &it->d.lustre.it_lock_handle,
                                         file->f_dentry->d_inode);
                        /* Retry with the now-populated intent. */
                        goto restart;
                }
                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
                if (!*och_p) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc = -ENOMEM);
                }
                (*och_usecount)++;
                req = it->d.lustre.it_data;

                /* md_intent_lock() didn't get a request ref if there was an
                 * open error, so don't do cleanup on the request here
                 * (bug 3430) */
                /* XXX (green): Should not we bail out on any error here, not
                 * just open error? */
                rc = it_open_error(DISP_OPEN_OPEN, it);
                if (rc) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }

                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                rc = ll_local_open(file, it, fd, *och_p);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }
        }
        up(&lli->lli_och_sem);

        /* Must do this outside lli_och_sem lock to prevent deadlock where
           different kind of OPEN lock for this same inode gets cancelled
           by ldlm_cancel_lru */
        if (!S_ISREG(inode->i_mode))
                GOTO(out, rc);

        ll_capa_open(inode);

        lsm = lli->lli_smd;
        if (lsm == NULL) {
                if (file->f_flags & O_LOV_DELAY_CREATE ||
                    !(file->f_mode & FMODE_WRITE)) {
                        CDEBUG(D_INODE, "object creation was delayed\n");
                        GOTO(out, rc);
                }
        }
        file->f_flags &= ~O_LOV_DELAY_CREATE;
        GOTO(out, rc);
out:
        ptlrpc_req_finished(req);
        if (req)
                it_clear_disposition(it, DISP_ENQ_OPEN_REF);
out_och_free:
        if (rc) {
                if (*och_p) {
                        OBD_FREE(*och_p, sizeof (struct obd_client_handle));
                        *och_p = NULL; /* OBD_FREE writes some magic there */
                        (*och_usecount)--;
                }
                up(&lli->lli_och_sem);
out_openerr: ;/* Looks weird, eh? Just wait for statahead code to insert
                a statement here <-- remove this comment after statahead
                landing */
        }

        return rc;
}
642
643 /* Fills the obdo with the attributes for the inode defined by lsm */
int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
{
        struct ptlrpc_request_set *set;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;

        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        /* Caller must only use this on files with striping set up. */
        LASSERT(lsm != NULL);

        oinfo.oi_md = lsm;
        oinfo.oi_oa = obdo;
        oinfo.oi_oa->o_id = lsm->lsm_object_id;
        oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
        oinfo.oi_oa->o_mode = S_IFREG;
        oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
                               OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                               OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
                               OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                               OBD_MD_FLGROUP;
        oinfo.oi_capa = ll_mdscapa_get(inode);

        /* Fire the getattr to all stripes in parallel and wait. */
        set = ptlrpc_prep_set();
        if (set == NULL) {
                CERROR("can't allocate ptlrpc set\n");
                rc = -ENOMEM;
        } else {
                rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
                if (rc == 0)
                        rc = ptlrpc_set_wait(set);
                ptlrpc_set_destroy(set);
        }
        /* Drop the capa reference taken above on every path. */
        capa_put(oinfo.oi_capa);
        if (rc)
                RETURN(rc);

        /* Keep only the attributes the OSTs are authoritative for. */
        oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
                                 OBD_MD_FLATIME | OBD_MD_FLMTIME |
                                 OBD_MD_FLCTIME | OBD_MD_FLSIZE);

        obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
               lli->lli_smd->lsm_object_id, i_size_read(inode),
               (unsigned long long)inode->i_blocks, ll_inode_blksize(inode));
        RETURN(0);
}
692
693 static inline void ll_remove_suid(struct inode *inode)
694 {
695         unsigned int mode;
696
697         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
698         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
699
700         /* was any of the uid bits set? */
701         mode &= inode->i_mode;
702         if (mode && !capable(CAP_FSETID)) {
703                 inode->i_mode &= ~mode;
704                 // XXX careful here - we cannot change the size
705         }
706 }
707
/* Map DLM extent @lock back to the stripe index it covers within
 * @inode's LOV striping.  Single-stripe files trivially map to stripe 0;
 * otherwise the LOV is asked via obd_get_info("lock_to_stripe").  The
 * resulting stripe's object id/group is cross-checked against the lock's
 * resource name.
 * Returns the stripe index (>= 0), -ELDLM_NO_LOCK_DATA on a resource
 * mismatch, or a negative rc from obd_get_info(). */
static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = ll_i2dtexp(inode);
        struct {
                char name[16];
                struct ldlm_lock *lock;
                struct lov_stripe_md *lsm;
        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
        __u32 stripe, vallen = sizeof(stripe);
        int rc;
        ENTRY;

        if (lsm->lsm_stripe_count == 1)
                GOTO(check, stripe = 0);

        /* get our offset in the lov */
        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
        if (rc != 0) {
                CERROR("obd_get_info: rc = %d\n", rc);
                RETURN(rc);
        }
        LASSERT(stripe < lsm->lsm_stripe_count);

check:
        /* resource name[0] holds the object id, name[2] the group —
         * both must match the stripe's recorded identity. */
        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
                LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
                           lsm->lsm_oinfo[stripe]->loi_id,
                           lsm->lsm_oinfo[stripe]->loi_gr);
                RETURN(-ELDLM_NO_LOCK_DATA);
        }

        RETURN(stripe);
}
744
745 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
746  * we get a lock cancellation for each stripe, so we have to map the obd's
747  * region back onto the stripes in the file that it held.
748  *
749  * No one can dirty the extent until we've finished our work and they can
750  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
751  * but other kernel actors could have pages locked.
752  *
753  * Called with the DLM lock held. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                              struct ldlm_lock *lock, __u32 stripe)
{
        ldlm_policy_data_t tmpex;
        unsigned long start, end, count, skip, i, j;
        struct page *page;
        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
        struct lustre_handle lockh;
        struct address_space *mapping = inode->i_mapping;

        ENTRY;
        tmpex = lock->l_policy_data;
        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
               i_size_read(inode));

        /* our locks are page granular thanks to osc_enqueue, we invalidate the
         * whole page. */
        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
                           CFS_PAGE_SIZE);
        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);

        /* Map the lock's per-object page range onto file page indices.
         * With a single stripe the mapping is the identity (count = ~0
         * effectively disables the stride logic below).  With multiple
         * stripes: count = pages per stripe chunk, skip = pages held by
         * the other stripes between two consecutive chunks of ours. */
        count = ~0;
        skip = 0;
        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
        if (lsm->lsm_stripe_count > 1) {
                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
                skip = (lsm->lsm_stripe_count - 1) * count;
                start += start/count * skip + stripe * count;
                if (end != ~0)
                        end += end/count * skip + stripe * count;
        }
        /* overflow of the stripe arithmetic above means "to EOF" */
        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
                end = ~0;

        /* don't walk past the last page backed by i_size */
        i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
            CFS_PAGE_SHIFT : 0;
        if (i < end)
                end = i;

        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
               count, skip, end, discard ? " (DISCARDING)" : "");

        /* walk through the vmas on the inode and tear down mmaped pages that
         * intersect with the lock.  this stops immediately if there are no
         * mmap()ed regions of the file.  This is not efficient at all and
         * should be short lived. We'll associate mmap()ed pages with the lock
         * and will be able to find them directly */
        for (i = start; i <= end; i += (j + skip)) {
                /* j = pages remaining in this stripe chunk, clamped to the
                 * end of the walk */
                j = min(count - (i % count), end - i + 1);
                LASSERT(j > 0);
                LASSERT(mapping);
                if (ll_teardown_mmaps(mapping,
                                      (__u64)i << CFS_PAGE_SHIFT,
                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                        break;
        }

        /* this is the simplistic implementation of page eviction at
         * cancelation.  It is careful to get races with other page
         * lockers handled correctly.  fixes from bug 20 will make it
         * more efficient by associating locks with pages and with
         * batching writeback under the lock explicitly. */
        /* tmpex.l_extent.start tracks the per-object offset of page i so
         * that ldlm_lock_match() below can be asked about exactly one page */
        for (i = start, j = start % count; i <= end;
             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
                if (j == count) {
                        /* crossed into another stripe's chunk: jump over it */
                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
                        i += skip;
                        j = 0;
                        if (i > end)
                                break;
                }
                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                         start, i, end);

                if (!mapping_has_pages(mapping)) {
                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                        break;
                }

                cond_resched();

                page = find_get_page(mapping, i);
                if (page == NULL)
                        continue;
                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
                               i, tmpex.l_extent.start);
                lock_page(page);

                /* page->mapping to check with racing against teardown */
                if (!discard && clear_page_dirty_for_io(page)) {
                        rc = ll_call_writepage(inode, page);
                        /* either waiting for io to complete or reacquiring
                         * the lock that the failed writepage released */
                        lock_page(page);
                        wait_on_page_writeback(page);
                        if (rc != 0) {
                                /* record the failure on the mapping so a
                                 * later fsync/close can report it */
                                CERROR("writepage inode %lu(%p) of page %p "
                                       "failed: %d\n", inode->i_ino, inode,
                                       page, rc);
                                if (rc == -ENOSPC)
                                        set_bit(AS_ENOSPC, &mapping->flags);
                                else
                                        set_bit(AS_EIO, &mapping->flags);
                        }
                }

                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
                /* check to see if another DLM lock covers this page b=2765 */
                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
                                      LDLM_FL_TEST_LOCK,
                                      &lock->l_resource->lr_name, LDLM_EXTENT,
                                      &tmpex, LCK_PR | LCK_PW, &lockh);

                if (rc2 <= 0 && page->mapping != NULL) {
                        struct ll_async_page *llap = llap_cast_private(page);
                        /* checking again to account for writeback's
                         * lock_page() */
                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                        if (llap)
                                ll_ra_accounting(llap, mapping);
                        ll_truncate_complete_page(page);
                }
                unlock_page(page);
                page_cache_release(page);
        }
        LASSERTF(tmpex.l_extent.start <=
                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
                  lock->l_policy_data.l_extent.end + 1),
                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                 start, i, end);
        EXIT;
}
896
/* DLM blocking/cancel AST for client extent locks.
 *
 * LDLM_CB_BLOCKING: another lock conflicts with ours, so cancel ours.
 * LDLM_CB_CANCELING: the lock is going away; flush the covered pages
 * from the page cache and update the stripe's known-minimum-size (kms).
 *
 * Always returns 0; fatal inconsistencies LBUG() instead. */
static int ll_extent_lock_callback(struct ldlm_lock *lock,
                                   struct ldlm_lock_desc *new, void *data,
                                   int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* a small non-NULL cookie cannot be a valid pointer; catch locks
         * set up with garbage callback data early */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                /* conflicting request elsewhere: give our lock back */
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                struct inode *inode;
                struct ll_inode_info *lli;
                struct lov_stripe_md *lsm;
                int stripe;
                __u64 kms;

                /* This lock wasn't granted, don't try to evict pages */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                /* takes a reference on the inode; dropped at iput: below */
                inode = ll_inode_from_lock(lock);
                if (inode == NULL)
                        RETURN(0);
                lli = ll_i2info(inode);
                if (lli == NULL)
                        goto iput;
                if (lli->lli_smd == NULL)
                        goto iput;
                lsm = lli->lli_smd;

                /* map this per-stripe lock back to its stripe index */
                stripe = ll_lock_to_stripe_offset(inode, lock);
                if (stripe < 0)
                        goto iput;

                ll_pgcache_remove_extent(inode, lsm, lock, stripe);

                /* recompute the stripe's kms now that this lock no longer
                 * covers its extent; lov_stripe_lock before the resource
                 * lock matches the ordering used elsewhere */
                lov_stripe_lock(lsm);
                lock_res_and_lock(lock);
                kms = ldlm_extent_shift_kms(lock,
                                            lsm->lsm_oinfo[stripe]->loi_kms);

                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
                lsm->lsm_oinfo[stripe]->loi_kms = kms;
                unlock_res_and_lock(lock);
                lov_stripe_unlock(lsm);
        iput:
                iput(inode);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
965
#if 0
/* NOTE(review): dead code — compiled out by this #if 0.  It also no longer
 * matches the current lsm layout: it uses lsm_oinfo[stripe].loi_* member
 * access while live code in this file uses lsm_oinfo[stripe]->loi_*, so it
 * would not compile if re-enabled without updating.  Kept for reference. */
int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lustre_handle lockh = { 0 };
        struct ost_lvb *lvb;
        int stripe;
        ENTRY;

        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                     LDLM_FL_BLOCK_CONV)) {
                /* NOTE(review): the LBUG() makes the debug/reprocess code
                 * below unreachable */
                LBUG(); /* not expecting any blocked async locks yet */
                LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
                           "lock, returning");
                ldlm_lock_dump(D_OTHER, lock, 0);
                ldlm_reprocess_all(lock->l_resource);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");

        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                goto iput;

        if (lock->l_lvb_len) {
                struct lov_stripe_md *lsm = lli->lli_smd;
                __u64 kms;
                lvb = lock->l_lvb_data;
                lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;

                lock_res_and_lock(lock);
                ll_inode_size_lock(inode, 1);
                kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                kms = ldlm_extent_shift_kms(NULL, kms);
                if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe].loi_kms, kms);
                lsm->lsm_oinfo[stripe].loi_kms = kms;
                ll_inode_size_unlock(inode, 1);
                unlock_res_and_lock(lock);
        }

iput:
        iput(inode);
        wake_up(&lock->l_waitq);

        ldlm_lock2handle(lock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        RETURN(0);
}
#endif
1020
/* Glimpse AST: a peer wants this client's view of the file size for the
 * stripe covered by @lock.  Pack an ost_lvb reply carrying the stripe's
 * kms and the inode times into the incoming request @reqp.
 *
 * Returns 0 on success or a negative error, which is also stored in
 * req->rq_status.  -ELDLM_NO_LOCK_DATA cases are normal races and get a
 * quiet empty reply instead of an error. */
static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
        struct ptlrpc_request *req = reqp;
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli;
        struct lov_stripe_md *lsm;
        struct ost_lvb *lvb;
        int rc, stripe;
        ENTRY;

        /* ll_inode_from_lock() took a reference; drop it at iput: */
        if (inode == NULL)
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
        lli = ll_i2info(inode);
        if (lli == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* First, find out which stripe index this lock corresponds to. */
        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* reserve space for the lvb in the reply and pack it */
        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             sizeof(*lvb));
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc) {
                CERROR("lustre_pack_reply: %d\n", rc);
                GOTO(iput, rc);
        }

        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
        lvb->lvb_mtime = LTIME_S(inode->i_mtime);
        lvb->lvb_atime = LTIME_S(inode->i_atime);
        lvb->lvb_ctime = LTIME_S(inode->i_ctime);

        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                   " atime "LPU64", mtime "LPU64", ctime "LPU64,
                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                   lvb->lvb_atime, lvb->lvb_ctime);
 iput:
        iput(inode);

 out:
        /* These errors are normal races, so we don't want to fill the console
         * with messages by calling ptlrpc_error() */
        if (rc == -ELDLM_NO_LOCK_DATA)
                lustre_pack_reply(req, 1, NULL, NULL);

        req->rq_status = rc;
        return rc;
}
1076
1077 static int ll_merge_lvb(struct inode *inode)
1078 {
1079         struct ll_inode_info *lli = ll_i2info(inode);
1080         struct ll_sb_info *sbi = ll_i2sbi(inode);
1081         struct ost_lvb lvb;
1082         int rc;
1083
1084         ENTRY;
1085
1086         ll_inode_size_lock(inode, 1);
1087         inode_init_lvb(inode, &lvb);
1088         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1089         i_size_write(inode, lvb.lvb_size);
1090         inode->i_blocks = lvb.lvb_blocks;
1091
1092         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1093         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1094         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1095         ll_inode_size_unlock(inode, 1);
1096
1097         RETURN(rc);
1098 }
1099
1100 int ll_local_size(struct inode *inode)
1101 {
1102         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1103         struct ll_inode_info *lli = ll_i2info(inode);
1104         struct ll_sb_info *sbi = ll_i2sbi(inode);
1105         struct lustre_handle lockh = { 0 };
1106         int flags = 0;
1107         int rc;
1108         ENTRY;
1109
1110         if (lli->lli_smd->lsm_stripe_count == 0)
1111                 RETURN(0);
1112
1113         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1114                        &policy, LCK_PR, &flags, inode, &lockh);
1115         if (rc < 0)
1116                 RETURN(rc);
1117         else if (rc == 0)
1118                 RETURN(-ENODATA);
1119
1120         rc = ll_merge_lvb(inode);
1121         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1122         RETURN(rc);
1123 }
1124
1125 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1126                      lstat_t *st)
1127 {
1128         struct lustre_handle lockh = { 0 };
1129         struct ldlm_enqueue_info einfo = { 0 };
1130         struct obd_info oinfo = { { { 0 } } };
1131         struct ost_lvb lvb;
1132         int rc;
1133
1134         ENTRY;
1135
1136         einfo.ei_type = LDLM_EXTENT;
1137         einfo.ei_mode = LCK_PR;
1138         einfo.ei_cb_bl = ll_extent_lock_callback;
1139         einfo.ei_cb_cp = ldlm_completion_ast;
1140         einfo.ei_cb_gl = ll_glimpse_callback;
1141         einfo.ei_cbdata = NULL;
1142
1143         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1144         oinfo.oi_lockh = &lockh;
1145         oinfo.oi_md = lsm;
1146         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1147
1148         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1149         if (rc == -ENOENT)
1150                 RETURN(rc);
1151         if (rc != 0) {
1152                 CERROR("obd_enqueue returned rc %d, "
1153                        "returning -EIO\n", rc);
1154                 RETURN(rc > 0 ? -EIO : rc);
1155         }
1156
1157         lov_stripe_lock(lsm);
1158         memset(&lvb, 0, sizeof(lvb));
1159         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1160         st->st_size = lvb.lvb_size;
1161         st->st_blocks = lvb.lvb_blocks;
1162         st->st_mtime = lvb.lvb_mtime;
1163         st->st_atime = lvb.lvb_atime;
1164         st->st_ctime = lvb.lvb_ctime;
1165         lov_stripe_unlock(lsm);
1166
1167         RETURN(rc);
1168 }
1169
1170 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1171  * file (because it prefers KMS over RSS when larger) */
1172 int ll_glimpse_size(struct inode *inode, int ast_flags)
1173 {
1174         struct ll_inode_info *lli = ll_i2info(inode);
1175         struct ll_sb_info *sbi = ll_i2sbi(inode);
1176         struct lustre_handle lockh = { 0 };
1177         struct ldlm_enqueue_info einfo = { 0 };
1178         struct obd_info oinfo = { { { 0 } } };
1179         int rc;
1180         ENTRY;
1181
1182         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1183                 RETURN(0);
1184
1185         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1186
1187         if (!lli->lli_smd) {
1188                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1189                 RETURN(0);
1190         }
1191
1192         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1193          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1194          *       won't revoke any conflicting DLM locks held. Instead,
1195          *       ll_glimpse_callback() will be called on each client
1196          *       holding a DLM lock against this file, and resulting size
1197          *       will be returned for each stripe. DLM lock on [0, EOF] is
1198          *       acquired only if there were no conflicting locks. */
1199         einfo.ei_type = LDLM_EXTENT;
1200         einfo.ei_mode = LCK_PR;
1201         einfo.ei_cb_bl = ll_extent_lock_callback;
1202         einfo.ei_cb_cp = ldlm_completion_ast;
1203         einfo.ei_cb_gl = ll_glimpse_callback;
1204         einfo.ei_cbdata = inode;
1205
1206         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1207         oinfo.oi_lockh = &lockh;
1208         oinfo.oi_md = lli->lli_smd;
1209         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1210
1211         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1212         if (rc == -ENOENT)
1213                 RETURN(rc);
1214         if (rc != 0) {
1215                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1216                 RETURN(rc > 0 ? -EIO : rc);
1217         }
1218
1219         rc = ll_merge_lvb(inode);
1220
1221         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1222                i_size_read(inode), (unsigned long long)inode->i_blocks);
1223
1224         RETURN(rc);
1225 }
1226
/* Take a client extent lock of @mode over *@policy on @lsm, returning the
 * handle in @lockh, and refresh the inode size/times from the merged lvb
 * under the lock.  On return *@policy holds the (possibly enlarged) extent
 * actually granted.
 *
 * Returns 0 on success (lock held by caller, release via
 * ll_extent_unlock()), 0 without locking when locking is disabled for this
 * fd/superblock, or a negative error.  Positive enqueue results are mapped
 * to -EIO. */
int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                   struct lov_stripe_md *lsm, int mode,
                   ldlm_policy_data_t *policy, struct lustre_handle *lockh,
                   int ast_flags)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ost_lvb lvb;
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(!lustre_handle_is_used(lockh));
        LASSERT(lsm != NULL);

        /* don't drop the mmapped file to LRU */
        if (mapping_mapped(inode->i_mapping))
                ast_flags |= LDLM_FL_NO_LRU;

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               inode->i_ino, policy->l_extent.start, policy->l_extent.end);

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = mode;
        einfo.ei_cb_bl = ll_extent_lock_callback;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy = *policy;
        oinfo.oi_lockh = lockh;
        oinfo.oi_md = lsm;
        oinfo.oi_flags = ast_flags;

        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
        /* report back the extent actually granted */
        *policy = oinfo.oi_policy;
        if (rc > 0)
                rc = -EIO;

        /* even on enqueue failure, fall through so the times below are
         * only applied when rc == 0; the size-lock ordering comment in
         * the branch below explains why the size lock comes after the
         * enqueue */
        ll_inode_size_lock(inode, 1);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);

        if (policy->l_extent.start == 0 &&
            policy->l_extent.end == OBD_OBJECT_EOF) {
                /* vmtruncate()->ll_truncate() first sets the i_size and then
                 * the kms under both a DLM lock and the
                 * ll_inode_size_lock().  If we don't get the
                 * ll_inode_size_lock() here we can match the DLM lock and
                 * reset i_size from the kms before the truncating path has
                 * updated the kms.  generic_file_write can then trust the
                 * stale i_size when doing appending writes and effectively
                 * cancel the result of the truncate.  Getting the
                 * ll_inode_size_lock() after the enqueue maintains the DLM
                 * -> ll_inode_size_lock() acquiring order. */
                i_size_write(inode, lvb.lvb_size);
                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
                       inode->i_ino, i_size_read(inode));
        }

        if (rc == 0) {
                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
                LTIME_S(inode->i_atime) = lvb.lvb_atime;
                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        }
        ll_inode_size_unlock(inode, 1);

        RETURN(rc);
}
1301
1302 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1303                      struct lov_stripe_md *lsm, int mode,
1304                      struct lustre_handle *lockh)
1305 {
1306         struct ll_sb_info *sbi = ll_i2sbi(inode);
1307         int rc;
1308         ENTRY;
1309
1310         /* XXX phil: can we do this?  won't it screw the file size up? */
1311         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1312             (sbi->ll_flags & LL_SBI_NOLCK))
1313                 RETURN(0);
1314
1315         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1316
1317         RETURN(rc);
1318 }
1319
1320 static void ll_set_file_contended(struct inode *inode)
1321 {
1322         struct ll_inode_info *lli = ll_i2info(inode);
1323         cfs_time_t now = cfs_time_current();
1324
1325         spin_lock(&lli->lli_lock);
1326         lli->lli_contention_time = now;
1327         lli->lli_flags |= LLIF_CONTENDED;
1328         spin_unlock(&lli->lli_lock);
1329 }
1330
1331 void ll_clear_file_contended(struct inode *inode)
1332 {
1333         struct ll_inode_info *lli = ll_i2info(inode);
1334
1335         spin_lock(&lli->lli_lock);
1336         lli->lli_flags &= ~LLIF_CONTENDED;
1337         spin_unlock(&lli->lli_lock);
1338 }
1339
1340 static int ll_is_file_contended(struct file *file)
1341 {
1342         struct inode *inode = file->f_dentry->d_inode;
1343         struct ll_inode_info *lli = ll_i2info(inode);
1344         struct ll_sb_info *sbi = ll_i2sbi(inode);
1345         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1346         ENTRY;
1347
1348         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1349                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1350                        " osc connect flags = 0x"LPX64"\n",
1351                        sbi->ll_lco.lco_flags);
1352                 RETURN(0);
1353         }
1354         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1355                 RETURN(1);
1356         if (lli->lli_flags & LLIF_CONTENDED) {
1357                 cfs_time_t cur_time = cfs_time_current();
1358                 cfs_time_t retry_time;
1359
1360                 retry_time = cfs_time_add(
1361                         lli->lli_contention_time,
1362                         cfs_time_seconds(sbi->ll_contention_time));
1363                 if (cfs_time_after(cur_time, retry_time)) {
1364                         ll_clear_file_contended(inode);
1365                         RETURN(0);
1366                 }
1367                 RETURN(1);
1368         }
1369         RETURN(0);
1370 }
1371
1372 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1373                                  const char *buf, size_t count,
1374                                  loff_t start, loff_t end, int rw)
1375 {
1376         int append;
1377         int tree_locked = 0;
1378         int rc;
1379         struct inode * inode = file->f_dentry->d_inode;
1380         ENTRY;
1381
1382         append = (rw == WRITE) && (file->f_flags & O_APPEND);
1383
1384         if (append || !ll_is_file_contended(file)) {
1385                 struct ll_lock_tree_node *node;
1386                 int ast_flags;
1387
1388                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1389                 if (file->f_flags & O_NONBLOCK)
1390                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1391                 node = ll_node_from_inode(inode, start, end,
1392                                           (rw == WRITE) ? LCK_PW : LCK_PR);
1393                 if (IS_ERR(node)) {
1394                         rc = PTR_ERR(node);
1395                         GOTO(out, rc);
1396                 }
1397                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1398                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1399                 if (rc == 0)
1400                         tree_locked = 1;
1401                 else if (rc == -EUSERS)
1402                         ll_set_file_contended(inode);
1403                 else
1404                         GOTO(out, rc);
1405         }
1406         RETURN(tree_locked);
1407 out:
1408         return rc;
1409 }
1410
1411 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1412                             loff_t *ppos)
1413 {
1414         struct inode *inode = file->f_dentry->d_inode;
1415         struct ll_inode_info *lli = ll_i2info(inode);
1416         struct lov_stripe_md *lsm = lli->lli_smd;
1417         struct ll_sb_info *sbi = ll_i2sbi(inode);
1418         struct ll_lock_tree tree;
1419         struct ost_lvb lvb;
1420         struct ll_ra_read bead;
1421         int ra = 0;
1422         loff_t end;
1423         ssize_t retval, chunk, sum = 0;
1424         int tree_locked;
1425
1426         __u64 kms;
1427         ENTRY;
1428         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1429                inode->i_ino, inode->i_generation, inode, count, *ppos);
1430         /* "If nbyte is 0, read() will return 0 and have no other results."
1431          *                      -- Single Unix Spec */
1432         if (count == 0)
1433                 RETURN(0);
1434
1435         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1436
1437         if (!lsm) {
1438                 /* Read on file with no objects should return zero-filled
1439                  * buffers up to file size (we can get non-zero sizes with
1440                  * mknod + truncate, then opening file for read. This is a
1441                  * common pattern in NFS case, it seems). Bug 6243 */
1442                 int notzeroed;
1443                 /* Since there are no objects on OSTs, we have nothing to get
1444                  * lock on and so we are forced to access inode->i_size
1445                  * unguarded */
1446
1447                 /* Read beyond end of file */
1448                 if (*ppos >= i_size_read(inode))
1449                         RETURN(0);
1450
1451                 if (count > i_size_read(inode) - *ppos)
1452                         count = i_size_read(inode) - *ppos;
1453                 /* Make sure to correctly adjust the file pos pointer for
1454                  * EFAULT case */
1455                 notzeroed = clear_user(buf, count);
1456                 count -= notzeroed;
1457                 *ppos += count;
1458                 if (!count)
1459                         RETURN(-EFAULT);
1460                 RETURN(count);
1461         }
1462 repeat:
1463         if (sbi->ll_max_rw_chunk != 0) {
1464                 /* first, let's know the end of the current stripe */
1465                 end = *ppos;
1466                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1467                                 (obd_off *)&end);
1468
1469                 /* correct, the end is beyond the request */
1470                 if (end > *ppos + count - 1)
1471                         end = *ppos + count - 1;
1472
1473                 /* and chunk shouldn't be too large even if striping is wide */
1474                 if (end - *ppos > sbi->ll_max_rw_chunk)
1475                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1476         } else {
1477                 end = *ppos + count - 1;
1478         }
1479
1480         tree_locked = ll_file_get_tree_lock(&tree, file, buf,
1481                                             count, *ppos, end, READ);
1482         if (tree_locked < 0)
1483                 GOTO(out, retval = tree_locked);
1484
1485         ll_inode_size_lock(inode, 1);
1486         /*
1487          * Consistency guarantees: following possibilities exist for the
1488          * relation between region being read and real file size at this
1489          * moment:
1490          *
1491          *  (A): the region is completely inside of the file;
1492          *
1493          *  (B-x): x bytes of region are inside of the file, the rest is
1494          *  outside;
1495          *
1496          *  (C): the region is completely outside of the file.
1497          *
1498          * This classification is stable under DLM lock acquired by
1499          * ll_tree_lock() above, because to change class, other client has to
1500          * take DLM lock conflicting with our lock. Also, any updates to
1501          * ->i_size by other threads on this client are serialized by
1502          * ll_inode_size_lock(). This guarantees that short reads are handled
1503          * correctly in the face of concurrent writes and truncates.
1504          */
1505         inode_init_lvb(inode, &lvb);
1506         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1507         kms = lvb.lvb_size;
1508         if (*ppos + count - 1 > kms) {
1509                 /* A glimpse is necessary to determine whether we return a
1510                  * short read (B) or some zeroes at the end of the buffer (C) */
1511                 ll_inode_size_unlock(inode, 1);
1512                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1513                 if (retval) {
1514                         if (tree_locked)
1515                                 ll_tree_unlock(&tree);
1516                         goto out;
1517                 }
1518         } else {
1519                 /* region is within kms and, hence, within real file size (A).
1520                  * We need to increase i_size to cover the read region so that
1521                  * generic_file_read() will do its job, but that doesn't mean
1522                  * the kms size is _correct_, it is only the _minimum_ size.
1523                  * If someone does a stat they will get the correct size which
1524                  * will always be >= the kms value here.  b=11081 */
1525                 if (i_size_read(inode) < kms)
1526                         i_size_write(inode, kms);
1527                 ll_inode_size_unlock(inode, 1);
1528         }
1529
1530         chunk = end - *ppos + 1;
1531         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1532                inode->i_ino, chunk, *ppos, i_size_read(inode));
1533
1534         if (tree_locked) {
1535                 /* turn off the kernel's read-ahead */
1536                 file->f_ra.ra_pages = 0;
1537
1538                 /* initialize read-ahead window once per syscall */
1539                 if (ra == 0) {
1540                         ra = 1;
1541                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1542                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1543                         ll_ra_read_in(file, &bead);
1544                 }
1545
1546                 /* BUG: 5972 */
1547                 file_accessed(file);
1548                 retval = generic_file_read(file, buf, chunk, ppos);
1549                 ll_tree_unlock(&tree);
1550         } else {
1551                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1552         }
1553
1554         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1555
1556         if (retval > 0) {
1557                 buf += retval;
1558                 count -= retval;
1559                 sum += retval;
1560                 if (retval == chunk && count > 0)
1561                         goto repeat;
1562         }
1563
1564  out:
1565         if (ra != 0)
1566                 ll_ra_read_ex(file, &bead);
1567         retval = (sum > 0) ? sum : retval;
1568         RETURN(retval);
1569 }
1570
1571 /*
1572  * Write to a file (through the page cache).
1573  */
static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
                             loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
        struct ll_lock_tree tree;
        loff_t maxbytes = ll_file_maxbytes(inode);
        loff_t lock_start, lock_end, end;
        ssize_t retval, chunk, sum = 0;
        int tree_locked;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        /* POSIX, but surprised the VFS doesn't check this already */
        if (count == 0)
                RETURN(0);

        /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
         * called on the file, don't fail the below assertion (bug 2388). */
        if (file->f_flags & O_LOV_DELAY_CREATE &&
            ll_i2info(inode)->lli_smd == NULL)
                RETURN(-EBADF);

        LASSERT(ll_i2info(inode)->lli_smd != NULL);

        /* Serialize writers on this inode.  The write may be split into
         * several chunks; each chunk loops back to "repeat" below. */
        down(&ll_i2info(inode)->lli_write_sem);

repeat:
        chunk = 0; /* just to fix gcc's warning */
        end = *ppos + count - 1;

        if (file->f_flags & O_APPEND) {
                /* O_APPEND needs a [0, EOF] lock so i_size is stable when we
                 * sample it below. */
                lock_start = 0;
                lock_end = OBD_OBJECT_EOF;
        } else if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
                                (obd_off *)&end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
                lock_start = *ppos;
                lock_end = end;
        } else {
                lock_start = *ppos;
                lock_end = *ppos + count - 1;
        }

        /* > 0: DLM extent lock taken; 0: lockless i/o path; < 0: error. */
        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
                                            lock_start, lock_end, WRITE);
        if (tree_locked < 0)
                GOTO(out, retval = tree_locked);

        /* This is ok, g_f_w will overwrite this under i_sem if it races
         * with a local truncate, it just makes our maxbyte checking easier.
         * The i_size value gets updated in ll_extent_lock() as a consequence
         * of the [0,EOF] extent lock we requested above. */
        if (file->f_flags & O_APPEND) {
                *ppos = i_size_read(inode);
                end = *ppos + count - 1;
        }

        if (*ppos >= maxbytes) {
                send_sig(SIGXFSZ, current, 0);
                GOTO(out_unlock, retval = -EFBIG);
        }
        if (end > maxbytes - 1)
                end = maxbytes - 1;

        /* generic_file_write handles O_APPEND after getting i_mutex */
        chunk = end - *ppos + 1;
        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
               inode->i_ino, chunk, *ppos);
        if (tree_locked)
                retval = generic_file_write(file, buf, chunk, ppos);
        else
                retval = ll_file_lockless_io(file, (char*)buf, chunk,
                                             ppos, WRITE);
        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);

out_unlock:
        if (tree_locked)
                ll_tree_unlock(&tree);

out:
        /* A full chunk was written but the request isn't done yet: advance
         * past the written bytes and go write the next chunk. */
        if (retval > 0) {
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

        up(&ll_i2info(inode)->lli_write_sem);

        /* Report the bytes written so far even if a later chunk failed. */
        retval = (sum > 0) ? sum : retval;
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
                           retval > 0 ? retval : 0);
        RETURN(retval);
}
1685
1686 /*
1687  * Send file content (through pagecache) somewhere with helper
1688  */
1689 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1690                                 read_actor_t actor, void *target)
1691 {
1692         struct inode *inode = in_file->f_dentry->d_inode;
1693         struct ll_inode_info *lli = ll_i2info(inode);
1694         struct lov_stripe_md *lsm = lli->lli_smd;
1695         struct ll_lock_tree tree;
1696         struct ll_lock_tree_node *node;
1697         struct ost_lvb lvb;
1698         struct ll_ra_read bead;
1699         int rc;
1700         ssize_t retval;
1701         __u64 kms;
1702         ENTRY;
1703         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1704                inode->i_ino, inode->i_generation, inode, count, *ppos);
1705
1706         /* "If nbyte is 0, read() will return 0 and have no other results."
1707          *                      -- Single Unix Spec */
1708         if (count == 0)
1709                 RETURN(0);
1710
1711         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1712         /* turn off the kernel's read-ahead */
1713         in_file->f_ra.ra_pages = 0;
1714
1715         /* File with no objects, nothing to lock */
1716         if (!lsm)
1717                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1718
1719         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1720         if (IS_ERR(node))
1721                 RETURN(PTR_ERR(node));
1722
1723         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1724         rc = ll_tree_lock(&tree, node, NULL, count,
1725                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1726         if (rc != 0)
1727                 RETURN(rc);
1728
1729         ll_clear_file_contended(inode);
1730         ll_inode_size_lock(inode, 1);
1731         /*
1732          * Consistency guarantees: following possibilities exist for the
1733          * relation between region being read and real file size at this
1734          * moment:
1735          *
1736          *  (A): the region is completely inside of the file;
1737          *
1738          *  (B-x): x bytes of region are inside of the file, the rest is
1739          *  outside;
1740          *
1741          *  (C): the region is completely outside of the file.
1742          *
1743          * This classification is stable under DLM lock acquired by
1744          * ll_tree_lock() above, because to change class, other client has to
1745          * take DLM lock conflicting with our lock. Also, any updates to
1746          * ->i_size by other threads on this client are serialized by
1747          * ll_inode_size_lock(). This guarantees that short reads are handled
1748          * correctly in the face of concurrent writes and truncates.
1749          */
1750         inode_init_lvb(inode, &lvb);
1751         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1752         kms = lvb.lvb_size;
1753         if (*ppos + count - 1 > kms) {
1754                 /* A glimpse is necessary to determine whether we return a
1755                  * short read (B) or some zeroes at the end of the buffer (C) */
1756                 ll_inode_size_unlock(inode, 1);
1757                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1758                 if (retval)
1759                         goto out;
1760         } else {
1761                 /* region is within kms and, hence, within real file size (A) */
1762                 i_size_write(inode, kms);
1763                 ll_inode_size_unlock(inode, 1);
1764         }
1765
1766         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1767                inode->i_ino, count, *ppos, i_size_read(inode));
1768
1769         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1770         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1771         ll_ra_read_in(in_file, &bead);
1772         /* BUG: 5972 */
1773         file_accessed(in_file);
1774         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1775         ll_ra_read_ex(in_file, &bead);
1776
1777  out:
1778         ll_tree_unlock(&tree);
1779         RETURN(retval);
1780 }
1781
static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                               unsigned long arg)
{
        /* Admin-only ioctl: ask the OST to recreate one of this file's
         * objects, using the id/group/OST-index supplied from userspace in
         * a struct ll_recreate_obj at "arg". */
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_export *exp = ll_i2dtexp(inode);
        struct ll_recreate_obj ucreatp;
        struct obd_trans_info oti = { 0 };
        struct obdo *oa = NULL;
        int lsm_size;
        int rc = 0;
        struct lov_stripe_md *lsm, *lsm2;
        ENTRY;

        if (!capable (CAP_SYS_ADMIN))
                RETURN(-EPERM);

        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
                            sizeof(struct ll_recreate_obj));
        if (rc) {
                RETURN(-EFAULT);
        }
        OBDO_ALLOC(oa);
        if (oa == NULL)
                RETURN(-ENOMEM);

        /* Hold lli_size_sem so lli_smd is stable while we copy the stripe
         * metadata and issue the recreate request. */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(out, rc = -ENOENT);
        lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
                   (lsm->lsm_stripe_count));

        OBD_ALLOC(lsm2, lsm_size);
        if (lsm2 == NULL)
                GOTO(out, rc = -ENOMEM);

        /* o_nlink is reused here to carry the target OST index. */
        oa->o_id = ucreatp.lrc_id;
        oa->o_gr = ucreatp.lrc_group;
        oa->o_nlink = ucreatp.lrc_ost_idx;
        oa->o_flags |= OBD_FL_RECREATE_OBJS;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        /* obd_create() takes &lsm2 and may update it; work on a copy so the
         * inode's cached stripe md is left untouched. */
        memcpy(lsm2, lsm, lsm_size);
        rc = obd_create(exp, oa, &lsm2, &oti);

        OBD_FREE(lsm2, lsm_size);
        GOTO(out, rc);
out:
        up(&lli->lli_size_sem);
        OBDO_FREE(oa);
        return rc;
}
1836
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                             int flags, struct lov_user_md *lum, int lum_size)
{
        /* Apply the user-supplied striping EA "lum" to the file by
         * re-opening it with an IT_OPEN intent.  Fails with -EEXIST when
         * the inode already has stripe metadata (lli_smd set). */
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm;
        struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
        int rc = 0;
        ENTRY;

        /* lli_size_sem prevents a racing thread from instantiating lli_smd
         * while we are setting the stripe.
         * NOTE(review): the semaphore is held across the open RPC below --
         * confirm this cannot deadlock against other paths that take
         * lli_size_sem. */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm) {
                up(&lli->lli_size_sem);
                CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
                       inode->i_ino);
                RETURN(-EEXIST);
        }

        rc = ll_intent_file_open(file, lum, lum_size, &oit);
        if (rc)
                GOTO(out, rc);
        if (it_disposition(&oit, DISP_LOOKUP_NEG))
                GOTO(out_req_free, rc = -ENOENT);
        rc = oit.d.lustre.it_status;
        if (rc < 0)
                GOTO(out_req_free, rc);

        /* The intent open created an MDS open handle we do not keep; close
         * it again right away. */
        ll_release_openhandle(file->f_dentry, &oit);

 out:
        up(&lli->lli_size_sem);
        ll_intent_release(&oit);
        RETURN(rc);
out_req_free:
        /* Error after the RPC succeeded: drop the reply buffer ourselves
         * since ll_release_openhandle() was not reached. */
        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
        goto out;
}
1874
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
                             struct lov_mds_md **lmmp, int *lmm_size, 
                             struct ptlrpc_request **request)
{
        /* Fetch the LOV striping EA of "filename" (looked up relative to
         * "inode") from the MDS.  On success *lmmp/*lmm_size describe the
         * EA in host endianness and *request holds the reply buffer the EA
         * points into; the caller must release the request.  For joined
         * files (LOV_MAGIC_JOIN) the EA is expanded into a freshly
         * OBD_ALLOC'd lov_user_md_join that the caller owns.
         * NOTE(review): RETURN() is used without a matching ENTRY, and the
         * final exit is a plain "return" -- inconsistent with the file's
         * ENTRY/RETURN convention; confirm intended. */
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct mdt_body  *body;
        struct lov_mds_md *lmm = NULL;
        struct ptlrpc_request *req = NULL;
        struct obd_capa *oc;
        int rc, lmmsize;

        rc = ll_get_max_mdsize(sbi, &lmmsize);
        if (rc)
                RETURN(rc);

        oc = ll_mdscapa_get(inode);
        rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
                             oc, filename, strlen(filename) + 1,
                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
                             ll_i2suppgid(inode), &req);
        capa_put(oc);
        if (rc < 0) {
                CDEBUG(D_INFO, "md_getattr_name failed "
                       "on %s: rc %d\n", filename, rc);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
        LASSERT(body != NULL); /* checked by mdc_getattr_name */

        lmmsize = body->eadatasize;

        /* No striping EA present on the file/directory. */
        if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
                        lmmsize == 0) {
                GOTO(out, rc = -ENODATA);
        }

        lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
        LASSERT(lmm != NULL);

        /*
         * This is coming from the MDS, so is probably in
         * little endian.  We convert it to host endian before
         * passing it to userspace.
         */
        if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
                lustre_swab_lov_user_md((struct lov_user_md *)lmm);
                lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
        } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
                lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
        }

        if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
                /* Joined file: unpack the md and flatten the per-extent
                 * array into a lov_user_md_join userspace can consume. */
                struct lov_stripe_md *lsm;
                struct lov_user_md_join *lmj;
                int lmj_size, i, aindex = 0;

                rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
                if (rc < 0)
                        GOTO(out, rc = -ENOMEM);
                rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
                if (rc)
                        GOTO(out_free_memmd, rc);

                lmj_size = sizeof(struct lov_user_md_join) +
                           lsm->lsm_stripe_count *
                           sizeof(struct lov_user_ost_data_join);
                OBD_ALLOC(lmj, lmj_size);
                if (!lmj)
                        GOTO(out_free_memmd, rc = -ENOMEM);

                memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
                for (i = 0; i < lsm->lsm_stripe_count; i++) {
                        struct lov_extent *lex =
                                &lsm->lsm_array->lai_ext_array[aindex];

                        /* Advance to the extent that covers stripe i. */
                        if (lex->le_loi_idx + lex->le_stripe_count <= i)
                                aindex ++;
                        CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
                                        LPU64" len %d\n", aindex, i,
                                        lex->le_start, (int)lex->le_len);
                        lmj->lmm_objects[i].l_extent_start =
                                lex->le_start;

                        /* le_len == -1 marks an extent that runs to EOF. */
                        if ((int)lex->le_len == -1)
                                lmj->lmm_objects[i].l_extent_end = -1;
                        else
                                lmj->lmm_objects[i].l_extent_end =
                                        lex->le_start + lex->le_len;
                        lmj->lmm_objects[i].l_object_id =
                                lsm->lsm_oinfo[i]->loi_id;
                        lmj->lmm_objects[i].l_object_gr =
                                lsm->lsm_oinfo[i]->loi_gr;
                        lmj->lmm_objects[i].l_ost_gen =
                                lsm->lsm_oinfo[i]->loi_ost_gen;
                        lmj->lmm_objects[i].l_ost_idx =
                                lsm->lsm_oinfo[i]->loi_ost_idx;
                }
                /* Hand back the allocated join md instead of the reply
                 * buffer's EA; ownership passes to the caller. */
                lmm = (struct lov_mds_md *)lmj;
                lmmsize = lmj_size;
out_free_memmd:
                obd_free_memmd(sbi->ll_dt_exp, &lsm);
        }
out:
        *lmmp = lmm;
        *lmm_size = lmmsize;
        *request = req;
        return rc;
}
1984
1985 static int ll_lov_setea(struct inode *inode, struct file *file,
1986                             unsigned long arg)
1987 {
1988         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1989         struct lov_user_md  *lump;
1990         int lum_size = sizeof(struct lov_user_md) +
1991                        sizeof(struct lov_user_ost_data);
1992         int rc;
1993         ENTRY;
1994
1995         if (!capable (CAP_SYS_ADMIN))
1996                 RETURN(-EPERM);
1997
1998         OBD_ALLOC(lump, lum_size);
1999         if (lump == NULL) {
2000                 RETURN(-ENOMEM);
2001         }
2002         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2003         if (rc) {
2004                 OBD_FREE(lump, lum_size);
2005                 RETURN(-EFAULT);
2006         }
2007
2008         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2009
2010         OBD_FREE(lump, lum_size);
2011         RETURN(rc);
2012 }
2013
2014 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2015                             unsigned long arg)
2016 {
2017         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2018         int rc;
2019         int flags = FMODE_WRITE;
2020         ENTRY;
2021
2022         /* Bug 1152: copy properly when this is no longer true */
2023         LASSERT(sizeof(lum) == sizeof(*lump));
2024         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2025         rc = copy_from_user(&lum, lump, sizeof(lum));
2026         if (rc)
2027                 RETURN(-EFAULT);
2028
2029         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2030         if (rc == 0) {
2031                  put_user(0, &lump->lmm_stripe_count);
2032                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2033                                     0, ll_i2info(inode)->lli_smd, lump);
2034         }
2035         RETURN(rc);
2036 }
2037
2038 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2039 {
2040         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2041
2042         if (!lsm)
2043                 RETURN(-ENODATA);
2044
2045         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2046                             (void *)arg);
2047 }
2048
2049 static int ll_get_grouplock(struct inode *inode, struct file *file,
2050                             unsigned long arg)
2051 {
2052         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2053         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2054                                                     .end = OBD_OBJECT_EOF}};
2055         struct lustre_handle lockh = { 0 };
2056         struct ll_inode_info *lli = ll_i2info(inode);
2057         struct lov_stripe_md *lsm = lli->lli_smd;
2058         int flags = 0, rc;
2059         ENTRY;
2060
2061         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2062                 RETURN(-EINVAL);
2063         }
2064
2065         policy.l_extent.gid = arg;
2066         if (file->f_flags & O_NONBLOCK)
2067                 flags = LDLM_FL_BLOCK_NOWAIT;
2068
2069         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2070         if (rc)
2071                 RETURN(rc);
2072
2073         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2074         fd->fd_gid = arg;
2075         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2076
2077         RETURN(0);
2078 }
2079
2080 static int ll_put_grouplock(struct inode *inode, struct file *file,
2081                             unsigned long arg)
2082 {
2083         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2084         struct ll_inode_info *lli = ll_i2info(inode);
2085         struct lov_stripe_md *lsm = lli->lli_smd;
2086         int rc;
2087         ENTRY;
2088
2089         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2090                 /* Ugh, it's already unlocked. */
2091                 RETURN(-EINVAL);
2092         }
2093
2094         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2095                 RETURN(-EINVAL);
2096
2097         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2098
2099         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2100         if (rc)
2101                 RETURN(rc);
2102
2103         fd->fd_gid = 0;
2104         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2105
2106         RETURN(0);
2107 }
2108
2109 static int join_sanity_check(struct inode *head, struct inode *tail)
2110 {
2111         ENTRY;
2112         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2113                 CERROR("server do not support join \n");
2114                 RETURN(-EINVAL);
2115         }
2116         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2117                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2118                        head->i_ino, tail->i_ino);
2119                 RETURN(-EINVAL);
2120         }
2121         if (head->i_ino == tail->i_ino) {
2122                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2123                 RETURN(-EINVAL);
2124         }
2125         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2126                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2127                 RETURN(-EINVAL);
2128         }
2129         RETURN(0);
2130 }
2131
2132 static int join_file(struct inode *head_inode, struct file *head_filp,
2133                      struct file *tail_filp)
2134 {
2135         struct dentry *tail_dentry = tail_filp->f_dentry;
2136         struct lookup_intent oit = {.it_op = IT_OPEN,
2137                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2138         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2139                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2140
2141         struct lustre_handle lockh;
2142         struct md_op_data *op_data;
2143         int    rc;
2144         loff_t data;
2145         ENTRY;
2146
2147         tail_dentry = tail_filp->f_dentry;
2148
2149         data = i_size_read(head_inode);
2150         op_data = ll_prep_md_op_data(NULL, head_inode,
2151                                      tail_dentry->d_parent->d_inode,
2152                                      tail_dentry->d_name.name,
2153                                      tail_dentry->d_name.len, 0,
2154                                      LUSTRE_OPC_ANY, &data);
2155         if (IS_ERR(op_data))
2156                 RETURN(PTR_ERR(op_data));
2157
2158         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2159                          op_data, &lockh, NULL, 0, 0);
2160
2161         ll_finish_md_op_data(op_data);
2162         if (rc < 0)
2163                 GOTO(out, rc);
2164
2165         rc = oit.d.lustre.it_status;
2166
2167         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2168                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2169                 ptlrpc_req_finished((struct ptlrpc_request *)
2170                                     oit.d.lustre.it_data);
2171                 GOTO(out, rc);
2172         }
2173
2174         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2175                                            * away */
2176                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2177                 oit.d.lustre.it_lock_mode = 0;
2178         }
2179         ll_release_openhandle(head_filp->f_dentry, &oit);
2180 out:
2181         ll_intent_release(&oit);
2182         RETURN(rc);
2183 }
2184
static int ll_file_join(struct inode *head, struct file *filp,
                        char *filename_tail)
{
        /* LL_IOC_JOIN: append the file named filename_tail to "head".
         * Both files are [0, EOF]-locked in ascending inode-number order so
         * two concurrent joins in opposite directions cannot deadlock.
         * Cleanup is phased: cleanup_phase records how far setup got. */
        struct inode *tail = NULL, *first = NULL, *second = NULL;
        struct dentry *tail_dentry;
        struct file *tail_filp, *first_filp, *second_filp;
        struct ll_lock_tree first_tree, second_tree;
        struct ll_lock_tree_node *first_node, *second_node;
        struct ll_inode_info *hlli = ll_i2info(head), *tlli;
        int rc = 0, cleanup_phase = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
               head->i_ino, head->i_generation, head, filename_tail);

        tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
        if (IS_ERR(tail_filp)) {
                CERROR("Can not open tail file %s", filename_tail);
                rc = PTR_ERR(tail_filp);
                GOTO(cleanup, rc);
        }
        /* NOTE(review): igrab() can return NULL for an inode being freed;
         * that result is not checked here -- confirm it cannot happen for
         * an inode we just opened above. */
        tail = igrab(tail_filp->f_dentry->d_inode);

        tlli = ll_i2info(tail);
        tail_dentry = tail_filp->f_dentry;
        LASSERT(tail_dentry);
        cleanup_phase = 1;

        /*reorder the inode for lock sequence*/
        first = head->i_ino > tail->i_ino ? head : tail;
        second = head->i_ino > tail->i_ino ? tail : head;
        first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
        second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;

        CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
               head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
        first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
        if (IS_ERR(first_node)){
                rc = PTR_ERR(first_node);
                GOTO(cleanup, rc);
        }
        first_tree.lt_fd = first_filp->private_data;
        rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
        if (rc != 0)
                GOTO(cleanup, rc);
        cleanup_phase = 2;

        second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
        if (IS_ERR(second_node)){
                rc = PTR_ERR(second_node);
                GOTO(cleanup, rc);
        }
        second_tree.lt_fd = second_filp->private_data;
        rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
        if (rc != 0)
                GOTO(cleanup, rc);
        cleanup_phase = 3;

        rc = join_sanity_check(head, tail);
        if (rc)
                GOTO(cleanup, rc);

        rc = join_file(head, filp, tail_filp);
        if (rc)
                GOTO(cleanup, rc);
cleanup:
        /* Cases fall through deliberately: each phase undoes one more
         * setup step. */
        switch (cleanup_phase) {
        case 3:
                ll_tree_unlock(&second_tree);
                obd_cancel_unused(ll_i2dtexp(second),
                                  ll_i2info(second)->lli_smd, 0, NULL);
                /* fall through */
        case 2:
                ll_tree_unlock(&first_tree);
                obd_cancel_unused(ll_i2dtexp(first),
                                  ll_i2info(first)->lli_smd, 0, NULL);
                /* fall through */
        case 1:
                filp_close(tail_filp, 0);
                if (tail)
                        iput(tail);
                /* On success the head's cached stripe md is stale (the file
                 * was extended by the join); drop it so it is refetched. */
                if (head && rc == 0) {
                        obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
                                       &hlli->lli_smd);
                        hlli->lli_smd = NULL;
                }
                /* fall through */
        case 0:
                break;
        default:
                CERROR("invalid cleanup_phase %d\n", cleanup_phase);
                LBUG();
        }
        RETURN(rc);
}
2277
int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
{
        /* Close the MDS open handle that an intent open (DISP_OPEN_OPEN)
         * created when the caller does not keep the file open, and drop the
         * intent's request reference taken at open time. */
        struct inode *inode = dentry->d_inode;
        struct obd_client_handle *och;
        int rc;
        ENTRY;

        LASSERT(inode);

        /* Root ? Do nothing. */
        if (dentry->d_inode->i_sb->s_root == dentry)
                RETURN(0);

        /* No open handle to close? Move away */
        if (!it_disposition(it, DISP_OPEN_OPEN))
                RETURN(0);

        LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);

        OBD_ALLOC(och, sizeof(*och));
        if (!och)
                GOTO(out, rc = -ENOMEM);

        /* Fill och from the intent's open reply ... */
        ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
                    ll_i2info(inode), it, och);

        /* ... and send the close to the MDS. */
        rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
                                       inode, och);
 out:
        /* this one is in place of ll_file_open */
        ptlrpc_req_finished(it->d.lustre.it_data);
        it_clear_disposition(it, DISP_ENQ_OPEN_REF);
        RETURN(rc);
}
2312
2313 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2314                   unsigned long arg)
2315 {
2316         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2317         int flags;
2318         ENTRY;
2319
2320         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2321                inode->i_generation, inode, cmd);
2322         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2323
2324         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2325         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2326                 RETURN(-ENOTTY);
2327
2328         switch(cmd) {
2329         case LL_IOC_GETFLAGS:
2330                 /* Get the current value of the file flags */
2331                 return put_user(fd->fd_flags, (int *)arg);
2332         case LL_IOC_SETFLAGS:
2333         case LL_IOC_CLRFLAGS:
2334                 /* Set or clear specific file flags */
2335                 /* XXX This probably needs checks to ensure the flags are
2336                  *     not abused, and to handle any flag side effects.
2337                  */
2338                 if (get_user(flags, (int *) arg))
2339                         RETURN(-EFAULT);
2340
2341                 if (cmd == LL_IOC_SETFLAGS) {
2342                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2343                             !(file->f_flags & O_DIRECT)) {
2344                                 CERROR("%s: unable to disable locking on "
2345                                        "non-O_DIRECT file\n", current->comm);
2346                                 RETURN(-EINVAL);
2347                         }
2348
2349                         fd->fd_flags |= flags;
2350                 } else {
2351                         fd->fd_flags &= ~flags;
2352                 }
2353                 RETURN(0);
2354         case LL_IOC_LOV_SETSTRIPE:
2355                 RETURN(ll_lov_setstripe(inode, file, arg));
2356         case LL_IOC_LOV_SETEA:
2357                 RETURN(ll_lov_setea(inode, file, arg));
2358         case LL_IOC_LOV_GETSTRIPE:
2359                 RETURN(ll_lov_getstripe(inode, arg));
2360         case LL_IOC_RECREATE_OBJ:
2361                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2362         case EXT3_IOC_GETFLAGS:
2363         case EXT3_IOC_SETFLAGS:
2364                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2365         case EXT3_IOC_GETVERSION_OLD:
2366         case EXT3_IOC_GETVERSION:
2367                 RETURN(put_user(inode->i_generation, (int *)arg));
2368         case LL_IOC_JOIN: {
2369                 char *ftail;
2370                 int rc;
2371
2372                 ftail = getname((const char *)arg);
2373                 if (IS_ERR(ftail))
2374                         RETURN(PTR_ERR(ftail));
2375                 rc = ll_file_join(inode, file, ftail);
2376                 putname(ftail);
2377                 RETURN(rc);
2378         }
2379         case LL_IOC_GROUP_LOCK:
2380                 RETURN(ll_get_grouplock(inode, file, arg));
2381         case LL_IOC_GROUP_UNLOCK:
2382                 RETURN(ll_put_grouplock(inode, file, arg));
2383         case IOC_OBD_STATFS:
2384                 RETURN(ll_obd_statfs(inode, (void *)arg));
2385
2386         /* We need to special case any other ioctls we want to handle,
2387          * to send them to the MDS/OST as appropriate and to properly
2388          * network encode the arg field.
2389         case EXT3_IOC_SETVERSION_OLD:
2390         case EXT3_IOC_SETVERSION:
2391         */
2392         case LL_IOC_FLUSHCTX:
2393                 RETURN(ll_flush_ctx(inode));
2394         default: {
2395                 int err;
2396
2397                 if (LLIOC_STOP == 
2398                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2399                         RETURN(err);
2400
2401                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2402                                      (void *)arg));
2403         }
2404         }
2405 }
2406
/* llseek implementation.  For SEEK_END the authoritative size must be
 * glimpsed from the OSTs first (unless the file has no objects yet) and
 * read under the inode size lock.
 *
 * \param file    open file whose position is being changed
 * \param offset  byte offset relative to @origin
 * \param origin  0 = SEEK_SET, 1 = SEEK_CUR, 2 = SEEK_END
 *
 * \retval new file position on success
 * \retval -EINVAL if the resulting offset is negative or beyond maxbytes
 * \retval other   negative errno from the glimpse RPC
 */
loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        loff_t retval;
        ENTRY;
        /* Computed here only for the trace message; the real offset is
         * recalculated below (with the size read under lock for
         * SEEK_END). */
        retval = offset + ((origin == 2) ? i_size_read(inode) :
                           (origin == 1) ? file->f_pos : 0);
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
               inode->i_ino, inode->i_generation, inode, retval, retval,
               origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);

        if (origin == 2) { /* SEEK_END */
                int nonblock = 0, rc;

                if (file->f_flags & O_NONBLOCK)
                        nonblock = LDLM_FL_BLOCK_NOWAIT;

                /* No stripe objects means nothing was ever written; the
                 * local i_size is already authoritative. */
                if (lsm != NULL) {
                        rc = ll_glimpse_size(inode, nonblock);
                        if (rc != 0)
                                RETURN(rc);
                }

                ll_inode_size_lock(inode, 0);
                offset += i_size_read(inode);
                ll_inode_size_unlock(inode, 0);
        } else if (origin == 1) { /* SEEK_CUR */
                offset += file->f_pos;
        }

        retval = -EINVAL;
        if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
                if (offset != file->f_pos) {
                        file->f_pos = offset;
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                        /* 2.4 kernels: reset readahead state and bump the
                         * file version on an explicit seek. */
                        file->f_reada = 0;
                        file->f_version = ++event;
#endif
                }
                retval = offset;
        }

        RETURN(retval);
}
2454
/* fsync/fdatasync implementation: wait for in-flight page I/O, collect
 * any asynchronous writeback errors recorded earlier, sync the metadata
 * on the MDS, and (for fdatasync on striped files) force the OSTs to
 * commit the data as well.
 *
 * \param file    open file being synced (unused beyond the VFS contract)
 * \param dentry  dentry of the inode to sync
 * \param data    non-zero for fdatasync-style data commit to the OSTs
 *
 * \retval 0 on success, first negative errno encountered otherwise
 */
int ll_fsync(struct file *file, struct dentry *dentry, int data)
{
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ptlrpc_request *req;
        struct obd_capa *oc;
        int rc, err;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);

        /* fsync's caller has already called _fdata{sync,write}, we want
         * that IO to finish before calling the osc and mdc sync methods */
        rc = filemap_fdatawait(inode->i_mapping);

        /* catch async errors that were recorded back when async writeback
         * failed for pages in this mapping. */
        err = lli->lli_async_rc;
        lli->lli_async_rc = 0;
        if (rc == 0)
                rc = err;
        if (lsm) {
                err = lov_test_and_clear_async_rc(lsm);
                if (rc == 0)
                        rc = err;
        }

        /* Sync the metadata on the MDS, authenticated by a capability. */
        oc = ll_mdscapa_get(inode);
        err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
                      &req);
        capa_put(oc);
        if (!rc)
                rc = err;
        /* NOTE(review): req is only finished on md_sync success — this
         * assumes no reply is allocated on failure; confirm against the
         * md_sync contract. */
        if (!err)
                ptlrpc_req_finished(req);

        if (data && lsm) {
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (!oa)
                        RETURN(rc ? rc : -ENOMEM);

                /* Identify the objects and push mtime/ctime/etc. so the
                 * OSTs commit the whole file (0..EOF). */
                oa->o_id = lsm->lsm_object_id;
                oa->o_gr = lsm->lsm_object_gr;
                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                           OBD_MD_FLGROUP);

                oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
                err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
                               0, OBD_OBJECT_EOF, oc);
                capa_put(oc);
                if (!rc)
                        rc = err;
                OBDO_FREE(oa);
        }

        RETURN(rc);
}
2518
/* Acquire, test, or release an advisory lock (fcntl() or flock()) by
 * enqueueing an LDLM flock lock on a per-file flock resource, then
 * mirroring the result into the local VFS lock lists.
 *
 * \param file       open file the lock applies to
 * \param cmd        F_SETLK/F_SETLKW/F_GETLK (and 64-bit variants)
 * \param file_lock  VFS lock descriptor (type, byte range, owner pid)
 *
 * \retval 0 on success, negative errno on failure
 */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        /* flock locks live on their own resource, keyed by the fid plus
         * the LDLM_FLOCK discriminator */
        struct ldlm_res_id res_id =
                { .name = { fid_seq(ll_inode2fid(inode)),
                            fid_oid(ll_inode2fid(inode)),
                            fid_ver(ll_inode2fid(inode)),
                            LDLM_FLOCK} };
        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
                ldlm_flock_completion_ast, NULL, file_lock };
        struct lustre_handle lockh = {0};
        ldlm_policy_data_t flock;
        int flags = 0;
        int rc;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
               inode->i_ino, file_lock);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

        if (file_lock->fl_flags & FL_FLOCK) {
                LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                /* set missing params for flock() calls */
                file_lock->fl_end = OFFSET_MAX;
                file_lock->fl_pid = current->tgid;
        }
        flock.l_flock.pid = file_lock->fl_pid;
        flock.l_flock.start = file_lock->fl_start;
        flock.l_flock.end = file_lock->fl_end;

        /* Map the POSIX lock type onto an LDLM lock mode. */
        switch (file_lock->fl_type) {
        case F_RDLCK:
                einfo.ei_mode = LCK_PR;
                break;
        case F_UNLCK:
                /* An unlock request may or may not have any relation to
                 * existing locks so we may not be able to pass a lock handle
                 * via a normal ldlm_lock_cancel() request. The request may even
                 * unlock a byte range in the middle of an existing lock. In
                 * order to process an unlock request we need all of the same
                 * information that is given with a normal read or write record
                 * lock request. To avoid creating another ldlm unlock (cancel)
                 * message we'll treat a LCK_NL flock request as an unlock. */
                einfo.ei_mode = LCK_NL;
                break;
        case F_WRLCK:
                einfo.ei_mode = LCK_PW;
                break;
        default:
                CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
                LBUG();
        }

        /* Map the command onto enqueue flags: blocking (SETLKW),
         * non-blocking (SETLK), or test-only (GETLK). */
        switch (cmd) {
        case F_SETLKW:
#ifdef F_SETLKW64
        case F_SETLKW64:
#endif
                flags = 0;
                break;
        case F_SETLK:
#ifdef F_SETLK64
        case F_SETLK64:
#endif
                flags = LDLM_FL_BLOCK_NOWAIT;
                break;
        case F_GETLK:
#ifdef F_GETLK64
        case F_GETLK64:
#endif
                flags = LDLM_FL_TEST_LOCK;
                /* Save the old mode so that if the mode in the lock changes we
                 * can decrement the appropriate reader or writer refcount. */
                file_lock->fl_type = einfo.ei_mode;
                break;
        default:
                CERROR("unknown fcntl lock command: %d\n", cmd);
                LBUG();
        }

        CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
               "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
               flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
        /* Mirror a successful enqueue into the local VFS lock lists so
         * the kernel's bookkeeping matches the cluster-wide state. */
        if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
                ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
#ifdef HAVE_F_OP_FLOCK
        if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
            !(flags & LDLM_FL_TEST_LOCK))
                posix_lock_file_wait(file, file_lock);
#endif

        RETURN(rc);
}
2617
/* lock/flock replacement for "-o noflock" mounts: file locking is
 * administratively disabled, so every request fails with -ENOSYS. */
int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
        ENTRY;

        RETURN(-ENOSYS);
}
2624
2625 int ll_have_md_lock(struct inode *inode, __u64 bits)
2626 {
2627         struct lustre_handle lockh;
2628         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2629         struct lu_fid *fid;
2630         int flags;
2631         ENTRY;
2632
2633         if (!inode)
2634                RETURN(0);
2635
2636         fid = &ll_i2info(inode)->lli_fid;
2637         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2638
2639         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2640         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2641                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2642                 RETURN(1);
2643         }
2644         RETURN(0);
2645 }
2646
/* Like ll_have_md_lock(), but take a reference on the matched lock
 * (no LDLM_FL_TEST_LOCK) and return its mode and handle to the caller,
 * who is then responsible for releasing it.
 *
 * \param inode  inode whose inodebits lock is wanted (must be non-NULL)
 * \param bits   MDS_INODELOCK_* bits that must all be covered
 * \param lockh  out: handle of the matched lock
 *
 * \retval matched lock mode, or 0 when no lock matched
 */
ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
                            struct lustre_handle *lockh)
{
        ldlm_policy_data_t policy = { .l_inodebits = {bits}};
        struct lu_fid *fid;
        ldlm_mode_t rc;
        int flags;
        ENTRY;

        fid = &ll_i2info(inode)->lli_fid;
        CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));

        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
        rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
        RETURN(rc);
}
2664
2665 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2666         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2667                               * and return success */
2668                 inode->i_nlink = 0;
2669                 /* This path cannot be hit for regular files unless in
2670                  * case of obscure races, so no need to to validate
2671                  * size. */
2672                 if (!S_ISREG(inode->i_mode) &&
2673                     !S_ISDIR(inode->i_mode))
2674                         return 0;
2675         }
2676
2677         if (rc) {
2678                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2679                 return -abs(rc);
2680
2681         }
2682
2683         return 0;
2684 }
2685
/* Revalidate a dentry/inode against the MDS.  Two paths exist: servers
 * supporting OBD_CONNECT_ATTRFID are asked via an IT_GETATTR intent
 * lock (by fid, no name), others via a plain md_getattr() when no
 * UPDATE|LOOKUP inodebits lock is already cached.  Finally the file
 * size is glimpsed from the OSTs if the file has objects.
 *
 * \param dentry  dentry to revalidate (d_inode must be set)
 * \param it      original lookup intent from the caller (unused here
 *                beyond the signature; a local IT_GETATTR is built)
 *
 * \retval 0 on success, negative errno on failure
 */
int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct ptlrpc_request *req = NULL;
        struct ll_sb_info *sbi;
        struct obd_export *exp;
        int rc;
        ENTRY;

        if (!inode) {
                CERROR("REPORT THIS LINE TO PETER\n");
                RETURN(0);
        }
        sbi = ll_i2sbi(inode);

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
               inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

        exp = ll_i2mdexp(inode);

        if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
                struct lookup_intent oit = { .it_op = IT_GETATTR };
                struct md_op_data *op_data;

                /* Call getattr by fid, so do not provide name at all. */
                op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
                                             dentry->d_inode, NULL, 0, 0,
                                             LUSTRE_OPC_ANY, NULL);
                if (IS_ERR(op_data))
                        RETURN(PTR_ERR(op_data));

                /* O_CHECK_STALE tells the MDS to verify the fid still
                 * refers to a live object. */
                oit.it_flags |= O_CHECK_STALE;
                rc = md_intent_lock(exp, op_data, NULL, 0,
                                    /* we are not interested in name
                                       based lookup */
                                    &oit, 0, &req,
                                    ll_md_blocking_ast, 0);
                ll_finish_md_op_data(op_data);
                oit.it_flags &= ~O_CHECK_STALE;
                if (rc < 0) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        GOTO (out, rc);
                }

                rc = ll_revalidate_it_finish(req, &oit, dentry);
                if (rc != 0) {
                        ll_intent_release(&oit);
                        GOTO(out, rc);
                }

                /* Unlinked? Unhash dentry, so it is not picked up later by
                   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
                   here to preserve get_cwd functionality on 2.6.
                   Bug 10503 */
                if (!dentry->d_inode->i_nlink) {
                        spin_lock(&dcache_lock);
                        ll_drop_dentry(dentry);
                        spin_unlock(&dcache_lock);
                }

                ll_lookup_finish_locks(&oit, dentry);
        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
                                                     MDS_INODELOCK_LOOKUP)) {
                /* No cached lock guarantees freshness: fetch attributes
                 * (and the striping EA for regular files) explicitly. */
                struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                obd_valid valid = OBD_MD_FLGETATTR;
                struct obd_capa *oc;
                int ealen = 0;

                if (S_ISREG(inode->i_mode)) {
                        rc = ll_get_max_mdsize(sbi, &ealen);
                        if (rc)
                                RETURN(rc);
                        valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
                }
                /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
                 * capa for this inode. Because we only keep capas of dirs
                 * fresh. */
                oc = ll_mdscapa_get(inode);
                rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
                                ealen, &req);
                capa_put(oc);
                if (rc) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        RETURN(rc);
                }

                rc = ll_prep_inode(&inode, req, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        /* if object not yet allocated, don't validate size */
        if (ll_i2info(inode)->lli_smd == NULL)
                GOTO(out, rc = 0);

        /* ll_glimpse_size will prefer locally cached writes if they extend
         * the file */
        rc = ll_glimpse_size(inode, 0);
        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}
2789
/* Fill *stat for a dentry after revalidating its inode with the MDS.
 * The size/blocks pair is read under the inode size lock so the two
 * values are mutually consistent.
 *
 * \param mnt   vfsmount (unused; part of the VFS getattr signature)
 * \param de    dentry to stat
 * \param it    lookup intent used for the revalidation
 * \param stat  out: attribute buffer to fill
 *
 * \retval 0 on success, negative errno from revalidation otherwise
 */
int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
                  struct lookup_intent *it, struct kstat *stat)
{
        struct inode *inode = de->d_inode;
        int res = 0;

        res = ll_inode_revalidate_it(de, it);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);

        if (res)
                return res;

        stat->dev = inode->i_sb->s_dev;
        stat->ino = inode->i_ino;
        stat->mode = inode->i_mode;
        stat->nlink = inode->i_nlink;
        stat->uid = inode->i_uid;
        stat->gid = inode->i_gid;
        stat->rdev = kdev_t_to_nr(inode->i_rdev);
        stat->atime = inode->i_atime;
        stat->mtime = inode->i_mtime;
        stat->ctime = inode->i_ctime;
#ifdef HAVE_INODE_BLKSIZE
        stat->blksize = inode->i_blksize;
#else
        stat->blksize = 1 << inode->i_blkbits;
#endif

        /* take the lock so size and blocks are a consistent snapshot */
        ll_inode_size_lock(inode, 0);
        stat->size = i_size_read(inode);
        stat->blocks = inode->i_blocks;
        ll_inode_size_unlock(inode, 0);

        return 0;
}
/* VFS ->getattr entry point: revalidate with a fresh IT_GETATTR intent
 * and delegate to ll_getattr_it(). */
int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
{
        struct lookup_intent it = { .it_op = IT_GETATTR };

        return ll_getattr_it(mnt, de, &it, stat);
}
2831
/* POSIX ACL permission hook used by the permission checks below.
 * Takes a private reference on the cached ACL under lli_lock, checks
 * @mask against it, and releases the reference.
 *
 * \retval 0 / -EACCES  from posix_acl_permission()
 * \retval -EAGAIN      no ACL cached (or ACLs compiled out): caller
 *                      should fall back to the ordinary mode bits
 */
static
int lustre_check_acl(struct inode *inode, int mask)
{
#ifdef CONFIG_FS_POSIX_ACL
        struct ll_inode_info *lli = ll_i2info(inode);
        struct posix_acl *acl;
        int rc;
        ENTRY;

        /* duplicate under the lock so the ACL can't be swapped from
         * under us while we check it */
        spin_lock(&lli->lli_lock);
        acl = posix_acl_dup(lli->lli_posix_acl);
        spin_unlock(&lli->lli_lock);

        if (!acl)
                RETURN(-EAGAIN);

        rc = posix_acl_permission(inode, acl, mask);
        posix_acl_release(acl);

        RETURN(rc);
#else
        return -EAGAIN;
#endif
}
2856
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
/* VFS ->permission on >= 2.6.10: remote-client mounts go through the
 * server-side permission check; otherwise delegate to the kernel's
 * generic_permission() with our ACL hook. */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);
        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
        return generic_permission(inode, mask, lustre_check_acl);
}
#else
/* VFS ->permission on older kernels that lack the ACL-aware
 * generic_permission(): an open-coded equivalent.  Checks, in order:
 * read-only FS, immutable flag, owner bits, ACL (via lustre_check_acl,
 * falling back to group bits on -EAGAIN), other bits, and finally the
 * CAP_DAC_OVERRIDE / CAP_DAC_READ_SEARCH capability overrides. */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        int mode = inode->i_mode;
        int rc;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);

        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);

        if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
            (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
                return -EROFS;
        if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
                return -EACCES;
        if (current->fsuid == inode->i_uid) {
                mode >>= 6;
        } else if (1) {
                /* Non-owner: try the ACL first.  The "other" bits being a
                 * superset of @mask means the ACL can't be more generous,
                 * so skip straight to the group check. */
                if (((mode >> 3) & mask & S_IRWXO) != mask)
                        goto check_groups;
                rc = lustre_check_acl(inode, mask);
                if (rc == -EAGAIN)
                        goto check_groups;
                if (rc == -EACCES)
                        goto check_capabilities;
                return rc;
        } else {
                /* unreachable except via the gotos above — kept to scope
                 * the check_groups label, mirroring the upstream
                 * generic permission logic */
check_groups:
                if (in_group_p(inode->i_gid))
                        mode >>= 3;
        }
        if ((mode & mask & S_IRWXO) == mask)
                return 0;

check_capabilities:
        /* CAP_DAC_OVERRIDE: anything but executing a file with no exec
         * bits set anywhere */
        if (!(mask & MAY_EXEC) ||
            (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
                if (capable(CAP_DAC_OVERRIDE))
                        return 0;

        /* CAP_DAC_READ_SEARCH: plain reads, and dir search/read */
        if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
            (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
                return 0;

        return -EACCES;
}
#endif
2919
/* -o localflock - only provides locally consistent flock locks */
/* Default file operations: no ->flock/->lock entries, so the kernel
 * falls back to node-local advisory locking only. */
struct file_operations ll_file_operations = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
};
2932
/* File operations for "-o flock" mounts: cluster-coherent advisory
 * locking via ll_file_flock for both fcntl (.lock) and, where the
 * kernel supports it, flock (.flock). */
struct file_operations ll_file_operations_flock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_flock,
#endif
        .lock           = ll_file_flock
};
2948
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_noflock,
#endif
        .lock           = ll_file_noflock
};
2965
/* Inode operations for regular Lustre files; the xattr hooks also back
 * ACLs and Lustre-specific extended attributes. */
struct inode_operations ll_file_inode_operations = {
#ifdef HAVE_VFS_INTENT_PATCHES
        .setattr_raw    = ll_setattr_raw,
#endif
        .setattr        = ll_setattr,
        .truncate       = ll_truncate,
        .getattr        = ll_getattr,
        .permission     = ll_inode_permission,
        .setxattr       = ll_setxattr,
        .getxattr       = ll_getxattr,
        .listxattr      = ll_listxattr,
        .removexattr    = ll_removexattr,
};
2979
/* dynamic ioctl number support routines */
/* Registry of dynamically registered ioctl handlers: a list of
 * llioc_data entries protected by an rw semaphore (readers dispatch,
 * writers register/unregister). */
static struct llioc_ctl_data {
        struct rw_semaphore ioc_sem;
        struct list_head    ioc_head;
} llioc = {
        __RWSEM_INITIALIZER(llioc.ioc_sem),
        CFS_LIST_HEAD_INIT(llioc.ioc_head)
};
2988
2989
/* One registered dynamic ioctl handler: callback plus the command
 * numbers it serves, stored inline in a trailing variable-length
 * array (iocd_cmd[0] is the pre-C99 flexible-array idiom). */
struct llioc_data {
        struct list_head        iocd_list;   /* link in llioc.ioc_head */
        unsigned int            iocd_size;   /* total allocation size, for freeing */
        llioc_callback_t        iocd_cb;     /* handler callback */
        unsigned int            iocd_count;  /* number of entries in iocd_cmd */
        unsigned int            iocd_cmd[0]; /* command numbers served */
};
2997
/* Register a dynamic ioctl handler for @count command numbers.
 *
 * \param cb     callback invoked from ll_iocontrol_call() when one of
 *               the commands is seen
 * \param count  number of entries in @cmd (0..LLIOC_MAX_CMD)
 * \param cmd    array of ioctl command numbers, copied internally
 *
 * \retval opaque cookie to pass to ll_iocontrol_unregister(), or NULL
 *         on invalid arguments or allocation failure
 */
void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
{
        unsigned int size;
        struct llioc_data *in_data = NULL;
        ENTRY;

        if (cb == NULL || cmd == NULL ||
            count > LLIOC_MAX_CMD || count < 0)
                RETURN(NULL);

        size = sizeof(*in_data) + count * sizeof(unsigned int);
        OBD_ALLOC(in_data, size);
        if (in_data == NULL)
                RETURN(NULL);

        /* NOTE(review): OBD_ALLOC appears to zero its allocation, which
         * would make this memset redundant — confirm before removing. */
        memset(in_data, 0, sizeof(*in_data));
        in_data->iocd_size = size;
        in_data->iocd_cb = cb;
        in_data->iocd_count = count;
        memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);

        down_write(&llioc.ioc_sem);
        list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
        up_write(&llioc.ioc_sem);

        RETURN(in_data);
}
3025
/* Unregister a dynamic ioctl handler previously returned by
 * ll_iocontrol_register().  A NULL or unknown @magic is tolerated
 * (the latter logs a warning). */
void ll_iocontrol_unregister(void *magic)
{
        struct llioc_data *tmp;

        if (magic == NULL)
                return;

        down_write(&llioc.ioc_sem);
        list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
                if (tmp == magic) {
                        /* save the size before unlinking; drop the lock
                         * before freeing since the entry is unreachable */
                        unsigned int size = tmp->iocd_size;

                        list_del(&tmp->iocd_list);
                        up_write(&llioc.ioc_sem);

                        OBD_FREE(tmp, size);
                        return;
                }
        }
        up_write(&llioc.ioc_sem);

        CWARN("didn't find iocontrol register block with magic: %p\n", magic);
}
3049
3050 EXPORT_SYMBOL(ll_iocontrol_register);
3051 EXPORT_SYMBOL(ll_iocontrol_unregister);
3052
/* Dispatch @cmd to the dynamically registered ioctl handlers in
 * registration order.  Each handler whose command table contains @cmd
 * is called; iteration stops when one returns LLIOC_STOP.
 *
 * \param rcp  out (may be NULL): the last handler's result, or -EINVAL
 *             if no handler matched
 *
 * \retval LLIOC_STOP  a handler consumed the command
 * \retval LLIOC_CONT  no handler stopped the iteration
 */
enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
                        unsigned int cmd, unsigned long arg, int *rcp)
{
        enum llioc_iter ret = LLIOC_CONT;
        struct llioc_data *data;
        /* NOTE(review): i is signed while iocd_count is unsigned; safe
         * as long as counts stay <= LLIOC_MAX_CMD, but worth confirming */
        int rc = -EINVAL, i;

        down_read(&llioc.ioc_sem);
        list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
                for (i = 0; i < data->iocd_count; i++) {
                        if (cmd != data->iocd_cmd[i])
                                continue;

                        ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
                        break;
                }

                if (ret == LLIOC_STOP)
                        break;
        }
        up_read(&llioc.ioc_sem);

        if (rcp)
                *rcp = rc;
        return ret;
}