Whamcloud - gitweb
Allow userland applications to know about the loss of one of the stripes.
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
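/* Note: ll_file_data is the per-open file state kept in LUSTRE_FPRIVATE(file);
 * it is allocated in ll_file_open() via ll_file_data_get() and released again
 * in ll_md_close()/ll_file_release() below. */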
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
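/* Roughly: when the MDS export supports Size-on-MDS (OBD_CONNECT_SOM) and a
 * regular file is being closed for write, ll_epoch_close() is used above and
 * the size is reported later through the DONE_WRITING path rather than in the
 * close RPC itself; otherwise ATTR_SIZE/ATTR_BLOCKS are sent with the close. */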
85
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in case of LMV, is this correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * Here we check whether this is a forced umount. If so, we are called
110          * while cancelling the "open lock" and we do not call md_close(), as
111          * it would not succeed because the import is already deactivated.
112          */
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* MDS has instructed us to obtain the Size-on-MDS attribute
131                  * from the OSTs and send a setattr back to the MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och = *och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and another thread may have
203                       freed this och already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
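/* ll_md_real_close() drops the shared MDS open handle for a given open mode
 * once the last local user of that mode is gone; ll_md_close() below decides
 * whether it needs to be called by matching the MDS_INODELOCK_OPEN lock. */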
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have a good enough OPEN lock on the file and if
228            we can skip talking to the MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* While this function returns an error code, its caller fput() does not, so
273  * we need to make every effort to clean up all of our state here.  Also,
274  * applications rarely check close errors, and even if an error is returned
275  * they will not retry the close call.
276  */
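/* For illustration only (a hypothetical userland sketch, not part of Lustre):
 * an application that wants to observe such errors has to check the return
 * value of close(), e.g.
 *
 *      int fd = open("/mnt/lustre/somefile", O_WRONLY | O_CREAT, 0644);
 *      if (fd >= 0) {
 *              write(fd, buf, len);
 *              if (close(fd) < 0)
 *                      perror("close");   // delayed write errors show up here
 *      }
 */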
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289         /* don't do anything for / */
290         if (inode->i_sb->s_root == file->f_dentry)
291                 RETURN(0);
292
293         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
294         fd = LUSTRE_FPRIVATE(file);
295         LASSERT(fd != NULL);
296
297         /* don't do anything for / */
298         if (inode->i_sb->s_root == file->f_dentry) {
299                 LUSTRE_FPRIVATE(file) = NULL;
300                 ll_file_data_put(fd);
301                 RETURN(0);
302         }
303
304         if (lsm)
305                 lov_test_and_clear_async_rc(lsm);
306         lli->lli_async_rc = 0;
307
308         rc = ll_md_close(sbi->ll_md_exp, inode, file);
309         RETURN(rc);
310 }
311
312 static int ll_intent_file_open(struct file *file, void *lmm,
313                                int lmmsize, struct lookup_intent *itp)
314 {
315         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
316         struct dentry *parent = file->f_dentry->d_parent;
317         const char *name = file->f_dentry->d_name.name;
318         const int len = file->f_dentry->d_name.len;
319         struct md_op_data *op_data;
320         struct ptlrpc_request *req;
321         int rc;
322
323         if (!parent)
324                 RETURN(-ENOENT);
325
326         /* Usually we come here only for NFSD, and we want the open lock.
327            But we can also get here with pre-2.6.15 patchless kernels, and in
328            that case that lock is also ok */
329         /* We can also get here if there was a cached open handle in
330          * revalidate_it but it disappeared while we were getting from there
331          * to ll_file_open.  That means the file was closed and immediately
332          * opened again, which makes it a good candidate for the OPEN lock */
333         /* If lmm and lmmsize are not 0, we are just setting stripe info
334          * parameters.  No need for the open lock */
335         if (!lmm && !lmmsize)
336                 itp->it_flags |= MDS_OPEN_LOCK;
337
338         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
339                                       file->f_dentry->d_inode, name, len,
340                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
341         if (IS_ERR(op_data))
342                 RETURN(PTR_ERR(op_data));
343
344         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
345                             0 /*unused */, &req, ll_md_blocking_ast, 0);
346         ll_finish_md_op_data(op_data);
347         if (rc == -ESTALE) {
348                 /* The reason for keeping our own exit path is to avoid
349                  * flooding the log with -ESTALE error messages.
350                  */
351                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
352                      it_open_error(DISP_OPEN_OPEN, itp))
353                         GOTO(out, rc);
354                 ll_release_openhandle(file->f_dentry, itp);
355                 GOTO(out_stale, rc);
356         }
357
358         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
359                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
360                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
361                 GOTO(out, rc);
362         }
363
364         if (itp->d.lustre.it_lock_mode)
365                 md_set_lock_data(sbi->ll_md_exp,
366                                  &itp->d.lustre.it_lock_handle, 
367                                  file->f_dentry->d_inode);
368
369         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
370                            NULL);
371 out:
372         ptlrpc_req_finished(itp->d.lustre.it_data);
373
374 out_stale:
375         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
376         ll_intent_drop_lock(itp);
377
378         RETURN(rc);
379 }
380
381 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
382                        struct lookup_intent *it, struct obd_client_handle *och)
383 {
384         struct ptlrpc_request *req = it->d.lustre.it_data;
385         struct mdt_body *body;
386
387         LASSERT(och);
388
389         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
390         /* reply already checked out */
391         LASSERT(body != NULL);
392         /* and swabbed in md_enqueue */
393         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
394
395         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
396         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
397         och->och_fid = lli->lli_fid;
398         och->och_flags = it->it_flags;
399         lli->lli_ioepoch = body->ioepoch;
400
401         return md_set_open_replay_data(md_exp, och, req);
402 }
403
404 int ll_local_open(struct file *file, struct lookup_intent *it,
405                   struct ll_file_data *fd, struct obd_client_handle *och)
406 {
407         struct inode *inode = file->f_dentry->d_inode;
408         struct ll_inode_info *lli = ll_i2info(inode);
409         ENTRY;
410
411         LASSERT(!LUSTRE_FPRIVATE(file));
412
413         LASSERT(fd != NULL);
414
415         if (och) {
416                 struct ptlrpc_request *req = it->d.lustre.it_data;
417                 struct mdt_body *body;
418                 int rc;
419
420                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
421                 if (rc)
422                         RETURN(rc);
423
424                 body = lustre_msg_buf(req->rq_repmsg,
425                                       DLM_REPLY_REC_OFF, sizeof(*body));
426
427                 if ((it->it_flags & FMODE_WRITE) &&
428                     (body->valid & OBD_MD_FLSIZE)) {
429                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
430                                lli->lli_ioepoch, PFID(&lli->lli_fid));
431                 }
433         }
434
435         LUSTRE_FPRIVATE(file) = fd;
436         ll_readahead_init(inode, &fd->fd_ras);
437         fd->fd_omode = it->it_flags;
438         RETURN(0);
439 }
440
441 /* Open a file, and (for the very first open) create objects on the OSTs at
442  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
443  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
444  * lli_open_sem to ensure no other process will create objects, send the
445  * stripe MD to the MDS, or try to destroy the objects if that fails.
446  *
447  * If we already have the stripe MD locally then we don't request it in
448  * md_open(), by passing lmm_size = 0.
449  *
450  * It is up to the application to ensure no other processes open this file
451  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
452  * used.  We might be able to avoid races of that sort by getting lli_open_sem
453  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
454  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
455  */
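/* A rough userland sketch of the O_LOV_DELAY_CREATE flow described above; the
 * struct and ioctl names are from lustre_user.h, and the values are made up
 * for illustration:
 *
 *      struct lov_user_md lum = {
 *              .lmm_magic         = LOV_USER_MAGIC,
 *              .lmm_stripe_size   = 1 << 20,   // 1 MB stripes
 *              .lmm_stripe_count  = 2,
 *              .lmm_stripe_offset = -1,        // let the MDS pick the first OST
 *      };
 *      int fd = open(path, O_CREAT | O_WRONLY | O_LOV_DELAY_CREATE, 0644);
 *      if (fd >= 0 && ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum) < 0)
 *              perror("LL_IOC_LOV_SETSTRIPE");
 */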
456 int ll_file_open(struct inode *inode, struct file *file)
457 {
458         struct ll_inode_info *lli = ll_i2info(inode);
459         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
460                                           .it_flags = file->f_flags };
461         struct lov_stripe_md *lsm;
462         struct ptlrpc_request *req = NULL;
463         struct obd_client_handle **och_p;
464         __u64 *och_usecount;
465         struct ll_file_data *fd;
466         int rc = 0;
467         ENTRY;
468
469         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
470                inode->i_generation, inode, file->f_flags);
471
472         /* don't do anything for / */
473         if (inode->i_sb->s_root == file->f_dentry)
474                 RETURN(0);
475
476 #ifdef HAVE_VFS_INTENT_PATCHES
477         it = file->f_it;
478 #else
479         it = file->private_data; /* XXX: compat macro */
480         file->private_data = NULL; /* prevent ll_local_open assertion */
481 #endif
482
483         fd = ll_file_data_get();
484         if (fd == NULL)
485                 RETURN(-ENOMEM);
486
487         /* don't do anything for / */
488         if (inode->i_sb->s_root == file->f_dentry) {
489                 LUSTRE_FPRIVATE(file) = fd;
490                 RETURN(0);
491         }
492
493         if (!it || !it->d.lustre.it_disposition) {
494                 /* Convert f_flags into access mode. We cannot use file->f_mode,
495                  * because everything except the O_ACCMODE bits has been
496                  * stripped from it */
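                /* The access mode lives in the low flag bits as 0/1/2
                 * (O_RDONLY/O_WRONLY/O_RDWR); incrementing it maps that onto
                 * the FMODE_READ (1) / FMODE_WRITE (2) bits, i.e. 1, 2 or 3. */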
497                 if ((oit.it_flags + 1) & O_ACCMODE)
498                         oit.it_flags++;
499                 if (file->f_flags & O_TRUNC)
500                         oit.it_flags |= FMODE_WRITE;
501
502                 /* The kernel only calls f_op->open in dentry_open.  filp_open
503                  * calls dentry_open after open_namei has checked permissions.
504                  * Only nfsd_open calls dentry_open directly without checking
505                  * permissions, and because of that the code below is safe. */
506                 if (oit.it_flags & FMODE_WRITE)
507                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
508
509                 /* We do not want O_EXCL here, presumably we opened the file
510                  * already? XXX - NFS implications? */
511                 oit.it_flags &= ~O_EXCL;
512
513                 it = &oit;
514         }
515
516         /* Let's see if we already have the file open on the MDS. */
517         if (it->it_flags & FMODE_WRITE) {
518                 och_p = &lli->lli_mds_write_och;
519                 och_usecount = &lli->lli_open_fd_write_count;
520         } else if (it->it_flags & FMODE_EXEC) {
521                 och_p = &lli->lli_mds_exec_och;
522                 och_usecount = &lli->lli_open_fd_exec_count;
523         } else {
524                 och_p = &lli->lli_mds_read_och;
525                 och_usecount = &lli->lli_open_fd_read_count;
526         }
527
528         down(&lli->lli_och_sem);
529         if (*och_p) { /* Open handle is present */
530                 if (it_disposition(it, DISP_OPEN_OPEN)) {
531                         /* Well, there's an extra open request that we do not
532                            need; let's close it somehow. This decrefs the request. */
533                         rc = it_open_error(DISP_OPEN_OPEN, it);
534                         if (rc) {
535                                 ll_file_data_put(fd);
536                                 GOTO(out_och_free, rc);
537                         }       
538                         ll_release_openhandle(file->f_dentry, it);
539                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
540                                              LPROC_LL_OPEN);
541                 }
542                 (*och_usecount)++;
543
544                 rc = ll_local_open(file, it, fd, NULL);
545                 if (rc) {
546                         up(&lli->lli_och_sem);
547                         ll_file_data_put(fd);
548                         RETURN(rc);
549                 }
550         } else {
551                 LASSERT(*och_usecount == 0);
552                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
553                 if (!*och_p) {
554                         ll_file_data_put(fd);
555                         GOTO(out_och_free, rc = -ENOMEM);
556                 }
557                 (*och_usecount)++;
558                 if (!it->d.lustre.it_disposition) {
559                         it->it_flags |= O_CHECK_STALE;
560                         rc = ll_intent_file_open(file, NULL, 0, it);
561                         it->it_flags &= ~O_CHECK_STALE;
562                         if (rc) {
563                                 ll_file_data_put(fd);
564                                 GOTO(out_och_free, rc);
565                         }
566
567                         /* Got some error? Release the request */
568                         if (it->d.lustre.it_status < 0) {
569                                 req = it->d.lustre.it_data;
570                                 ptlrpc_req_finished(req);
571                         }
572                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
573                                          &it->d.lustre.it_lock_handle,
574                                          file->f_dentry->d_inode);
575                 }
576                 req = it->d.lustre.it_data;
577
578                 /* md_intent_lock() didn't get a request ref if there was an
579                  * open error, so don't do cleanup on the request here
580                  * (bug 3430) */
581                 /* XXX (green): Shouldn't we bail out on any error here, not
582                  * just an open error? */
583                 rc = it_open_error(DISP_OPEN_OPEN, it);
584                 if (rc) {
585                         ll_file_data_put(fd);
586                         GOTO(out_och_free, rc);
587                 }
588
589                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
590                 rc = ll_local_open(file, it, fd, *och_p);
591                 if (rc) {
592                         up(&lli->lli_och_sem);
593                         ll_file_data_put(fd);
594                         GOTO(out_och_free, rc);
595                 }
596         }
597         up(&lli->lli_och_sem);
598
599         /* Must do this outside the lli_och_sem lock to prevent a deadlock
600            where a different kind of OPEN lock for this same inode gets
601            cancelled by ldlm_cancel_lru */
602         if (!S_ISREG(inode->i_mode))
603                 GOTO(out, rc);
604
605         ll_capa_open(inode);
606
607         lsm = lli->lli_smd;
608         if (lsm == NULL) {
609                 if (file->f_flags & O_LOV_DELAY_CREATE ||
610                     !(file->f_mode & FMODE_WRITE)) {
611                         CDEBUG(D_INODE, "object creation was delayed\n");
612                         GOTO(out, rc);
613                 }
614         }
615         file->f_flags &= ~O_LOV_DELAY_CREATE;
616         GOTO(out, rc);
617 out:
618         ptlrpc_req_finished(req);
619         if (req)
620                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
621 out_och_free:
622         if (rc) {
623                 if (*och_p) {
624                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
625                         *och_p = NULL; /* OBD_FREE writes some magic there */
626                         (*och_usecount)--;
627                 }
628                 up(&lli->lli_och_sem);
629         }
630
631         return rc;
632 }
633
634 /* Fills the obdo with the attributes for the inode defined by lsm */
635 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
636 {
637         struct ptlrpc_request_set *set;
638         struct ll_inode_info *lli = ll_i2info(inode);
639         struct lov_stripe_md *lsm = lli->lli_smd;
640
641         struct obd_info oinfo = { { { 0 } } };
642         int rc;
643         ENTRY;
644
645         LASSERT(lsm != NULL);
646
647         oinfo.oi_md = lsm;
648         oinfo.oi_oa = obdo;
649         oinfo.oi_oa->o_id = lsm->lsm_object_id;
650         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
651         oinfo.oi_oa->o_mode = S_IFREG;
652         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
653                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
654                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
655                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
656                                OBD_MD_FLGROUP;
657         oinfo.oi_capa = ll_mdscapa_get(inode);
658
659         set = ptlrpc_prep_set();
660         if (set == NULL) {
661                 CERROR("can't allocate ptlrpc set\n");
662                 rc = -ENOMEM;
663         } else {
664                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
665                 if (rc == 0)
666                         rc = ptlrpc_set_wait(set);
667                 ptlrpc_set_destroy(set);
668         }
669         capa_put(oinfo.oi_capa);
670         if (rc)
671                 RETURN(rc);
672
673         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
674                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
675                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
676
677         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
678         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
679                lli->lli_smd->lsm_object_id, i_size_read(inode),
680                inode->i_blocks, inode->i_blksize);
681         RETURN(0);
682 }
683
684 static inline void ll_remove_suid(struct inode *inode)
685 {
686         unsigned int mode;
687
688         /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
689         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
690
691         /* were any of these set-id bits actually set on the inode? */
692         mode &= inode->i_mode;
693         if (mode && !capable(CAP_FSETID)) {
694                 inode->i_mode &= ~mode;
695                 // XXX careful here - we cannot change the size
696         }
697 }
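/* Worked example for the arithmetic above: S_IXGRP is 00010 and S_ISGID is
 * 02000 (octal), so S_ISGID/S_IXGRP == 0200 and the multiplication yields
 * either 0 or exactly S_ISGID.  The candidate mask is therefore S_ISUID plus
 * (conditionally) S_ISGID, and "mode &= inode->i_mode" keeps only the bits
 * that are actually set on the inode. */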
698
699 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
700 {
701         struct ll_inode_info *lli = ll_i2info(inode);
702         struct lov_stripe_md *lsm = lli->lli_smd;
703         struct obd_export *exp = ll_i2dtexp(inode);
704         struct {
705                 char name[16];
706                 struct ldlm_lock *lock;
707                 struct lov_stripe_md *lsm;
708         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
709         __u32 stripe, vallen = sizeof(stripe);
710         int rc;
711         ENTRY;
712
713         if (lsm->lsm_stripe_count == 1)
714                 GOTO(check, stripe = 0);
715
716         /* get our offset in the lov */
717         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
718         if (rc != 0) {
719                 CERROR("obd_get_info: rc = %d\n", rc);
720                 RETURN(rc);
721         }
722         LASSERT(stripe < lsm->lsm_stripe_count);
723
724 check:
725         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
726             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
727                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
728                            lsm->lsm_oinfo[stripe]->loi_id,
729                            lsm->lsm_oinfo[stripe]->loi_gr);
730                 RETURN(-ELDLM_NO_LOCK_DATA);
731         }
732
733         RETURN(stripe);
734 }
735
736 /* Flush the page cache for an extent as it is cancelled.  When we're on an LOV,
737  * we get a lock cancellation for each stripe, so we have to map the obd's
738  * region back onto the stripes in the file that it held.
739  *
740  * No one can dirty the extent until we've finished our work and they can
741  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
742  * but other kernel actors could have pages locked.
743  *
744  * Called with the DLM lock held. */
745 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
746                               struct ldlm_lock *lock, __u32 stripe)
747 {
748         ldlm_policy_data_t tmpex;
749         unsigned long start, end, count, skip, i, j;
750         struct page *page;
751         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
752         struct lustre_handle lockh;
753         struct address_space *mapping = inode->i_mapping;
754
755         ENTRY;
756         tmpex = lock->l_policy_data;
757         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
758                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
759                i_size_read(inode));
760
761         /* our locks are page granular thanks to osc_enqueue, so we invalidate
762          * whole pages. */
763         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
764             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
765                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
766                            CFS_PAGE_SIZE);
767         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
768         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
769
770         count = ~0;
771         skip = 0;
772         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
773         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
774         if (lsm->lsm_stripe_count > 1) {
775                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
776                 skip = (lsm->lsm_stripe_count - 1) * count;
777                 start += start/count * skip + stripe * count;
778                 if (end != ~0)
779                         end += end/count * skip + stripe * count;
780         }
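        /* Worked example of the index mapping above (values chosen for
         * illustration): with lsm_stripe_count == 2, 1 MB stripes and 4 KB
         * pages, count == 256 and skip == 256.  For stripe 1, object page
         * index 300 becomes 300 + (300/256)*256 + 1*256 == 812, the usual
         * RAID0-style object-offset to file-offset conversion. */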
781         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
782                 end = ~0;
783
784         i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
785             CFS_PAGE_SHIFT : 0;
786         if (i < end)
787                 end = i;
788
789         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
790                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
791                count, skip, end, discard ? " (DISCARDING)" : "");
792
793         /* Walk through the VMAs on the inode and tear down mmap()ed pages that
794          * intersect with the lock.  This stops immediately if there are no
795          * mmap()ed regions of the file.  It is not efficient at all and should
796          * be short-lived.  Eventually we'll associate mmap()ed pages with the
797          * lock and will be able to find them directly */
798         for (i = start; i <= end; i += (j + skip)) {
799                 j = min(count - (i % count), end - i + 1);
800                 LASSERT(j > 0);
801                 LASSERT(mapping);
802                 if (ll_teardown_mmaps(mapping,
803                                       (__u64)i << CFS_PAGE_SHIFT,
804                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
805                         break;
806         }
807
808         /* This is the simplistic implementation of page eviction at
809          * cancellation.  It is careful to handle races with other page
810          * lockers correctly.  Fixes from bug 20 will make it more
811          * efficient by associating locks with pages and by batching
812          * writeback under the lock explicitly. */
813         for (i = start, j = start % count; i <= end;
814              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
815                 if (j == count) {
816                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
817                         i += skip;
818                         j = 0;
819                         if (i > end)
820                                 break;
821                 }
822                 LASSERTF(tmpex.l_extent.start < lock->l_policy_data.l_extent.end,
823                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
824                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
825                          start, i, end);
826
827                 if (!mapping_has_pages(mapping)) {
828                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
829                         break;
830                 }
831
832                 cond_resched();
833
834                 page = find_get_page(mapping, i);
835                 if (page == NULL)
836                         continue;
837                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
838                                i, tmpex.l_extent.start);
839                 lock_page(page);
840
841                 /* page->mapping is checked to cope with races against teardown */
842                 if (!discard && clear_page_dirty_for_io(page)) {
843                         rc = ll_call_writepage(inode, page);
844                         /* either waiting for io to complete or reacquiring
845                          * the lock that the failed writepage released */
846                         lock_page(page);
847                         wait_on_page_writeback(page);
848                         if (rc != 0) {
849                                 CERROR("writepage inode %lu(%p) of page %p "
850                                        "failed: %d\n", inode->i_ino, inode,
851                                        page, rc);
852                                 if (rc == -ENOSPC)
853                                         set_bit(AS_ENOSPC, &mapping->flags);
854                                 else
855                                         set_bit(AS_EIO, &mapping->flags);
856                         }
857                 }
858
859                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
860                 /* check to see if another DLM lock covers this page b=2765 */
861                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
862                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
863                                       LDLM_FL_TEST_LOCK,
864                                       &lock->l_resource->lr_name, LDLM_EXTENT,
865                                       &tmpex, LCK_PR | LCK_PW, &lockh);
866
867                 if (rc2 <= 0 && page->mapping != NULL) {
868                         struct ll_async_page *llap = llap_cast_private(page);
869                         /* checking again to account for writeback's
870                          * lock_page() */
871                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
872                         if (llap)
873                                 ll_ra_accounting(llap, mapping);
874                         ll_truncate_complete_page(page);
875                 }
876                 unlock_page(page);
877                 page_cache_release(page);
878         }
879         LASSERTF(tmpex.l_extent.start <=
880                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
881                   lock->l_policy_data.l_extent.end + 1),
882                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
883                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
884                  start, i, end);
885         EXIT;
886 }
887
888 static int ll_extent_lock_callback(struct ldlm_lock *lock,
889                                    struct ldlm_lock_desc *new, void *data,
890                                    int flag)
891 {
892         struct lustre_handle lockh = { 0 };
893         int rc;
894         ENTRY;
895
896         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
897                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
898                 LBUG();
899         }
900
901         switch (flag) {
902         case LDLM_CB_BLOCKING:
903                 ldlm_lock2handle(lock, &lockh);
904                 rc = ldlm_cli_cancel(&lockh);
905                 if (rc != ELDLM_OK)
906                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
907                 break;
908         case LDLM_CB_CANCELING: {
909                 struct inode *inode;
910                 struct ll_inode_info *lli;
911                 struct lov_stripe_md *lsm;
912                 int stripe;
913                 __u64 kms;
914
915                 /* This lock wasn't granted, don't try to evict pages */
916                 if (lock->l_req_mode != lock->l_granted_mode)
917                         RETURN(0);
918
919                 inode = ll_inode_from_lock(lock);
920                 if (inode == NULL)
921                         RETURN(0);
922                 lli = ll_i2info(inode);
923                 if (lli == NULL)
924                         goto iput;
925                 if (lli->lli_smd == NULL)
926                         goto iput;
927                 lsm = lli->lli_smd;
928
929                 stripe = ll_lock_to_stripe_offset(inode, lock);
930                 if (stripe < 0)
931                         goto iput;
932
933                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
934
935                 lov_stripe_lock(lsm);
936                 lock_res_and_lock(lock);
937                 kms = ldlm_extent_shift_kms(lock,
938                                             lsm->lsm_oinfo[stripe]->loi_kms);
939
940                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
941                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
942                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
943                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
944                 unlock_res_and_lock(lock);
945                 lov_stripe_unlock(lsm);
946         iput:
947                 iput(inode);
948                 break;
949         }
950         default:
951                 LBUG();
952         }
953
954         RETURN(0);
955 }
956
957 #if 0
958 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
959 {
960         /* XXX ALLOCATE - 160 bytes */
961         struct inode *inode = ll_inode_from_lock(lock);
962         struct ll_inode_info *lli = ll_i2info(inode);
963         struct lustre_handle lockh = { 0 };
964         struct ost_lvb *lvb;
965         int stripe;
966         ENTRY;
967
968         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
969                      LDLM_FL_BLOCK_CONV)) {
970                 LBUG(); /* not expecting any blocked async locks yet */
971                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
972                            "lock, returning");
973                 ldlm_lock_dump(D_OTHER, lock, 0);
974                 ldlm_reprocess_all(lock->l_resource);
975                 RETURN(0);
976         }
977
978         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
979
980         stripe = ll_lock_to_stripe_offset(inode, lock);
981         if (stripe < 0)
982                 goto iput;
983
984         if (lock->l_lvb_len) {
985                 struct lov_stripe_md *lsm = lli->lli_smd;
986                 __u64 kms;
987                 lvb = lock->l_lvb_data;
988                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
989
990                 lock_res_and_lock(lock);
991                 ll_inode_size_lock(inode, 1);
992                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
993                 kms = ldlm_extent_shift_kms(NULL, kms);
994                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
995                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
996                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
997                 lsm->lsm_oinfo[stripe].loi_kms = kms;
998                 ll_inode_size_unlock(inode, 1);
999                 unlock_res_and_lock(lock);
1000         }
1001
1002 iput:
1003         iput(inode);
1004         wake_up(&lock->l_waitq);
1005
1006         ldlm_lock2handle(lock, &lockh);
1007         ldlm_lock_decref(&lockh, LCK_PR);
1008         RETURN(0);
1009 }
1010 #endif
1011
1012 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1013 {
1014         struct ptlrpc_request *req = reqp;
1015         struct inode *inode = ll_inode_from_lock(lock);
1016         struct ll_inode_info *lli;
1017         struct lov_stripe_md *lsm;
1018         struct ost_lvb *lvb;
1019         int rc, stripe;
1020         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1021         ENTRY;
1022
1023         if (inode == NULL)
1024                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1025         lli = ll_i2info(inode);
1026         if (lli == NULL)
1027                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1028         lsm = lli->lli_smd;
1029         if (lsm == NULL)
1030                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1031
1032         /* First, find out which stripe index this lock corresponds to. */
1033         stripe = ll_lock_to_stripe_offset(inode, lock);
1034         if (stripe < 0)
1035                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1036
1037         rc = lustre_pack_reply(req, 2, size, NULL);
1038         if (rc)
1039                 GOTO(iput, rc);
1040
1041         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1042         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1043         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1044         lvb->lvb_atime = LTIME_S(inode->i_atime);
1045         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1046
1047         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1048                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1049                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1050                    lvb->lvb_atime, lvb->lvb_ctime);
1051  iput:
1052         iput(inode);
1053
1054  out:
1055         /* These errors are normal races, so we don't want to fill the console
1056          * with messages by calling ptlrpc_error() */
1057         if (rc == -ELDLM_NO_LOCK_DATA)
1058                 lustre_pack_reply(req, 1, NULL, NULL);
1059
1060         req->rq_status = rc;
1061         return rc;
1062 }
1063
1064 static int ll_merge_lvb(struct inode *inode)
1065 {
1066         struct ll_inode_info *lli = ll_i2info(inode);
1067         struct ll_sb_info *sbi = ll_i2sbi(inode);
1068         struct ost_lvb lvb;
1069         int rc;
1070
1071         ENTRY;
1072
1073         ll_inode_size_lock(inode, 1);
1074         inode_init_lvb(inode, &lvb);
1075         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1076         i_size_write(inode, lvb.lvb_size);
1077         inode->i_blocks = lvb.lvb_blocks;
1078
1079         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1080         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1081         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1082         ll_inode_size_unlock(inode, 1);
1083
1084         RETURN(rc);
1085 }
1086
1087 int ll_local_size(struct inode *inode)
1088 {
1089         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1090         struct ll_inode_info *lli = ll_i2info(inode);
1091         struct ll_sb_info *sbi = ll_i2sbi(inode);
1092         struct lustre_handle lockh = { 0 };
1093         int flags = 0;
1094         int rc;
1095         ENTRY;
1096
1097         if (lli->lli_smd->lsm_stripe_count == 0)
1098                 RETURN(0);
1099
1100         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1101                        &policy, LCK_PR, &flags, inode, &lockh);
1102         if (rc < 0)
1103                 RETURN(rc);
1104         else if (rc == 0)
1105                 RETURN(-ENODATA);
1106
1107         rc = ll_merge_lvb(inode);
1108         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1109         RETURN(rc);
1110 }
1111
1112 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1113                      lstat_t *st)
1114 {
1115         struct lustre_handle lockh = { 0 };
1116         struct ldlm_enqueue_info einfo = { 0 };
1117         struct obd_info oinfo = { { { 0 } } };
1118         struct ost_lvb lvb;
1119         int rc;
1120
1121         ENTRY;
1122
1123         einfo.ei_type = LDLM_EXTENT;
1124         einfo.ei_mode = LCK_PR;
1125         einfo.ei_cb_bl = ll_extent_lock_callback;
1126         einfo.ei_cb_cp = ldlm_completion_ast;
1127         einfo.ei_cb_gl = ll_glimpse_callback;
1128         einfo.ei_cbdata = NULL;
1129
1130         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1131         oinfo.oi_lockh = &lockh;
1132         oinfo.oi_md = lsm;
1133         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1134
1135         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1136         if (rc == -ENOENT)
1137                 RETURN(rc);
1138         if (rc != 0) {
1139                 CERROR("obd_enqueue returned rc %d, "
1140                        "returning -EIO\n", rc);
1141                 RETURN(rc > 0 ? -EIO : rc);
1142         }
1143
1144         lov_stripe_lock(lsm);
1145         memset(&lvb, 0, sizeof(lvb));
1146         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1147         st->st_size = lvb.lvb_size;
1148         st->st_blocks = lvb.lvb_blocks;
1149         st->st_mtime = lvb.lvb_mtime;
1150         st->st_atime = lvb.lvb_atime;
1151         st->st_ctime = lvb.lvb_ctime;
1152         lov_stripe_unlock(lsm);
1153
1154         RETURN(rc);
1155 }
1156
1157 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1158  * file (because it prefers KMS over RSS when larger) */
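/* (Roughly: KMS is the client's "known minimum size", derived from the extent
 * locks it holds and updated in ll_extent_lock_callback() above, while RSS is
 * the size most recently reported by the OSTs in the lock value block.) */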
1159 int ll_glimpse_size(struct inode *inode, int ast_flags)
1160 {
1161         struct ll_inode_info *lli = ll_i2info(inode);
1162         struct ll_sb_info *sbi = ll_i2sbi(inode);
1163         struct lustre_handle lockh = { 0 };
1164         struct ldlm_enqueue_info einfo = { 0 };
1165         struct obd_info oinfo = { { { 0 } } };
1166         int rc;
1167         ENTRY;
1168
1169         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1170                 RETURN(0);
1171
1172         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1173
1174         if (!lli->lli_smd) {
1175                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1176                 RETURN(0);
1177         }
1178
1179         /* NOTE: this looks like a DLM lock request, but it may not be one.
1180          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse request
1181          *       that won't revoke any conflicting DLM locks held. Instead,
1182          *       ll_glimpse_callback() will be called on each client
1183          *       holding a DLM lock against this file, and the resulting size
1184          *       will be returned for each stripe. A DLM lock on [0, EOF] is
1185          *       acquired only if there were no conflicting locks. */
1186         einfo.ei_type = LDLM_EXTENT;
1187         einfo.ei_mode = LCK_PR;
1188         einfo.ei_cb_bl = ll_extent_lock_callback;
1189         einfo.ei_cb_cp = ldlm_completion_ast;
1190         einfo.ei_cb_gl = ll_glimpse_callback;
1191         einfo.ei_cbdata = inode;
1192
1193         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1194         oinfo.oi_lockh = &lockh;
1195         oinfo.oi_md = lli->lli_smd;
1196         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1197
1198         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1199         if (rc == -ENOENT)
1200                 RETURN(rc);
1201         if (rc != 0) {
1202                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1203                 RETURN(rc > 0 ? -EIO : rc);
1204         }
1205
1206         rc = ll_merge_lvb(inode);
1207
1208         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1209                i_size_read(inode), inode->i_blocks);
1210
1211         RETURN(rc);
1212 }
1213
1214 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1215                    struct lov_stripe_md *lsm, int mode,
1216                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1217                    int ast_flags)
1218 {
1219         struct ll_sb_info *sbi = ll_i2sbi(inode);
1220         struct ost_lvb lvb;
1221         struct ldlm_enqueue_info einfo = { 0 };
1222         struct obd_info oinfo = { { { 0 } } };
1223         int rc;
1224         ENTRY;
1225
1226         LASSERT(!lustre_handle_is_used(lockh));
1227         LASSERT(lsm != NULL);
1228
1229         /* don't let locks on an mmapped file drop to the lock LRU */
1230         if (mapping_mapped(inode->i_mapping))
1231                 ast_flags |= LDLM_FL_NO_LRU;
1232
1233         /* XXX phil: can we do this?  won't it screw the file size up? */
1234         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1235             (sbi->ll_flags & LL_SBI_NOLCK))
1236                 RETURN(0);
1237
1238         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1239                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1240
1241         einfo.ei_type = LDLM_EXTENT;
1242         einfo.ei_mode = mode;
1243         einfo.ei_cb_bl = ll_extent_lock_callback;
1244         einfo.ei_cb_cp = ldlm_completion_ast;
1245         einfo.ei_cb_gl = ll_glimpse_callback;
1246         einfo.ei_cbdata = inode;
1247
1248         oinfo.oi_policy = *policy;
1249         oinfo.oi_lockh = lockh;
1250         oinfo.oi_md = lsm;
1251         oinfo.oi_flags = ast_flags;
1252
1253         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1254         *policy = oinfo.oi_policy;
1255         if (rc > 0)
1256                 rc = -EIO;
1257
1258         ll_inode_size_lock(inode, 1);
1259         inode_init_lvb(inode, &lvb);
1260         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1261
1262         if (policy->l_extent.start == 0 &&
1263             policy->l_extent.end == OBD_OBJECT_EOF) {
1264                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1265                  * the kms under both a DLM lock and the
1266                  * ll_inode_size_lock().  If we don't get the
1267                  * ll_inode_size_lock() here we can match the DLM lock and
1268                  * reset i_size from the kms before the truncating path has
1269                  * updated the kms.  generic_file_write can then trust the
1270                  * stale i_size when doing appending writes and effectively
1271                  * cancel the result of the truncate.  Getting the
1272                  * ll_inode_size_lock() after the enqueue maintains the DLM
1273                  * -> ll_inode_size_lock() acquiring order. */
1274                 i_size_write(inode, lvb.lvb_size);
1275                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1276                        inode->i_ino, i_size_read(inode));
1277         }
1278
1279         if (rc == 0) {
1280                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1281                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1282                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1283         }
1284         ll_inode_size_unlock(inode, 1);
1285
1286         RETURN(rc);
1287 }
1288
1289 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1290                      struct lov_stripe_md *lsm, int mode,
1291                      struct lustre_handle *lockh)
1292 {
1293         struct ll_sb_info *sbi = ll_i2sbi(inode);
1294         int rc;
1295         ENTRY;
1296
1297         /* XXX phil: can we do this?  won't it screw the file size up? */
1298         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1299             (sbi->ll_flags & LL_SBI_NOLCK))
1300                 RETURN(0);
1301
1302         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1303
1304         RETURN(rc);
1305 }
1306
1307 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1308                             loff_t *ppos)
1309 {
1310         struct inode *inode = file->f_dentry->d_inode;
1311         struct ll_inode_info *lli = ll_i2info(inode);
1312         struct lov_stripe_md *lsm = lli->lli_smd;
1313         struct ll_sb_info *sbi = ll_i2sbi(inode);
1314         struct ll_lock_tree tree;
1315         struct ll_lock_tree_node *node;
1316         struct ost_lvb lvb;
1317         struct ll_ra_read bead;
1318         int rc, ra = 0;
1319         loff_t end;
1320         ssize_t retval, chunk, sum = 0;
1321
1322         __u64 kms;
1323         ENTRY;
1324         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1325                inode->i_ino, inode->i_generation, inode, count, *ppos);
1326         /* "If nbyte is 0, read() will return 0 and have no other results."
1327          *                      -- Single Unix Spec */
1328         if (count == 0)
1329                 RETURN(0);
1330
1331         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1332
1333         if (!lsm) {
1334                 /* A read on a file with no objects should return zero-filled
1335                  * buffers up to the file size (we can get non-zero sizes with
1336                  * mknod + truncate and then opening the file for read; this
1337                  * seems to be a common pattern in the NFS case). Bug 6243 */
1338                 int notzeroed;
1339                 /* Since there are no objects on the OSTs, we have nothing to
1340                  * take a lock on, and so we are forced to access inode->i_size
1341                  * unguarded */
1342
1343                 /* Read beyond end of file */
1344                 if (*ppos >= i_size_read(inode))
1345                         RETURN(0);
1346
1347                 if (count > i_size_read(inode) - *ppos)
1348                         count = i_size_read(inode) - *ppos;
1349                 /* Make sure to correctly adjust the file pos pointer for
1350                  * EFAULT case */
1351                 notzeroed = clear_user(buf, count);
1352                 count -= notzeroed;
1353                 *ppos += count;
1354                 if (!count)
1355                         RETURN(-EFAULT);
1356                 RETURN(count);
1357         }
1358
1359 repeat:
1360         if (sbi->ll_max_rw_chunk != 0) {
1361                 /* first, determine the end of the current stripe */
1362                 end = *ppos;
1363                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1364                                 (obd_off *)&end);
1365
1366                 /* clamp the end if it extends beyond the request */
1367                 if (end > *ppos + count - 1)
1368                         end = *ppos + count - 1;
1369
1370                 /* and chunk shouldn't be too large even if striping is wide */
1371                 if (end - *ppos > sbi->ll_max_rw_chunk)
1372                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1373         } else {
1374                 end = *ppos + count - 1;
1375         }
1376
1377         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1378         if (IS_ERR(node)) {
1379                 GOTO(out, retval = PTR_ERR(node));
1380         }
1381
1382         tree.lt_fd = LUSTRE_FPRIVATE(file);
1383         rc = ll_tree_lock(&tree, node, buf, count,
1384                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1385         if (rc != 0)
1386                 GOTO(out, retval = rc);
1387
1388         ll_inode_size_lock(inode, 1);
1389         /*
1390          * Consistency guarantees: following possibilities exist for the
1391          * relation between region being read and real file size at this
1392          * moment:
1393          *
1394          *  (A): the region is completely inside of the file;
1395          *
1396          *  (B-x): x bytes of region are inside of the file, the rest is
1397          *  outside;
1398          *
1399          *  (C): the region is completely outside of the file.
1400          *
1401          * This classification is stable under the DLM lock acquired by
1402          * ll_tree_lock() above, because to change the class another client
1403          * has to take a DLM lock conflicting with ours. Also, any updates to
1404          * ->i_size by other threads on this client are serialized by
1405          * ll_inode_size_lock(). This guarantees that short reads are handled
1406          * correctly in the face of concurrent writes and truncates.
1407          */
1408         inode_init_lvb(inode, &lvb);
1409         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1410         kms = lvb.lvb_size;
1411         if (*ppos + count - 1 > kms) {
1412                 /* A glimpse is necessary to determine whether we return a
1413                  * short read (B) or some zeroes at the end of the buffer (C) */
1414                 ll_inode_size_unlock(inode, 1);
1415                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1416                 if (retval) {
1417                         ll_tree_unlock(&tree);
1418                         goto out;
1419                 }
1420         } else {
1421                 /* region is within kms and, hence, within real file size (A).
1422                  * We need to increase i_size to cover the read region so that
1423                  * generic_file_read() will do its job, but that doesn't mean
1424                  * the kms size is _correct_, it is only the _minimum_ size.
1425                  * If someone does a stat they will get the correct size which
1426                  * will always be >= the kms value here.  b=11081 */
1427                 if (i_size_read(inode) < kms)
1428                         i_size_write(inode, kms);
1429                 ll_inode_size_unlock(inode, 1);
1430         }
1431
1432         chunk = end - *ppos + 1;
1433         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1434                inode->i_ino, chunk, *ppos, i_size_read(inode));
1435
1436         /* turn off the kernel's read-ahead */
1437         file->f_ra.ra_pages = 0;
1438
1439         /* initialize read-ahead window once per syscall */
1440         if (ra == 0) {
1441                 ra = 1;
1442                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1443                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1444                 ll_ra_read_in(file, &bead);
1445         }
1446
1447         /* BUG: 5972 */
1448         file_accessed(file);
1449         retval = generic_file_read(file, buf, chunk, ppos);
1450         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1451
1452         ll_tree_unlock(&tree);
1453
1454         if (retval > 0) {
1455                 buf += retval;
1456                 count -= retval;
1457                 sum += retval;
1458                 if (retval == chunk && count > 0)
1459                         goto repeat;
1460         }
1461
1462  out:
1463         if (ra != 0)
1464                 ll_ra_read_ex(file, &bead);
1465         retval = (sum > 0) ? sum : retval;
1466         RETURN(retval);
1467 }
1468
1469 /*
1470  * Write to a file (through the page cache).
1471  */
1472 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1473                              loff_t *ppos)
1474 {
1475         struct inode *inode = file->f_dentry->d_inode;
1476         struct ll_sb_info *sbi = ll_i2sbi(inode);
1477         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1478         struct ll_lock_tree tree;
1479         struct ll_lock_tree_node *node;
1480         loff_t maxbytes = ll_file_maxbytes(inode);
1481         loff_t lock_start, lock_end, end;
1482         ssize_t retval, chunk, sum = 0;
1483         int rc;
1484         ENTRY;
1485
1486         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1487                inode->i_ino, inode->i_generation, inode, count, *ppos);
1488
1489         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1490
1491         /* Required by POSIX, but surprisingly the VFS doesn't check this already */
1492         if (count == 0)
1493                 RETURN(0);
1494
1495         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1496          * called on the file, don't fail the below assertion (bug 2388). */
1497         if (file->f_flags & O_LOV_DELAY_CREATE &&
1498             ll_i2info(inode)->lli_smd == NULL)
1499                 RETURN(-EBADF);
1500
1501         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1502
1503         down(&ll_i2info(inode)->lli_write_sem);
1504
1505 repeat:
1506         chunk = 0; /* just to fix gcc's warning */
1507         end = *ppos + count - 1;
1508
1509         if (file->f_flags & O_APPEND) {
1510                 lock_start = 0;
1511                 lock_end = OBD_OBJECT_EOF;
1512         } else if (sbi->ll_max_rw_chunk != 0) {
1513                 /* first, determine the end of the current stripe */
1514                 end = *ppos;
1515                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1516                                 (obd_off *)&end);
1517
1518                 /* clamp the end if it extends beyond the request */
1519                 if (end > *ppos + count - 1)
1520                         end = *ppos + count - 1;
1521
1522                 /* and chunk shouldn't be too large even if striping is wide */
1523                 if (end - *ppos > sbi->ll_max_rw_chunk)
1524                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1525                 lock_start = *ppos;
1526                 lock_end = end;
1527         } else {
1528                 lock_start = *ppos;
1529                 lock_end = *ppos + count - 1;
1530         }
1531         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1532
1533         if (IS_ERR(node))
1534                 GOTO(out, retval = PTR_ERR(node));
1535
1536         tree.lt_fd = LUSTRE_FPRIVATE(file);
1537         rc = ll_tree_lock(&tree, node, buf, count,
1538                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1539         if (rc != 0)
1540                 GOTO(out, retval = rc);
1541
1542         /* This is OK: generic_file_write() will overwrite this under i_sem
1543          * if it races with a local truncate; it just makes our maxbytes
1544          * checking easier.  The i_size value gets updated in ll_extent_lock()
1545          * as a consequence of the [0,EOF] extent lock we requested above. */
1546         if (file->f_flags & O_APPEND) {
1547                 *ppos = i_size_read(inode);
1548                 end = *ppos + count - 1;
1549         }
1550
1551         if (*ppos >= maxbytes) {
1552                 send_sig(SIGXFSZ, current, 0);
1553                 GOTO(out_unlock, retval = -EFBIG);
1554         }
1555         if (*ppos + count > maxbytes)
1556                 count = maxbytes - *ppos;
1557
1558         /* generic_file_write handles O_APPEND after getting i_mutex */
1559         chunk = end - *ppos + 1;
1560         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1561                inode->i_ino, chunk, *ppos);
1562         retval = generic_file_write(file, buf, chunk, ppos);
1563         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1564
1565 out_unlock:
1566         ll_tree_unlock(&tree);
1567
1568 out:
1569         if (retval > 0) {
1570                 buf += retval;
1571                 count -= retval;
1572                 sum += retval;
1573                 if (retval == chunk && count > 0)
1574                         goto repeat;
1575         }
1576
1577         up(&ll_i2info(inode)->lli_write_sem);
1578
1579         retval = (sum > 0) ? sum : retval;
1580         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1581                            retval > 0 ? retval : 0);
1582         RETURN(retval);
1583 }
1584
1585 /*
1586  * Send file content (through the page cache) to a target using the actor helper
1587  */
1588 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1589                                 read_actor_t actor, void *target)
1590 {
1591         struct inode *inode = in_file->f_dentry->d_inode;
1592         struct ll_inode_info *lli = ll_i2info(inode);
1593         struct lov_stripe_md *lsm = lli->lli_smd;
1594         struct ll_lock_tree tree;
1595         struct ll_lock_tree_node *node;
1596         struct ost_lvb lvb;
1597         struct ll_ra_read bead;
1598         int rc;
1599         ssize_t retval;
1600         __u64 kms;
1601         ENTRY;
1602         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1603                inode->i_ino, inode->i_generation, inode, count, *ppos);
1604
1605         /* "If nbyte is 0, read() will return 0 and have no other results."
1606          *                      -- Single Unix Spec */
1607         if (count == 0)
1608                 RETURN(0);
1609
1610         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1611         /* turn off the kernel's read-ahead */
1612         in_file->f_ra.ra_pages = 0;
1613
1614         /* File with no objects, nothing to lock */
1615         if (!lsm)
1616                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1617
1618         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1619         if (IS_ERR(node))
1620                 RETURN(PTR_ERR(node));
1621
1622         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1623         rc = ll_tree_lock(&tree, node, NULL, count,
1624                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1625         if (rc != 0)
1626                 RETURN(rc);
1627
1628         ll_inode_size_lock(inode, 1);
1629         /*
1630          * Consistency guarantees: following possibilities exist for the
1631          * relation between region being read and real file size at this
1632          * moment:
1633          *
1634          *  (A): the region is completely inside of the file;
1635          *
1636          *  (B-x): x bytes of region are inside of the file, the rest is
1637          *  outside;
1638          *
1639          *  (C): the region is completely outside of the file.
1640          *
1641          * This classification is stable under the DLM lock acquired by
1642          * ll_tree_lock() above, because to change the class another client
1643          * has to take a DLM lock conflicting with ours. Also, any updates to
1644          * ->i_size by other threads on this client are serialized by
1645          * ll_inode_size_lock(). This guarantees that short reads are handled
1646          * correctly in the face of concurrent writes and truncates.
1647          */
1648         inode_init_lvb(inode, &lvb);
1649         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1650         kms = lvb.lvb_size;
1651         if (*ppos + count - 1 > kms) {
1652                 /* A glimpse is necessary to determine whether we return a
1653                  * short read (B) or some zeroes at the end of the buffer (C) */
1654                 ll_inode_size_unlock(inode, 1);
1655                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1656                 if (retval)
1657                         goto out;
1658         } else {
1659                 /* region is within kms and, hence, within real file size (A) */
1660                 i_size_write(inode, kms);
1661                 ll_inode_size_unlock(inode, 1);
1662         }
1663
1664         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1665                inode->i_ino, count, *ppos, i_size_read(inode));
1666
1667         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1668         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1669         ll_ra_read_in(in_file, &bead);
1670         /* BUG: 5972 */
1671         file_accessed(in_file);
1672         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1673         ll_ra_read_ex(in_file, &bead);
1674
1675  out:
1676         ll_tree_unlock(&tree);
1677         RETURN(retval);
1678 }
1679
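/* LL_IOC_RECREATE_OBJ: re-create a (lost) OST object for this file using the
 * object id, group and OST index supplied by the caller; requires
 * CAP_SYS_ADMIN. */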
1680 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1681                                unsigned long arg)
1682 {
1683         struct ll_inode_info *lli = ll_i2info(inode);
1684         struct obd_export *exp = ll_i2dtexp(inode);
1685         struct ll_recreate_obj ucreatp;
1686         struct obd_trans_info oti = { 0 };
1687         struct obdo *oa = NULL;
1688         int lsm_size;
1689         int rc = 0;
1690         struct lov_stripe_md *lsm, *lsm2;
1691         ENTRY;
1692
1693         if (!capable(CAP_SYS_ADMIN))
1694                 RETURN(-EPERM);
1695
1696         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1697                             sizeof(struct ll_recreate_obj));
1698         if (rc) {
1699                 RETURN(-EFAULT);
1700         }
1701         OBDO_ALLOC(oa);
1702         if (oa == NULL)
1703                 RETURN(-ENOMEM);
1704
1705         down(&lli->lli_size_sem);
1706         lsm = lli->lli_smd;
1707         if (lsm == NULL)
1708                 GOTO(out, rc = -ENOENT);
1709         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1710                    (lsm->lsm_stripe_count));
1711
1712         OBD_ALLOC(lsm2, lsm_size);
1713         if (lsm2 == NULL)
1714                 GOTO(out, rc = -ENOMEM);
1715
1716         oa->o_id = ucreatp.lrc_id;
1717         oa->o_gr = ucreatp.lrc_group;
1718         oa->o_nlink = ucreatp.lrc_ost_idx;
1719         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1720         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1721         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1722                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1723
1724         memcpy(lsm2, lsm, lsm_size);
1725         rc = obd_create(exp, oa, &lsm2, &oti);
1726
1727         OBD_FREE(lsm2, lsm_size);
1728         GOTO(out, rc);
1729 out:
1730         up(&lli->lli_size_sem);
1731         OBDO_FREE(oa);
1732         return rc;
1733 }
1734
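/* Set the striping EA on a file that has none yet by opening it with the
 * given lov_user_md; returns -EEXIST if striping already exists. */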
1735 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1736                              int flags, struct lov_user_md *lum, int lum_size)
1737 {
1738         struct ll_inode_info *lli = ll_i2info(inode);
1739         struct lov_stripe_md *lsm;
1740         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1741         int rc = 0;
1742         ENTRY;
1743
1744         down(&lli->lli_size_sem);
1745         lsm = lli->lli_smd;
1746         if (lsm) {
1747                 up(&lli->lli_size_sem);
1748                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1749                        inode->i_ino);
1750                 RETURN(-EEXIST);
1751         }
1752
1753         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1754         if (rc)
1755                 GOTO(out, rc);
1756         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1757                 GOTO(out_req_free, rc = -ENOENT);
1758         rc = oit.d.lustre.it_status;
1759         if (rc < 0)
1760                 GOTO(out_req_free, rc);
1761
1762         ll_release_openhandle(file->f_dentry, &oit);
1763
1764  out:
1765         up(&lli->lli_size_sem);
1766         ll_intent_release(&oit);
1767         RETURN(rc);
1768 out_req_free:
1769         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1770         goto out;
1771 }
1772
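/* Fetch the LOV EA of @filename from the MDS, convert it to host endianness
 * and, for joined files, expand it into a lov_user_md_join.  The reply
 * request is returned so the caller can release the buffers when done. */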
1773 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1774                              struct lov_mds_md **lmmp, int *lmm_size, 
1775                              struct ptlrpc_request **request)
1776 {
1777         struct ll_sb_info *sbi = ll_i2sbi(inode);
1778         struct mdt_body  *body;
1779         struct lov_mds_md *lmm = NULL;
1780         struct ptlrpc_request *req = NULL;
1781         struct obd_capa *oc;
1782         int rc, lmmsize;
1783
1784         rc = ll_get_max_mdsize(sbi, &lmmsize);
1785         if (rc)
1786                 RETURN(rc);
1787
1788         oc = ll_mdscapa_get(inode);
1789         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1790                              oc, filename, strlen(filename) + 1,
1791                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1792         capa_put(oc);
1793         if (rc < 0) {
1794                 CDEBUG(D_INFO, "md_getattr_name failed "
1795                        "on %s: rc %d\n", filename, rc);
1796                 GOTO(out, rc);
1797         }
1798
1799         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1800         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1801         /* swabbed by mdc_getattr_name */
1802         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
1803
1804         lmmsize = body->eadatasize;
1805
1806         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1807                         lmmsize == 0) {
1808                 GOTO(out, rc = -ENODATA);
1809         }
1810
1811         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1812         LASSERT(lmm != NULL);
1813         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
1814
1815         /*
1816          * This is coming from the MDS, so is probably in
1817          * little endian.  We convert it to host endian before
1818          * passing it to userspace.
1819          */
1820         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1821                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1822                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1823         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1824                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1825         }
1826
1827         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1828                 struct lov_stripe_md *lsm;
1829                 struct lov_user_md_join *lmj;
1830                 int lmj_size, i, aindex = 0;
1831
1832                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1833                 if (rc < 0)
1834                         GOTO(out, rc = -ENOMEM);
1835                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1836                 if (rc)
1837                         GOTO(out_free_memmd, rc);
1838
1839                 lmj_size = sizeof(struct lov_user_md_join) +
1840                            lsm->lsm_stripe_count *
1841                            sizeof(struct lov_user_ost_data_join);
1842                 OBD_ALLOC(lmj, lmj_size);
1843                 if (!lmj)
1844                         GOTO(out_free_memmd, rc = -ENOMEM);
1845
1846                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1847                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1848                         struct lov_extent *lex =
1849                                 &lsm->lsm_array->lai_ext_array[aindex];
1850
1851                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1852                                 aindex++;
1853                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1854                                         LPU64" len %d\n", aindex, i,
1855                                         lex->le_start, (int)lex->le_len);
1856                         lmj->lmm_objects[i].l_extent_start =
1857                                 lex->le_start;
1858
1859                         if ((int)lex->le_len == -1)
1860                                 lmj->lmm_objects[i].l_extent_end = -1;
1861                         else
1862                                 lmj->lmm_objects[i].l_extent_end =
1863                                         lex->le_start + lex->le_len;
1864                         lmj->lmm_objects[i].l_object_id =
1865                                 lsm->lsm_oinfo[i]->loi_id;
1866                         lmj->lmm_objects[i].l_object_gr =
1867                                 lsm->lsm_oinfo[i]->loi_gr;
1868                         lmj->lmm_objects[i].l_ost_gen =
1869                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1870                         lmj->lmm_objects[i].l_ost_idx =
1871                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1872                 }
1873                 lmm = (struct lov_mds_md *)lmj;
1874                 lmmsize = lmj_size;
1875 out_free_memmd:
1876                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1877         }
1878 out:
1879         *lmmp = lmm;
1880         *lmm_size = lmmsize;
1881         *request = req;
1882         return rc;
1883 }
1884
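/* LL_IOC_LOV_SETEA: set a striping EA that already includes object data
 * (MDS_OPEN_HAS_OBJS); restricted to CAP_SYS_ADMIN. */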
1885 static int ll_lov_setea(struct inode *inode, struct file *file,
1886                             unsigned long arg)
1887 {
1888         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1889         struct lov_user_md  *lump;
1890         int lum_size = sizeof(struct lov_user_md) +
1891                        sizeof(struct lov_user_ost_data);
1892         int rc;
1893         ENTRY;
1894
1895         if (!capable(CAP_SYS_ADMIN))
1896                 RETURN(-EPERM);
1897
1898         OBD_ALLOC(lump, lum_size);
1899         if (lump == NULL) {
1900                 RETURN(-ENOMEM);
1901         }
1902         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1903         if (rc) {
1904                 OBD_FREE(lump, lum_size);
1905                 RETURN(-EFAULT);
1906         }
1907
1908         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1909
1910         OBD_FREE(lump, lum_size);
1911         RETURN(rc);
1912 }
1913
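/* LL_IOC_LOV_SETSTRIPE: set striping for the file from a userspace
 * lov_user_md, then copy the resulting layout back to the user buffer. */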
1914 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1915                             unsigned long arg)
1916 {
1917         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1918         int rc;
1919         int flags = FMODE_WRITE;
1920         ENTRY;
1921
1922         /* Bug 1152: copy properly when this is no longer true */
1923         LASSERT(sizeof(lum) == sizeof(*lump));
1924         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1925         rc = copy_from_user(&lum, lump, sizeof(lum));
1926         if (rc)
1927                 RETURN(-EFAULT);
1928
1929         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1930         if (rc == 0) {
1931                  put_user(0, &lump->lmm_stripe_count);
1932                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1933                                     0, ll_i2info(inode)->lli_smd, lump);
1934         }
1935         RETURN(rc);
1936 }
1937
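/* LL_IOC_LOV_GETSTRIPE: copy the file's current layout to userspace, or
 * return -ENODATA if the file has no objects yet. */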
1938 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1939 {
1940         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1941
1942         if (!lsm)
1943                 RETURN(-ENODATA);
1944
1945         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1946                             (void *)arg);
1947 }
1948
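/* LL_IOC_GROUP_LOCK: take a GROUP-mode extent lock over the whole file and
 * remember its handle and gid in the file descriptor.  A rough userspace
 * sketch (illustrative only, not part of this file):
 *
 *         int gid = 1234;
 *         ioctl(fd, LL_IOC_GROUP_LOCK, gid);      take the group lock
 *         ... perform I/O ...
 *         ioctl(fd, LL_IOC_GROUP_UNLOCK, gid);    drop it again
 */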
1949 static int ll_get_grouplock(struct inode *inode, struct file *file,
1950                             unsigned long arg)
1951 {
1952         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1953         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1954                                                     .end = OBD_OBJECT_EOF}};
1955         struct lustre_handle lockh = { 0 };
1956         struct ll_inode_info *lli = ll_i2info(inode);
1957         struct lov_stripe_md *lsm = lli->lli_smd;
1958         int flags = 0, rc;
1959         ENTRY;
1960
1961         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1962                 RETURN(-EINVAL);
1963         }
1964
1965         policy.l_extent.gid = arg;
1966         if (file->f_flags & O_NONBLOCK)
1967                 flags = LDLM_FL_BLOCK_NOWAIT;
1968
1969         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1970         if (rc)
1971                 RETURN(rc);
1972
1973         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1974         fd->fd_gid = arg;
1975         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1976
1977         RETURN(0);
1978 }
1979
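/* LL_IOC_GROUP_UNLOCK: release the group lock taken by ll_get_grouplock();
 * the gid passed in must match the one the lock was taken with. */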
1980 static int ll_put_grouplock(struct inode *inode, struct file *file,
1981                             unsigned long arg)
1982 {
1983         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1984         struct ll_inode_info *lli = ll_i2info(inode);
1985         struct lov_stripe_md *lsm = lli->lli_smd;
1986         int rc;
1987         ENTRY;
1988
1989         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1990                 /* Ugh, it's already unlocked. */
1991                 RETURN(-EINVAL);
1992         }
1993
1994         if (fd->fd_gid != arg) /* Unlocking with a different gid? */
1995                 RETURN(-EINVAL);
1996
1997         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1998
1999         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2000         if (rc)
2001                 RETURN(rc);
2002
2003         fd->fd_gid = 0;
2004         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2005
2006         RETURN(0);
2007 }
2008
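/* Check that @head and @tail may be joined: the server must support join,
 * both must be distinct regular files, and the head size must be a multiple
 * of JOIN_FILE_ALIGN (64K). */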
2009 static int join_sanity_check(struct inode *head, struct inode *tail)
2010 {
2011         ENTRY;
2012         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2013                 CERROR("server does not support join\n");
2014                 RETURN(-EINVAL);
2015         }
2016         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2017                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2018                        head->i_ino, tail->i_ino);
2019                 RETURN(-EINVAL);
2020         }
2021         if (head->i_ino == tail->i_ino) {
2022                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2023                 RETURN(-EINVAL);
2024         }
2025         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2026                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
2027                 RETURN(-EINVAL);
2028         }
2029         RETURN(0);
2030 }
2031
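/* Ask the MDS to join the tail file onto the head by re-opening the head
 * with O_JOIN_FILE; the current head size is passed along in the op_data. */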
2032 static int join_file(struct inode *head_inode, struct file *head_filp,
2033                      struct file *tail_filp)
2034 {
2035         struct dentry *tail_dentry = tail_filp->f_dentry;
2036         struct lookup_intent oit = {.it_op = IT_OPEN,
2037                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2038         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2039                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2040
2041         struct lustre_handle lockh;
2042         struct md_op_data *op_data;
2043         int    rc;
2044         loff_t data;
2045         ENTRY;
2046
2047         tail_dentry = tail_filp->f_dentry;
2048
2049         data = i_size_read(head_inode);
2050         op_data = ll_prep_md_op_data(NULL, head_inode,
2051                                      tail_dentry->d_parent->d_inode,
2052                                      tail_dentry->d_name.name,
2053                                      tail_dentry->d_name.len, 0,
2054                                      LUSTRE_OPC_ANY, &data);
2055         if (IS_ERR(op_data))
2056                 RETURN(PTR_ERR(op_data));
2057
2058         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2059                          op_data, &lockh, NULL, 0, 0);
2060
2061         ll_finish_md_op_data(op_data);
2062         if (rc < 0)
2063                 GOTO(out, rc);
2064
2065         rc = oit.d.lustre.it_status;
2066
2067         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2068                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2069                 ptlrpc_req_finished((struct ptlrpc_request *)
2070                                     oit.d.lustre.it_data);
2071                 GOTO(out, rc);
2072         }
2073
2074         if (oit.d.lustre.it_lock_mode) { /* If we got a lock, release it
2075                                            * right away */
2076                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2077                 oit.d.lustre.it_lock_mode = 0;
2078         }
2079         ll_release_openhandle(head_filp->f_dentry, &oit);
2080 out:
2081         ll_intent_release(&oit);
2082         RETURN(rc);
2083 }
2084
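/* LL_IOC_JOIN: join the file named by @filename_tail onto @head.  Both
 * inodes are extent-locked in inode-number order to avoid deadlocks, sanity
 * checked and joined on the MDS; on success the head's cached striping is
 * dropped so the new layout will be refetched. */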
2085 static int ll_file_join(struct inode *head, struct file *filp,
2086                         char *filename_tail)
2087 {
2088         struct inode *tail = NULL, *first = NULL, *second = NULL;
2089         struct dentry *tail_dentry;
2090         struct file *tail_filp, *first_filp, *second_filp;
2091         struct ll_lock_tree first_tree, second_tree;
2092         struct ll_lock_tree_node *first_node, *second_node;
2093         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2094         int rc = 0, cleanup_phase = 0;
2095         ENTRY;
2096
2097         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2098                head->i_ino, head->i_generation, head, filename_tail);
2099
2100         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2101         if (IS_ERR(tail_filp)) {
2102                 CERROR("cannot open tail file %s\n", filename_tail);
2103                 rc = PTR_ERR(tail_filp);
2104                 GOTO(cleanup, rc);
2105         }
2106         tail = igrab(tail_filp->f_dentry->d_inode);
2107
2108         tlli = ll_i2info(tail);
2109         tail_dentry = tail_filp->f_dentry;
2110         LASSERT(tail_dentry);
2111         cleanup_phase = 1;
2112
2113         /* reorder the inodes to get a consistent locking order */
2114         first = head->i_ino > tail->i_ino ? head : tail;
2115         second = head->i_ino > tail->i_ino ? tail : head;
2116         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2117         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2118
2119         CDEBUG(D_INFO, "reorder objects from %lu:%lu to %lu:%lu\n",
2120                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2121         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2122         if (IS_ERR(first_node)) {
2123                 rc = PTR_ERR(first_node);
2124                 GOTO(cleanup, rc);
2125         }
2126         first_tree.lt_fd = first_filp->private_data;
2127         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2128         if (rc != 0)
2129                 GOTO(cleanup, rc);
2130         cleanup_phase = 2;
2131
2132         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2133         if (IS_ERR(second_node)) {
2134                 rc = PTR_ERR(second_node);
2135                 GOTO(cleanup, rc);
2136         }
2137         second_tree.lt_fd = second_filp->private_data;
2138         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2139         if (rc != 0)
2140                 GOTO(cleanup, rc);
2141         cleanup_phase = 3;
2142
2143         rc = join_sanity_check(head, tail);
2144         if (rc)
2145                 GOTO(cleanup, rc);
2146
2147         rc = join_file(head, filp, tail_filp);
2148         if (rc)
2149                 GOTO(cleanup, rc);
2150 cleanup:
2151         switch (cleanup_phase) {
2152         case 3:
2153                 ll_tree_unlock(&second_tree);
2154                 obd_cancel_unused(ll_i2dtexp(second),
2155                                   ll_i2info(second)->lli_smd, 0, NULL);
2156         case 2:
2157                 ll_tree_unlock(&first_tree);
2158                 obd_cancel_unused(ll_i2dtexp(first),
2159                                   ll_i2info(first)->lli_smd, 0, NULL);
2160         case 1:
2161                 filp_close(tail_filp, 0);
2162                 if (tail)
2163                         iput(tail);
2164                 if (head && rc == 0) {
2165                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2166                                        &hlli->lli_smd);
2167                         hlli->lli_smd = NULL;
2168                 }
2169         case 0:
2170                 break;
2171         default:
2172                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2173                 LBUG();
2174         }
2175         RETURN(rc);
2176 }
2177
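/* Close the MDS open handle created as part of intent @it, if any, e.g.
 * after an open done only to set up striping.  Also drops the intent's
 * request reference taken in place of ll_file_open(). */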
2178 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2179 {
2180         struct inode *inode = dentry->d_inode;
2181         struct obd_client_handle *och;
2182         int rc;
2183         ENTRY;
2184
2185         LASSERT(inode);
2186
2187         /* Root ? Do nothing. */
2188         if (dentry->d_inode->i_sb->s_root == dentry)
2189                 RETURN(0);
2190
2191         /* No open handle to close? Nothing to do. */
2192         if (!it_disposition(it, DISP_OPEN_OPEN))
2193                 RETURN(0);
2194
2195         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2196
2197         OBD_ALLOC(och, sizeof(*och));
2198         if (!och)
2199                 GOTO(out, rc = -ENOMEM);
2200
2201         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2202                     ll_i2info(inode), it, och);
2203
2204         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2205                                        inode, och);
2206  out:
2207         /* this one is in place of ll_file_open */
2208         ptlrpc_req_finished(it->d.lustre.it_data);
2209         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2210         RETURN(rc);
2211 }
2212
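/* Main ioctl entry point for regular files; unknown commands are first
 * offered to registered handlers and then passed to the data export via
 * obd_iocontrol(). */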
2213 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2214                   unsigned long arg)
2215 {
2216         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2217         int flags;
2218         ENTRY;
2219
2220         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2221                inode->i_generation, inode, cmd);
2222         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2223
2224         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2225         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2226                 RETURN(-ENOTTY);
2227
2228         switch(cmd) {
2229         case LL_IOC_GETFLAGS:
2230                 /* Get the current value of the file flags */
2231                 return put_user(fd->fd_flags, (int *)arg);
2232         case LL_IOC_SETFLAGS:
2233         case LL_IOC_CLRFLAGS:
2234                 /* Set or clear specific file flags */
2235                 /* XXX This probably needs checks to ensure the flags are
2236                  *     not abused, and to handle any flag side effects.
2237                  */
2238                 if (get_user(flags, (int *) arg))
2239                         RETURN(-EFAULT);
2240
2241                 if (cmd == LL_IOC_SETFLAGS) {
2242                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2243                             !(file->f_flags & O_DIRECT)) {
2244                                 CERROR("%s: unable to disable locking on "
2245                                        "non-O_DIRECT file\n", current->comm);
2246                                 RETURN(-EINVAL);
2247                         }
2248
2249                         fd->fd_flags |= flags;
2250                 } else {
2251                         fd->fd_flags &= ~flags;
2252                 }
2253                 RETURN(0);
2254         case LL_IOC_LOV_SETSTRIPE:
2255                 RETURN(ll_lov_setstripe(inode, file, arg));
2256         case LL_IOC_LOV_SETEA:
2257                 RETURN(ll_lov_setea(inode, file, arg));
2258         case LL_IOC_LOV_GETSTRIPE:
2259                 RETURN(ll_lov_getstripe(inode, arg));
2260         case LL_IOC_RECREATE_OBJ:
2261                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2262         case EXT3_IOC_GETFLAGS:
2263         case EXT3_IOC_SETFLAGS:
2264                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2265         case EXT3_IOC_GETVERSION_OLD:
2266         case EXT3_IOC_GETVERSION:
2267                 RETURN(put_user(inode->i_generation, (int *)arg));
2268         case LL_IOC_JOIN: {
2269                 char *ftail;
2270                 int rc;
2271
2272                 ftail = getname((const char *)arg);
2273                 if (IS_ERR(ftail))
2274                         RETURN(PTR_ERR(ftail));
2275                 rc = ll_file_join(inode, file, ftail);
2276                 putname(ftail);
2277                 RETURN(rc);
2278         }
2279         case LL_IOC_GROUP_LOCK:
2280                 RETURN(ll_get_grouplock(inode, file, arg));
2281         case LL_IOC_GROUP_UNLOCK:
2282                 RETURN(ll_put_grouplock(inode, file, arg));
2283         case IOC_OBD_STATFS:
2284                 RETURN(ll_obd_statfs(inode, (void *)arg));
2285
2286         /* We need to special case any other ioctls we want to handle,
2287          * to send them to the MDS/OST as appropriate and to properly
2288          * network encode the arg field.
2289         case EXT3_IOC_SETVERSION_OLD:
2290         case EXT3_IOC_SETVERSION:
2291         */
2292         case LL_IOC_FLUSHCTX:
2293                 RETURN(ll_flush_ctx(inode));
2294         case LL_IOC_GETFACL: {
2295                 struct rmtacl_ioctl_data ioc;
2296
2297                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2298                         RETURN(-EFAULT);
2299
2300                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2301         }
2302         case LL_IOC_SETFACL: {
2303                 struct rmtacl_ioctl_data ioc;
2304
2305                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2306                         RETURN(-EFAULT);
2307
2308                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2309         }
2310         default: {
2311                 int err;
2312
2313                 if (LLIOC_STOP == 
2314                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2315                         RETURN(err);
2316
2317                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2318                                      (void *)arg));
2319         }
2320         }
2321 }
2322
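/* llseek: for SEEK_END the OST objects are glimpsed first so that i_size is
 * up to date before the new position is computed and range-checked against
 * ll_file_maxbytes(). */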
2323 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2324 {
2325         struct inode *inode = file->f_dentry->d_inode;
2326         struct ll_inode_info *lli = ll_i2info(inode);
2327         struct lov_stripe_md *lsm = lli->lli_smd;
2328         loff_t retval;
2329         ENTRY;
2330         retval = offset + ((origin == 2) ? i_size_read(inode) :
2331                            (origin == 1) ? file->f_pos : 0);
2332         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2333                inode->i_ino, inode->i_generation, inode, retval, retval,
2334                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2335         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2336
2337         if (origin == 2) { /* SEEK_END */
2338                 int nonblock = 0, rc;
2339
2340                 if (file->f_flags & O_NONBLOCK)
2341                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2342
2343                 if (lsm != NULL) {
2344                         rc = ll_glimpse_size(inode, nonblock);
2345                         if (rc != 0)
2346                                 RETURN(rc);
2347                 }
2348
2349                 ll_inode_size_lock(inode, 0);
2350                 offset += i_size_read(inode);
2351                 ll_inode_size_unlock(inode, 0);
2352         } else if (origin == 1) { /* SEEK_CUR */
2353                 offset += file->f_pos;
2354         }
2355
2356         retval = -EINVAL;
2357         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2358                 if (offset != file->f_pos) {
2359                         file->f_pos = offset;
2360 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2361                         file->f_reada = 0;
2362                         file->f_version = ++event;
2363 #endif
2364                 }
2365                 retval = offset;
2366         }
2367         
2368         RETURN(retval);
2369 }
2370
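/* fsync: wait for outstanding page I/O, pick up any recorded asynchronous
 * write errors, sync the inode on the MDS and, if @data is set, sync the
 * OST objects covering [0, EOF] as well. */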
2371 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2372 {
2373         struct inode *inode = dentry->d_inode;
2374         struct ll_inode_info *lli = ll_i2info(inode);
2375         struct lov_stripe_md *lsm = lli->lli_smd;
2376         struct ptlrpc_request *req;
2377         struct obd_capa *oc;
2378         int rc, err;
2379         ENTRY;
2380         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2381                inode->i_generation, inode);
2382         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2383
2384         /* fsync's caller has already called _fdata{sync,write}, we want
2385          * that IO to finish before calling the osc and mdc sync methods */
2386         rc = filemap_fdatawait(inode->i_mapping);
2387
2388         /* catch async errors that were recorded back when async writeback
2389          * failed for pages in this mapping. */
2390         err = lli->lli_async_rc;
2391         lli->lli_async_rc = 0;
2392         if (rc == 0)
2393                 rc = err;
2394         if (lsm) {
2395                 err = lov_test_and_clear_async_rc(lsm);
2396                 if (rc == 0)
2397                         rc = err;
2398         }
2399
2400         oc = ll_mdscapa_get(inode);
2401         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2402                       &req);
2403         capa_put(oc);
2404         if (!rc)
2405                 rc = err;
2406         if (!err)
2407                 ptlrpc_req_finished(req);
2408
2409         if (data && lsm) {
2410                 struct obdo *oa;
2411                 
2412                 OBDO_ALLOC(oa);
2413                 if (!oa)
2414                         RETURN(rc ? rc : -ENOMEM);
2415
2416                 oa->o_id = lsm->lsm_object_id;
2417                 oa->o_gr = lsm->lsm_object_gr;
2418                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2419                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2420                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2421                                            OBD_MD_FLGROUP);
2422
2423                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2424                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2425                                0, OBD_OBJECT_EOF, oc);
2426                 capa_put(oc);
2427                 if (!rc)
2428                         rc = err;
2429                 OBDO_FREE(oa);
2430         }
2431
2432         RETURN(rc);
2433 }
2434
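/* fcntl()/flock() handler: translate the VFS file_lock into an LDLM flock
 * enqueue on the MDS, then update the local lock lists on success. */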
2435 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2436 {
2437         struct inode *inode = file->f_dentry->d_inode;
2438         struct ll_sb_info *sbi = ll_i2sbi(inode);
2439         struct ldlm_res_id res_id =
2440                 { .name = { fid_seq(ll_inode2fid(inode)),
2441                             fid_oid(ll_inode2fid(inode)),
2442                             fid_ver(ll_inode2fid(inode)),
2443                             LDLM_FLOCK} };
2444         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2445                 ldlm_flock_completion_ast, NULL, file_lock };
2446         struct lustre_handle lockh = {0};
2447         ldlm_policy_data_t flock;
2448         int flags = 0;
2449         int rc;
2450         ENTRY;
2451
2452         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2453                inode->i_ino, file_lock);
2454
2455         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2456  
2457         if (file_lock->fl_flags & FL_FLOCK) {
2458                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2459                 /* set missing params for flock() calls */
2460                 file_lock->fl_end = OFFSET_MAX;
2461                 file_lock->fl_pid = current->tgid;
2462         }
2463         flock.l_flock.pid = file_lock->fl_pid;
2464         flock.l_flock.start = file_lock->fl_start;
2465         flock.l_flock.end = file_lock->fl_end;
2466
2467         switch (file_lock->fl_type) {
2468         case F_RDLCK:
2469                 einfo.ei_mode = LCK_PR;
2470                 break;
2471         case F_UNLCK:
2472                 /* An unlock request may or may not have any relation to
2473                  * existing locks so we may not be able to pass a lock handle
2474                  * via a normal ldlm_lock_cancel() request. The request may even
2475                  * unlock a byte range in the middle of an existing lock. In
2476                  * order to process an unlock request we need all of the same
2477                  * information that is given with a normal read or write record
2478                  * lock request. To avoid creating another ldlm unlock (cancel)
2479                  * message we'll treat a LCK_NL flock request as an unlock. */
2480                 einfo.ei_mode = LCK_NL;
2481                 break;
2482         case F_WRLCK:
2483                 einfo.ei_mode = LCK_PW;
2484                 break;
2485         default:
2486                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2487                 LBUG();
2488         }
2489
2490         switch (cmd) {
2491         case F_SETLKW:
2492 #ifdef F_SETLKW64
2493         case F_SETLKW64:
2494 #endif
2495                 flags = 0;
2496                 break;
2497         case F_SETLK:
2498 #ifdef F_SETLK64
2499         case F_SETLK64:
2500 #endif
2501                 flags = LDLM_FL_BLOCK_NOWAIT;
2502                 break;
2503         case F_GETLK:
2504 #ifdef F_GETLK64
2505         case F_GETLK64:
2506 #endif
2507                 flags = LDLM_FL_TEST_LOCK;
2508                 /* Save the old mode so that if the mode in the lock changes we
2509                  * can decrement the appropriate reader or writer refcount. */
2510                 file_lock->fl_type = einfo.ei_mode;
2511                 break;
2512         default:
2513                 CERROR("unknown fcntl lock command: %d\n", cmd);
2514                 LBUG();
2515         }
2516
2517         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2518                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2519                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2520
2521         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2522                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2523         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2524                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2525 #ifdef HAVE_F_OP_FLOCK
2526         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2527             !(flags & LDLM_FL_TEST_LOCK))
2528                 posix_lock_file_wait(file, file_lock);
2529 #endif
2530
2531         RETURN(rc);
2532 }
2533
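/* Stub lock handler that refuses all flock/fcntl lock requests (-ENOSYS). */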
2534 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2535 {
2536         ENTRY;
2537
2538         RETURN(-ENOSYS);
2539 }
2540
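/* Return 1 if this client already holds a granted MDS inodebits lock
 * covering @bits on @inode; matching is done with LDLM_FL_TEST_LOCK, so no
 * reference is taken on the lock. */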
2541 int ll_have_md_lock(struct inode *inode, __u64 bits)
2542 {
2543         struct lustre_handle lockh;
2544         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2545         struct lu_fid *fid;
2546         int flags;
2547         ENTRY;
2548
2549         if (!inode)
2550                RETURN(0);
2551
2552         fid = &ll_i2info(inode)->lli_fid;
2553         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2554
2555         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2556         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2557                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2558                 RETURN(1);
2559         }
2560         RETURN(0);
2561 }
2562
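/* Like ll_have_md_lock(), but return the mode of the matched lock and a
 * referenced handle in @lockh so the caller can use and later release it. */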
2563 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2564                             struct lustre_handle *lockh)
2565 {
2566         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2567         struct lu_fid *fid;
2568         ldlm_mode_t rc;
2569         int flags;
2570         ENTRY;
2571
2572         fid = &ll_i2info(inode)->lli_fid;
2573         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2574
2575         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2576         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2577                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2578         RETURN(rc);
2579 }
2580
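/* Post-process a revalidation result: -ENOENT just means the inode was
 * unlinked remotely, which is not treated as an error; other failures are
 * returned as a negative errno. */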
2581 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2582         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2583                               * and return success */
2584                 inode->i_nlink = 0;
2585                 /* This path cannot be hit for regular files except in
2586                  * obscure races, so there is no need to validate the
2587                  * size. */
2588                 if (!S_ISREG(inode->i_mode) &&
2589                     !S_ISDIR(inode->i_mode))
2590                         return 0;
2591         }
2592
2593         if (rc) {
2594                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2595                 return -abs(rc);
2596
2597         }
2598
2599         return 0;
2600 }
2601
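/* Revalidate the inode's metadata from the MDS, either through an
 * IT_GETATTR intent (when the server supports getattr by FID) or a plain
 * md_getattr(), and then glimpse the file size from the OSTs. */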
2602 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2603 {
2604         struct inode *inode = dentry->d_inode;
2605         struct ptlrpc_request *req = NULL;
2606         struct ll_sb_info *sbi;
2607         struct obd_export *exp;
2608         int rc;
2609         ENTRY;
2610
2611         if (!inode) {
2612                 CERROR("REPORT THIS LINE TO PETER\n");
2613                 RETURN(0);
2614         }
2615         sbi = ll_i2sbi(inode);
2616
2617         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2618                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2619
2620         exp = ll_i2mdexp(inode);
2621
2622         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2623                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2624                 struct md_op_data *op_data;
2625
2626                 /* Call getattr by fid, so do not provide name at all. */
2627                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2628                                              dentry->d_inode, NULL, 0, 0,
2629                                              LUSTRE_OPC_ANY, NULL);
2630                 if (IS_ERR(op_data))
2631                         RETURN(PTR_ERR(op_data));
2632
2633                 oit.it_flags |= O_CHECK_STALE;
2634                 rc = md_intent_lock(exp, op_data, NULL, 0,
2635                                     /* we are not interested in name
2636                                        based lookup */
2637                                     &oit, 0, &req,
2638                                     ll_md_blocking_ast, 0);
2639                 ll_finish_md_op_data(op_data);
2640                 oit.it_flags &= ~O_CHECK_STALE;
2641                 if (rc < 0) {
2642                         rc = ll_inode_revalidate_fini(inode, rc);
2643                         GOTO (out, rc);
2644                 }
2645
2646                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2647                 if (rc != 0) {
2648                         ll_intent_release(&oit);
2649                         GOTO(out, rc);
2650                 }
2651
2652                 /* Unlinked? Unhash the dentry so it is not picked up later by
2653                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2654                    here because that would break get_cwd on 2.6.
2655                    Bug 10503 */
2656                 if (!dentry->d_inode->i_nlink) {
2657                         spin_lock(&dcache_lock);
2658                         ll_drop_dentry(dentry);
2659                         spin_unlock(&dcache_lock);
2660                 }
2661
2662                 ll_lookup_finish_locks(&oit, dentry);
2663         } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) {
2664                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2665                 obd_valid valid = OBD_MD_FLGETATTR;
2666                 struct obd_capa *oc;
2667                 int ealen = 0;
2668
2669                 if (S_ISREG(inode->i_mode)) {
2670                         rc = ll_get_max_mdsize(sbi, &ealen);
2671                         if (rc)
2672                                 RETURN(rc);
2673                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2674                 }
2675                 /* When OBD_CONNECT_ATTRFID is not supported we cannot find a
2676                  * capa for this inode, because we only keep the capas of
2677                  * directories fresh. */
2678                 oc = ll_mdscapa_get(inode);
2679                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2680                                 ealen, &req);
2681                 capa_put(oc);
2682                 if (rc) {
2683                         rc = ll_inode_revalidate_fini(inode, rc);
2684                         RETURN(rc);
2685                 }
2686
2687                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2688                                    NULL);
2689                 if (rc)
2690                         GOTO(out, rc);
2691         }
2692
2693         /* if object not yet allocated, don't validate size */
2694         if (ll_i2info(inode)->lli_smd == NULL)
2695                 GOTO(out, rc = 0);
2696
2697         /* ll_glimpse_size will prefer locally cached writes if they extend
2698          * the file */
2699         rc = ll_glimpse_size(inode, 0);
2700         EXIT;
2701 out:
2702         ptlrpc_req_finished(req);
2703         return rc;
2704 }
2705
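/* getattr: revalidate from the MDS/OSTs and fill the kstat from the inode. */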
2706 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2707                   struct lookup_intent *it, struct kstat *stat)
2708 {
2709         struct inode *inode = de->d_inode;
2710         int res = 0;
2711
2712         res = ll_inode_revalidate_it(de, it);
2713         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2714
2715         if (res)
2716                 return res;
2717
2718         stat->dev = inode->i_sb->s_dev;
2719         stat->ino = inode->i_ino;
2720         stat->mode = inode->i_mode;
2721         stat->nlink = inode->i_nlink;
2722         stat->uid = inode->i_uid;
2723         stat->gid = inode->i_gid;
2724         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2725         stat->atime = inode->i_atime;
2726         stat->mtime = inode->i_mtime;
2727         stat->ctime = inode->i_ctime;
2728 #ifdef HAVE_INODE_BLKSIZE
2729         stat->blksize = inode->i_blksize;
2730 #else
2731         stat->blksize = 1 << inode->i_blkbits;
2732 #endif
2733
2734         ll_inode_size_lock(inode, 0);
2735         stat->size = i_size_read(inode);
2736         stat->blocks = inode->i_blocks;
2737         ll_inode_size_unlock(inode, 0);
2738
2739         return 0;
2740 }
2741 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2742 {
2743         struct lookup_intent it = { .it_op = IT_GETATTR };
2744
2745         return ll_getattr_it(mnt, de, &it, stat);
2746 }
2747
2748 static
2749 int lustre_check_acl(struct inode *inode, int mask)
2750 {
2751 #ifdef CONFIG_FS_POSIX_ACL
2752         struct ll_inode_info *lli = ll_i2info(inode);
2753         struct posix_acl *acl;
2754         int rc;
2755         ENTRY;
2756
2757         spin_lock(&lli->lli_lock);
2758         acl = posix_acl_dup(lli->lli_posix_acl);
2759         spin_unlock(&lli->lli_lock);
2760
        /* no cached ACL: -EAGAIN tells the caller to fall back to the
         * standard mode-bit checks */
2761         if (!acl)
2762                 RETURN(-EAGAIN);
2763
2764         rc = posix_acl_permission(inode, acl, mask);
2765         posix_acl_release(acl);
2766
2767         RETURN(rc);
2768 #else
2769         return -EAGAIN;
2770 #endif
2771 }
2772
2773 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2774 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2775 {
2776         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2777                inode->i_ino, inode->i_generation, inode, mask);
2778         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2779                 return lustre_check_remote_perm(inode, mask);
2780         
2781         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2782         return generic_permission(inode, mask, lustre_check_acl);
2783 }
2784 #else
2785 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2786 {
2787         int mode = inode->i_mode;
2788         int rc;
2789
2790         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2791                inode->i_ino, inode->i_generation, inode, mask);
2792
2793         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2794                 return lustre_check_remote_perm(inode, mask);
2795
2796         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2797
2798         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2799             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2800                 return -EROFS;
2801         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2802                 return -EACCES;
2803         if (current->fsuid == inode->i_uid) {
2804                 mode >>= 6;
2805         } else if (1) {
                /* not the owner: the final else below is reached only via
                 * the check_groups goto; consult the ACL only if the group
                 * ("mask") bits could grant the request */
2806                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2807                         goto check_groups;
2808                 rc = lustre_check_acl(inode, mask);
2809                 if (rc == -EAGAIN)
2810                         goto check_groups;
2811                 if (rc == -EACCES)
2812                         goto check_capabilities;
2813                 return rc;
2814         } else {
2815 check_groups:
2816                 if (in_group_p(inode->i_gid))
2817                         mode >>= 3;
2818         }
2819         if ((mode & mask & S_IRWXO) == mask)
2820                 return 0;
2821
2822 check_capabilities:
2823         if (!(mask & MAY_EXEC) ||
2824             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2825                 if (capable(CAP_DAC_OVERRIDE))
2826                         return 0;
2827
2828         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2829             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2830                 return 0;
2831         
2832         return -EACCES;
2833 }
2834 #endif
2835
2836 /* -o localflock - only provides locally consistent flock locks */
2837 struct file_operations ll_file_operations = {
2838         .read           = ll_file_read,
2839         .write          = ll_file_write,
2840         .ioctl          = ll_file_ioctl,
2841         .open           = ll_file_open,
2842         .release        = ll_file_release,
2843         .mmap           = ll_file_mmap,
2844         .llseek         = ll_file_seek,
2845         .sendfile       = ll_file_sendfile,
2846         .fsync          = ll_fsync,
2847 };
2848
2849 struct file_operations ll_file_operations_flock = {
2850         .read           = ll_file_read,
2851         .write          = ll_file_write,
2852         .ioctl          = ll_file_ioctl,
2853         .open           = ll_file_open,
2854         .release        = ll_file_release,
2855         .mmap           = ll_file_mmap,
2856         .llseek         = ll_file_seek,
2857         .sendfile       = ll_file_sendfile,
2858         .fsync          = ll_fsync,
2859 #ifdef HAVE_F_OP_FLOCK
2860         .flock          = ll_file_flock,
2861 #endif
2862         .lock           = ll_file_flock
2863 };
2864
2865 /* These are for -o noflock - to return ENOSYS on flock calls */
2866 struct file_operations ll_file_operations_noflock = {
2867         .read           = ll_file_read,
2868         .write          = ll_file_write,
2869         .ioctl          = ll_file_ioctl,
2870         .open           = ll_file_open,
2871         .release        = ll_file_release,
2872         .mmap           = ll_file_mmap,
2873         .llseek         = ll_file_seek,
2874         .sendfile       = ll_file_sendfile,
2875         .fsync          = ll_fsync,
2876 #ifdef HAVE_F_OP_FLOCK
2877         .flock          = ll_file_noflock,
2878 #endif
2879         .lock           = ll_file_noflock
2880 };
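
/*
 * Illustrative userspace sketch (editorial addition, not part of the build):
 * what an application is expected to observe depending on which of the three
 * tables above the client installed, as selected by the mount options named
 * in the comments.  The path "/mnt/lustre/flock_test" is hypothetical.
 */
#if 0
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/file.h>
#include <unistd.h>

int main(void)
{
        int fd = open("/mnt/lustre/flock_test", O_RDWR | O_CREAT, 0644);

        if (fd < 0)
                return 1;

        /* "-o flock":      coherent flock lock via ll_file_flock()
         * "-o localflock": lock that is only consistent on this client node
         * "-o noflock":    ll_file_noflock() is expected to fail the call
         *                  with ENOSYS, per the comment above */
        if (flock(fd, LOCK_EX) != 0 && errno == ENOSYS)
                printf("flock locking is disabled on this mount\n");
        else
                flock(fd, LOCK_UN);

        close(fd);
        return 0;
}
#endif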
2881
2882 struct inode_operations ll_file_inode_operations = {
2883 #ifdef HAVE_VFS_INTENT_PATCHES
2884         .setattr_raw    = ll_setattr_raw,
2885 #endif
2886         .setattr        = ll_setattr,
2887         .truncate       = ll_truncate,
2888         .getattr        = ll_getattr,
2889         .permission     = ll_inode_permission,
2890         .setxattr       = ll_setxattr,
2891         .getxattr       = ll_getxattr,
2892         .listxattr      = ll_listxattr,
2893         .removexattr    = ll_removexattr,
2894 };
2895
2896 /* dynamic ioctl number support routines (usage sketch after ll_iocontrol_call() below) */
2897 static struct llioc_ctl_data {
2898         struct rw_semaphore ioc_sem;
2899         struct list_head    ioc_head;
2900 } llioc = { 
2901         __RWSEM_INITIALIZER(llioc.ioc_sem), 
2902         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2903 };
2904
2905
2906 struct llioc_data {
2907         struct list_head        iocd_list;
2908         unsigned int            iocd_size;
2909         llioc_callback_t        iocd_cb;
2910         unsigned int            iocd_count;
2911         unsigned int            iocd_cmd[0];
2912 };
2913
2914 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2915 {
2916         unsigned int size;
2917         struct llioc_data *in_data = NULL;
2918         ENTRY;
2919
2920         if (cb == NULL || cmd == NULL ||
2921             count > LLIOC_MAX_CMD || count < 0)
2922                 RETURN(NULL);
2923
2924         size = sizeof(*in_data) + count * sizeof(unsigned int);
2925         OBD_ALLOC(in_data, size);
2926         if (in_data == NULL)
2927                 RETURN(NULL);
2928
2929         memset(in_data, 0, sizeof(*in_data));
2930         in_data->iocd_size = size;
2931         in_data->iocd_cb = cb;
2932         in_data->iocd_count = count;
2933         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2934
2935         down_write(&llioc.ioc_sem);
2936         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2937         up_write(&llioc.ioc_sem);
2938
2939         RETURN(in_data);
2940 }
2941
2942 void ll_iocontrol_unregister(void *magic)
2943 {
2944         struct llioc_data *tmp;
2945
2946         if (magic == NULL)
2947                 return;
2948
2949         down_write(&llioc.ioc_sem);
2950         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2951                 if (tmp == magic) {
2952                         unsigned int size = tmp->iocd_size;
2953
2954                         list_del(&tmp->iocd_list);
2955                         up_write(&llioc.ioc_sem);
2956
2957                         OBD_FREE(tmp, size);
2958                         return;
2959                 }
2960         }
2961         up_write(&llioc.ioc_sem);
2962
2963         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2964 }
2965
2966 EXPORT_SYMBOL(ll_iocontrol_register);
2967 EXPORT_SYMBOL(ll_iocontrol_unregister);
2968
2969 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2970                         unsigned int cmd, unsigned long arg, int *rcp)
2971 {
2972         enum llioc_iter ret = LLIOC_CONT;
2973         struct llioc_data *data;
2974         int rc = -EINVAL, i;
2975
2976         down_read(&llioc.ioc_sem);
2977         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2978                 for (i = 0; i < data->iocd_count; i++) {
2979                         if (cmd != data->iocd_cmd[i]) 
2980                                 continue;
2981
2982                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2983                         break;
2984                 }
2985
2986                 if (ret == LLIOC_STOP)
2987                         break;
2988         }
2989         up_read(&llioc.ioc_sem);
2990
2991         if (rcp)
2992                 *rcp = rc;
2993         return ret;
2994 }
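
/*
 * Illustrative sketch (editorial addition, not part of the build): how an
 * external module could hook private ioctl numbers into llite through
 * ll_iocontrol_register()/ll_iocontrol_unregister() above.  The my_llioc_*
 * names, the command values and the callback body are invented for
 * illustration; the callback signature is assumed to match the invocation
 * in ll_iocontrol_call().
 */
#if 0
static enum llioc_iter my_llioc_cb(struct inode *inode, struct file *file,
                                   unsigned int cmd, unsigned long arg,
                                   void *magic, int *rcp)
{
        /* *rcp becomes the value ll_iocontrol_call() reports back through
         * its rcp argument; returning LLIOC_STOP ends the search, while
         * LLIOC_CONT lets other registered command blocks see the cmd too. */
        *rcp = 0;
        return LLIOC_STOP;
}

/* hypothetical private command numbers, at most LLIOC_MAX_CMD of them */
static unsigned int my_llioc_cmds[] = { 0xC0DE0001, 0xC0DE0002 };
static void *my_llioc_cookie;

static int my_llioc_setup(void)
{
        /* the returned cookie is the "magic" used for unregistration */
        my_llioc_cookie = ll_iocontrol_register(my_llioc_cb,
                                                ARRAY_SIZE(my_llioc_cmds),
                                                my_llioc_cmds);
        return my_llioc_cookie != NULL ? 0 : -ENOMEM;
}

static void my_llioc_cleanup(void)
{
        ll_iocontrol_unregister(my_llioc_cookie);
}
#endif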