lustre/llite/file.c (lustre-release.git, commit 05afdd004402215b2bf522d88007162b71e4ea11)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
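/* Prepare the attributes that accompany an MDS close request.  For a write
 * handle on a SOM-capable (OBD_CONNECT_SOM) connection and a regular file,
 * the IO epoch is closed via ll_epoch_close(); otherwise size and blocks are
 * sent along with the close.  The inode attributes and the open handle are
 * then packed into op_data by ll_pack_inode2opdata(). */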
64 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
65                              struct obd_client_handle *och)
66 {
67         ENTRY;
68
69         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
70                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
71
72         if (!(och->och_flags & FMODE_WRITE))
73                 goto out;
74
75         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
76             !S_ISREG(inode->i_mode))
77                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
78         else
79                 ll_epoch_close(inode, op_data, &och, 0);
80
81 out:
82         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
83         EXIT;
84 }
85
86 static int ll_close_inode_openhandle(struct obd_export *md_exp,
87                                      struct inode *inode,
88                                      struct obd_client_handle *och)
89 {
90         struct obd_export *exp = ll_i2mdexp(inode);
91         struct md_op_data *op_data;
92         struct ptlrpc_request *req = NULL;
93         struct obd_device *obd = class_exp2obd(exp);
94         int epoch_close = 1;
95         int seq_end = 0, rc;
96         ENTRY;
97
98         if (obd == NULL) {
99                 /*
100                  * XXX: in case of LMV, is this correct to access
101                  * ->exp_handle?
102                  */
103                 CERROR("Invalid MDC connection handle "LPX64"\n",
104                        ll_i2mdexp(inode)->exp_handle.h_cookie);
105                 GOTO(out, rc = 0);
106         }
107
108         /*
109          * Here we check whether this is a forced umount.  If so, we are called
110          * while the "open lock" is being cancelled, and we do not call md_close()
111          * in this case, as it cannot succeed: the import is already deactivated.
112          */
113         if (obd->obd_force)
114                 GOTO(out, rc = 0);
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
119
120         ll_prepare_close(inode, op_data, och);
121         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
122         rc = md_close(md_exp, op_data, och->och_mod, &req);
123         if (rc != -EAGAIN)
124                 seq_end = 1;
125
126         if (rc == -EAGAIN) {
127                 /* This close must have the epoch closed. */
128                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
129                 LASSERT(epoch_close);
130                 /* The MDS has instructed us to obtain the Size-on-MDS attribute
131                  * from the OSTs and send a setattr back to the MDS. */
132                 rc = ll_sizeonmds_update(inode, och->och_mod,
133                                          &och->och_fh, op_data->op_ioepoch);
134                 if (rc) {
135                         CERROR("inode %lu mdc Size-on-MDS update failed: "
136                                "rc = %d\n", inode->i_ino, rc);
137                         rc = 0;
138                 }
139         } else if (rc) {
140                 CERROR("inode %lu mdc close failed: rc = %d\n",
141                        inode->i_ino, rc);
142         }
143         ll_finish_md_op_data(op_data);
144
145         if (rc == 0) {
146                 rc = ll_objects_destroy(req, inode);
147                 if (rc)
148                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
149                                inode->i_ino, rc);
150         }
151
152         EXIT;
153 out:
154       
155         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
156             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
157                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
158         } else {
159                 if (seq_end)
160                         ptlrpc_close_replay_seq(req);
161                 md_clear_open_replay_data(md_exp, och);
162                 /* Free @och if it is not waiting for DONE_WRITING. */
163                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
164                 OBD_FREE_PTR(och);
165         }
166         if (req) /* This is close request */
167                 ptlrpc_req_finished(req);
168         return rc;
169 }
170
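/* Drop the MDS open handle that matches the given open flags (write, exec or
 * read) if there are no remaining users of it.  The handle pointer is taken
 * and cleared under lli_och_sem; the close RPC itself is sent outside the
 * semaphore by ll_close_inode_openhandle(). */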
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och = *och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody may have already
203                       freed this och. */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have a good enough OPEN lock on the file and
228            whether we can skip talking to the MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
272 /* Although this returns an error code, the caller's fput() ignores it, so we
273  * need to make every effort to clean up all of our state here.  Also,
274  * applications rarely check close errors, and even if an error is returned
275  * they will not retry the close call.
276  */
277 int ll_file_release(struct inode *inode, struct file *file)
278 {
279         struct ll_file_data *fd;
280         struct ll_sb_info *sbi = ll_i2sbi(inode);
281         struct ll_inode_info *lli = ll_i2info(inode);
282         struct lov_stripe_md *lsm = lli->lli_smd;
283         int rc;
284
285         ENTRY;
286         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
287                inode->i_generation, inode);
288
289         /* don't do anything for / */
290         if (inode->i_sb->s_root == file->f_dentry)
291                 RETURN(0);
292
293         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
294         fd = LUSTRE_FPRIVATE(file);
295         LASSERT(fd != NULL);
296
297         /* don't do anything for / */
298         if (inode->i_sb->s_root == file->f_dentry) {
299                 LUSTRE_FPRIVATE(file) = NULL;
300                 ll_file_data_put(fd);
301                 RETURN(0);
302         }
303         
304         if (lsm)
305                 lov_test_and_clear_async_rc(lsm);
306         lli->lli_async_rc = 0;
307
308         rc = ll_md_close(sbi->ll_md_exp, inode, file);
309         RETURN(rc);
310 }
311
312 static int ll_intent_file_open(struct file *file, void *lmm,
313                                int lmmsize, struct lookup_intent *itp)
314 {
315         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
316         struct dentry *parent = file->f_dentry->d_parent;
317         const char *name = file->f_dentry->d_name.name;
318         const int len = file->f_dentry->d_name.len;
319         struct md_op_data *op_data;
320         struct ptlrpc_request *req;
321         int rc;
322
323         if (!parent)
324                 RETURN(-ENOENT);
325
326         /* Usually we come here only for NFSD, and we want the open lock.
327            But we can also get here with pre-2.6.15 patchless kernels, and in
328            that case that lock is also OK */
329         /* We can also get here if there was a cached open handle in revalidate_it
330          * but it disappeared while we were getting from there to ll_file_open.
331          * But this means the file was closed and immediately opened again,
332          * which makes it a good candidate for using the OPEN lock */
333         /* If lmmsize & lmm are not 0, we are just setting stripe info
334          * parameters.  No need for the open lock */
335         if (!lmm && !lmmsize)
336                 itp->it_flags |= MDS_OPEN_LOCK;
337
338         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
339                                       file->f_dentry->d_inode, name, len,
340                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
341         if (IS_ERR(op_data))
342                 RETURN(PTR_ERR(op_data));
343
344         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
345                             0 /*unused */, &req, ll_md_blocking_ast, 0);
346         ll_finish_md_op_data(op_data);
347         if (rc == -ESTALE) {
348                 /* reason for keeping our own exit path - don't flood the log
349                  * with -ESTALE error messages.
350                  */
351                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
352                      it_open_error(DISP_OPEN_OPEN, itp))
353                         GOTO(out, rc);
354                 ll_release_openhandle(file->f_dentry, itp);
355                 GOTO(out_stale, rc);
356         }
357
358         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
359                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
360                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
361                 GOTO(out, rc);
362         }
363
364         if (itp->d.lustre.it_lock_mode)
365                 md_set_lock_data(sbi->ll_md_exp,
366                                  &itp->d.lustre.it_lock_handle, 
367                                  file->f_dentry->d_inode);
368
369         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
370                            NULL);
371 out:
372         ptlrpc_req_finished(itp->d.lustre.it_data);
373
374 out_stale:
375         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
376         ll_intent_drop_lock(itp);
377
378         RETURN(rc);
379 }
380
381 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
382                        struct lookup_intent *it, struct obd_client_handle *och)
383 {
384         struct ptlrpc_request *req = it->d.lustre.it_data;
385         struct mdt_body *body;
386
387         LASSERT(och);
388
389         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
390         LASSERT(body != NULL);                      /* reply already checked out */
391         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
392
393         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
394         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
395         och->och_fid = lli->lli_fid;
396         och->och_flags = it->it_flags;
397         lli->lli_ioepoch = body->ioepoch;
398
399         return md_set_open_replay_data(md_exp, och, req);
400 }
401
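/* Finish the client-local part of an open: record the MDS open handle (if
 * one was granted) via ll_och_fill(), install @fd as the file's private
 * data, and initialize the per-descriptor readahead state. */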
402 int ll_local_open(struct file *file, struct lookup_intent *it,
403                   struct ll_file_data *fd, struct obd_client_handle *och)
404 {
405         struct inode *inode = file->f_dentry->d_inode;
406         struct ll_inode_info *lli = ll_i2info(inode);
407         ENTRY;
408
409         LASSERT(!LUSTRE_FPRIVATE(file));
410
411         LASSERT(fd != NULL);
412
413         if (och) {
414                 struct ptlrpc_request *req = it->d.lustre.it_data;
415                 struct mdt_body *body;
416                 int rc;
417
418                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
419                 if (rc)
420                         RETURN(rc);
421
422                 body = lustre_msg_buf(req->rq_repmsg,
423                                       DLM_REPLY_REC_OFF, sizeof(*body));
424
425                 if ((it->it_flags & FMODE_WRITE) &&
426                     (body->valid & OBD_MD_FLSIZE))
427                 {
428                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
429                                lli->lli_ioepoch, PFID(&lli->lli_fid));
430                 }
431         }
432
433         LUSTRE_FPRIVATE(file) = fd;
434         ll_readahead_init(inode, &fd->fd_ras);
435         fd->fd_omode = it->it_flags;
436         RETURN(0);
437 }
438
439 /* Open a file, and (for the very first open) create objects on the OSTs at
440  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
441  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
442  * lli_open_sem to ensure no other process will create objects, send the
443  * stripe MD to the MDS, or try to destroy the objects if that fails.
444  *
445  * If we already have the stripe MD locally then we don't request it in
446  * md_open(), by passing a lmm_size = 0.
447  *
448  * It is up to the application to ensure no other processes open this file
449  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
450  * used.  We might be able to avoid races of that sort by getting lli_open_sem
451  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
452  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
453  */
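/* Illustrative application-side sketch of the delayed-create path described
 * above.  This assumes the usual llite user interface (O_LOV_DELAY_CREATE and
 * the LL_IOC_LOV_SETSTRIPE ioctl with a struct lov_user_md); it is not part
 * of this file:
 *
 *      int fd = open(path, O_CREAT | O_WRONLY | O_LOV_DELAY_CREATE, 0644);
 *      struct lov_user_md lum = {
 *              .lmm_magic        = LOV_USER_MAGIC,
 *              .lmm_stripe_size  = 1048576,     // 1 MiB stripes
 *              .lmm_stripe_count = 4,
 *      };
 *      ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum);   // objects created here
 *
 * Until the ioctl is issued, lli_smd stays NULL and object creation is
 * delayed, as handled at the bottom of ll_file_open() below. */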
454 int ll_file_open(struct inode *inode, struct file *file)
455 {
456         struct ll_inode_info *lli = ll_i2info(inode);
457         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
458                                           .it_flags = file->f_flags };
459         struct lov_stripe_md *lsm;
460         struct ptlrpc_request *req = NULL;
461         struct obd_client_handle **och_p;
462         __u64 *och_usecount;
463         struct ll_file_data *fd;
464         int rc = 0;
465         ENTRY;
466
467         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
468                inode->i_generation, inode, file->f_flags);
469
470         /* don't do anything for / */
471         if (inode->i_sb->s_root == file->f_dentry)
472                 RETURN(0);
473
474 #ifdef HAVE_VFS_INTENT_PATCHES
475         it = file->f_it;
476 #else
477         it = file->private_data; /* XXX: compat macro */
478         file->private_data = NULL; /* prevent ll_local_open assertion */
479 #endif
480
481         fd = ll_file_data_get();
482         if (fd == NULL)
483                 RETURN(-ENOMEM);
484
485         /* don't do anything for / */
486         if (inode->i_sb->s_root == file->f_dentry) {
487                 LUSTRE_FPRIVATE(file) = fd;
488                 RETURN(0);
489         }
490
491         if (!it || !it->d.lustre.it_disposition) {
492                 /* Convert f_flags into access mode. We cannot use file->f_mode,
493                  * because everything but O_ACCMODE mask was stripped from
494                  * there */
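                /* O_RDONLY, O_WRONLY and O_RDWR are 0, 1 and 2, so the
                 * increment below (applied when (flags + 1) & O_ACCMODE is
                 * non-zero) converts them into the FMODE_READ (1) /
                 * FMODE_WRITE (2) bitmask: 0->1, 1->2, 2->3. */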
495                 if ((oit.it_flags + 1) & O_ACCMODE)
496                         oit.it_flags++;
497                 if (file->f_flags & O_TRUNC)
498                         oit.it_flags |= FMODE_WRITE;
499
500                 /* The kernel only calls f_op->open() from dentry_open().
501                  * filp_open() calls dentry_open() after open_namei(), which checks
502                  * permissions.  Only nfsd_open() calls dentry_open() directly
503                  * without checking permissions, so the code below is safe. */
504                 if (oit.it_flags & FMODE_WRITE)
505                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
506
507                 /* We do not want O_EXCL here, presumably we opened the file
508                  * already? XXX - NFS implications? */
509                 oit.it_flags &= ~O_EXCL;
510
511                 it = &oit;
512         }
513
514         /* Let's see if we already have this file open on the MDS. */
515         if (it->it_flags & FMODE_WRITE) {
516                 och_p = &lli->lli_mds_write_och;
517                 och_usecount = &lli->lli_open_fd_write_count;
518         } else if (it->it_flags & FMODE_EXEC) {
519                 och_p = &lli->lli_mds_exec_och;
520                 och_usecount = &lli->lli_open_fd_exec_count;
521         } else {
522                 och_p = &lli->lli_mds_read_och;
523                 och_usecount = &lli->lli_open_fd_read_count;
524         }
525         
526         down(&lli->lli_och_sem);
527         if (*och_p) { /* Open handle is present */
528                 if (it_disposition(it, DISP_OPEN_OPEN)) {
529                         /* Well, there's an extra open request that we do not
530                            need; let's close it somehow. This will decref the request. */
531                         rc = it_open_error(DISP_OPEN_OPEN, it);
532                         if (rc) {
533                                 ll_file_data_put(fd);
534                                 GOTO(out_och_free, rc);
535                         }       
536                         ll_release_openhandle(file->f_dentry, it);
537                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
538                                              LPROC_LL_OPEN);
539                 }
540                 (*och_usecount)++;
541
542                 rc = ll_local_open(file, it, fd, NULL);
543                 if (rc) {
544                         up(&lli->lli_och_sem);
545                         ll_file_data_put(fd);
546                         RETURN(rc);
547                 }
548         } else {
549                 LASSERT(*och_usecount == 0);
550                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
551                 if (!*och_p) {
552                         ll_file_data_put(fd);
553                         GOTO(out_och_free, rc = -ENOMEM);
554                 }
555                 (*och_usecount)++;
556                 if (!it->d.lustre.it_disposition) {
557                         it->it_flags |= O_CHECK_STALE;
558                         rc = ll_intent_file_open(file, NULL, 0, it);
559                         it->it_flags &= ~O_CHECK_STALE;
560                         if (rc) {
561                                 ll_file_data_put(fd);
562                                 GOTO(out_och_free, rc);
563                         }
564
565                         /* Got some error? Release the request */
566                         if (it->d.lustre.it_status < 0) {
567                                 req = it->d.lustre.it_data;
568                                 ptlrpc_req_finished(req);
569                         }
570                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
571                                          &it->d.lustre.it_lock_handle,
572                                          file->f_dentry->d_inode);
573                 }
574                 req = it->d.lustre.it_data;
575
576                 /* md_intent_lock() didn't get a request ref if there was an
577                  * open error, so don't do cleanup on the request here
578                  * (bug 3430) */
579                 /* XXX (green): Shouldn't we bail out on any error here, not
580                  * just an open error? */
581                 rc = it_open_error(DISP_OPEN_OPEN, it);
582                 if (rc) {
583                         ll_file_data_put(fd);
584                         GOTO(out_och_free, rc);
585                 }
586
587                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
588                 rc = ll_local_open(file, it, fd, *och_p);
589                 if (rc) {
590                         up(&lli->lli_och_sem);
591                         ll_file_data_put(fd);
592                         GOTO(out_och_free, rc);
593                 }
594         }
595         up(&lli->lli_och_sem);
596
597         /* Must do this outside the lli_och_sem lock to prevent a deadlock
598            where a different kind of OPEN lock for this same inode gets
599            cancelled by ldlm_cancel_lru */
600         if (!S_ISREG(inode->i_mode))
601                 GOTO(out, rc);
602
603         ll_capa_open(inode);
604
605         lsm = lli->lli_smd;
606         if (lsm == NULL) {
607                 if (file->f_flags & O_LOV_DELAY_CREATE ||
608                     !(file->f_mode & FMODE_WRITE)) {
609                         CDEBUG(D_INODE, "object creation was delayed\n");
610                         GOTO(out, rc);
611                 }
612         }
613         file->f_flags &= ~O_LOV_DELAY_CREATE;
614         GOTO(out, rc);
615 out:
616         ptlrpc_req_finished(req);
617         if (req)
618                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
619 out_och_free:
620         if (rc) {
621                 if (*och_p) {
622                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
623                         *och_p = NULL; /* OBD_FREE writes some magic there */
624                         (*och_usecount)--;
625                 }
626                 up(&lli->lli_och_sem);
627         }
628
629         return rc;
630 }
631
632 /* Fills the obdo with the attributes for the inode defined by lsm */
633 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
634 {
635         struct ptlrpc_request_set *set;
636         struct ll_inode_info *lli = ll_i2info(inode);
637         struct lov_stripe_md *lsm = lli->lli_smd;
638
639         struct obd_info oinfo = { { { 0 } } };
640         int rc;
641         ENTRY;
642
643         LASSERT(lsm != NULL);
644
645         oinfo.oi_md = lsm;
646         oinfo.oi_oa = obdo;
647         oinfo.oi_oa->o_id = lsm->lsm_object_id;
648         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
649         oinfo.oi_oa->o_mode = S_IFREG;
650         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
651                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
652                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
653                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
654                                OBD_MD_FLGROUP;
655         oinfo.oi_capa = ll_mdscapa_get(inode);
656
657         set = ptlrpc_prep_set();
658         if (set == NULL) {
659                 CERROR("can't allocate ptlrpc set\n");
660                 rc = -ENOMEM;
661         } else {
662                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
663                 if (rc == 0)
664                         rc = ptlrpc_set_wait(set);
665                 ptlrpc_set_destroy(set);
666         }
667         capa_put(oinfo.oi_capa);
668         if (rc)
669                 RETURN(rc);
670
671         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
672                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
673                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
674
675         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
676         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
677                lli->lli_smd->lsm_object_id, i_size_read(inode),
678                inode->i_blocks, inode->i_blksize);
679         RETURN(0);
680 }
681
682 static inline void ll_remove_suid(struct inode *inode)
683 {
684         unsigned int mode;
685
686         /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
687         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
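        /* S_ISGID is an exact multiple of S_IXGRP, so the multiplication
         * above yields either 0 or exactly S_ISGID: the group-execute bit
         * selects whether the setgid bit enters the mask. */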
688
689         /* were any of these set-id bits actually set on the inode? */
690         mode &= inode->i_mode;
691         if (mode && !capable(CAP_FSETID)) {
692                 inode->i_mode &= ~mode;
693                 // XXX careful here - we cannot change the size
694         }
695 }
696
697 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
698 {
699         struct ll_inode_info *lli = ll_i2info(inode);
700         struct lov_stripe_md *lsm = lli->lli_smd;
701         struct obd_export *exp = ll_i2dtexp(inode);
702         struct {
703                 char name[16];
704                 struct ldlm_lock *lock;
705                 struct lov_stripe_md *lsm;
706         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
707         __u32 stripe, vallen = sizeof(stripe);
708         int rc;
709         ENTRY;
710
711         if (lsm->lsm_stripe_count == 1)
712                 GOTO(check, stripe = 0);
713
714         /* get our offset in the lov */
715         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
716         if (rc != 0) {
717                 CERROR("obd_get_info: rc = %d\n", rc);
718                 RETURN(rc);
719         }
720         LASSERT(stripe < lsm->lsm_stripe_count);
721
722 check:
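        /* The comparison below relies on the extent lock resource name
         * layout: name[0] carries the object id and name[2] the object
         * group, which must match this stripe's loi_id/loi_gr. */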
723         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
724             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
725                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
726                            lsm->lsm_oinfo[stripe]->loi_id,
727                            lsm->lsm_oinfo[stripe]->loi_gr);
728                 RETURN(-ELDLM_NO_LOCK_DATA);
729         }
730
731         RETURN(stripe);
732 }
733
734 /* Flush the page cache for an extent as it is cancelled.  When we're on an LOV,
735  * we get a lock cancellation for each stripe, so we have to map the obd's
736  * region back onto the stripes in the file that it held.
737  *
738  * No one can dirty the extent until we've finished our work and they can
739  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
740  * but other kernel actors could have pages locked.
741  *
742  * Called with the DLM lock held. */
743 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
744                               struct ldlm_lock *lock, __u32 stripe)
745 {
746         ldlm_policy_data_t tmpex;
747         unsigned long start, end, count, skip, i, j;
748         struct page *page;
749         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
750         struct lustre_handle lockh;
751         struct address_space *mapping = inode->i_mapping;
752
753         ENTRY;
754         tmpex = lock->l_policy_data;
755         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
756                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
757                i_size_read(inode));
758
759         /* our locks are page-granular thanks to osc_enqueue, so we invalidate
760          * whole pages. */
761         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
762             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
763                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
764                            CFS_PAGE_SIZE);
765         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
766         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
767
768         count = ~0;
769         skip = 0;
770         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
771         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
772         if (lsm->lsm_stripe_count > 1) {
773                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
774                 skip = (lsm->lsm_stripe_count - 1) * count;
775                 start += start/count * skip + stripe * count;
776                 if (end != ~0)
777                         end += end/count * skip + stripe * count;
778         }
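        /* Worked example: with 3 stripes of 256 pages each (count = 256,
         * skip = 512) and stripe index 1, object-relative page 300 maps to
         * file page 300 + (300/256) * 512 + 1 * 256 = 1068. */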
779         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
780                 end = ~0;
781
782         i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
783             CFS_PAGE_SHIFT : 0;
784         if (i < end)
785                 end = i;
786
787         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
788                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
789                count, skip, end, discard ? " (DISCARDING)" : "");
790
791         /* walk through the vmas on the inode and tear down mmapped pages that
792          * intersect with the lock.  This stops immediately if there are no
793          * mmap()ed regions of the file.  This is not efficient at all and
794          * should be short-lived.  Eventually we'll associate mmap()ed pages with
795          * the lock and will be able to find them directly */
796         for (i = start; i <= end; i += (j + skip)) {
797                 j = min(count - (i % count), end - i + 1);
798                 LASSERT(j > 0);
799                 LASSERT(mapping);
800                 if (ll_teardown_mmaps(mapping,
801                                       (__u64)i << CFS_PAGE_SHIFT,
802                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
803                         break;
804         }
805
806         /* this is a simplistic implementation of page eviction at
807          * cancellation.  It is careful to handle races with other page
808          * lockers correctly.  Fixes from bug 20 will make it more
809          * efficient by associating locks with pages and by batching
810          * writeback under the lock explicitly. */
811         for (i = start, j = start % count; i <= end;
812              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
813                 if (j == count) {
814                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
815                         i += skip;
816                         j = 0;
817                         if (i > end)
818                                 break;
819                 }
820                 LASSERTF(tmpex.l_extent.start < lock->l_policy_data.l_extent.end,
821                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
822                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
823                          start, i, end);
824
825                 if (!mapping_has_pages(mapping)) {
826                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
827                         break;
828                 }
829
830                 cond_resched();
831
832                 page = find_get_page(mapping, i);
833                 if (page == NULL)
834                         continue;
835                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
836                                i, tmpex.l_extent.start);
837                 lock_page(page);
838
839                 /* check page->mapping to guard against racing with teardown */
840                 if (!discard && clear_page_dirty_for_io(page)) {
841                         rc = ll_call_writepage(inode, page);
842                         /* either waiting for io to complete or reacquiring
843                          * the lock that the failed writepage released */
844                         lock_page(page);
845                         wait_on_page_writeback(page);
846                         if (rc != 0) {
847                                 CERROR("writepage inode %lu(%p) of page %p "
848                                        "failed: %d\n", inode->i_ino, inode,
849                                        page, rc);
850                                 if (rc == -ENOSPC)
851                                         set_bit(AS_ENOSPC, &mapping->flags);
852                                 else
853                                         set_bit(AS_EIO, &mapping->flags);
854                         }
855                 }
856
857                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
858                 /* check to see if another DLM lock covers this page b=2765 */
859                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
860                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
861                                       LDLM_FL_TEST_LOCK,
862                                       &lock->l_resource->lr_name, LDLM_EXTENT,
863                                       &tmpex, LCK_PR | LCK_PW, &lockh);
864
865                 if (rc2 <= 0 && page->mapping != NULL) {
866                         struct ll_async_page *llap = llap_cast_private(page);
867                         /* checking again to account for writeback's
868                          * lock_page() */
869                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
870                         if (llap)
871                                 ll_ra_accounting(llap, mapping);
872                         ll_truncate_complete_page(page);
873                 }
874                 unlock_page(page);
875                 page_cache_release(page);
876         }
877         LASSERTF(tmpex.l_extent.start <=
878                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
879                   lock->l_policy_data.l_extent.end + 1),
880                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
881                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
882                  start, i, end);
883         EXIT;
884 }
885
886 static int ll_extent_lock_callback(struct ldlm_lock *lock,
887                                    struct ldlm_lock_desc *new, void *data,
888                                    int flag)
889 {
890         struct lustre_handle lockh = { 0 };
891         int rc;
892         ENTRY;
893
894         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
895                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
896                 LBUG();
897         }
898
899         switch (flag) {
900         case LDLM_CB_BLOCKING:
901                 ldlm_lock2handle(lock, &lockh);
902                 rc = ldlm_cli_cancel(&lockh);
903                 if (rc != ELDLM_OK)
904                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
905                 break;
906         case LDLM_CB_CANCELING: {
907                 struct inode *inode;
908                 struct ll_inode_info *lli;
909                 struct lov_stripe_md *lsm;
910                 int stripe;
911                 __u64 kms;
912
913                 /* This lock wasn't granted, don't try to evict pages */
914                 if (lock->l_req_mode != lock->l_granted_mode)
915                         RETURN(0);
916
917                 inode = ll_inode_from_lock(lock);
918                 if (inode == NULL)
919                         RETURN(0);
920                 lli = ll_i2info(inode);
921                 if (lli == NULL)
922                         goto iput;
923                 if (lli->lli_smd == NULL)
924                         goto iput;
925                 lsm = lli->lli_smd;
926
927                 stripe = ll_lock_to_stripe_offset(inode, lock);
928                 if (stripe < 0)
929                         goto iput;
930
931                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
932
933                 lov_stripe_lock(lsm);
934                 lock_res_and_lock(lock);
935                 kms = ldlm_extent_shift_kms(lock,
936                                             lsm->lsm_oinfo[stripe]->loi_kms);
937
938                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
939                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
940                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
941                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
942                 unlock_res_and_lock(lock);
943                 lov_stripe_unlock(lsm);
944         iput:
945                 iput(inode);
946                 break;
947         }
948         default:
949                 LBUG();
950         }
951
952         RETURN(0);
953 }
954
955 #if 0
956 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
957 {
958         /* XXX ALLOCATE - 160 bytes */
959         struct inode *inode = ll_inode_from_lock(lock);
960         struct ll_inode_info *lli = ll_i2info(inode);
961         struct lustre_handle lockh = { 0 };
962         struct ost_lvb *lvb;
963         int stripe;
964         ENTRY;
965
966         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
967                      LDLM_FL_BLOCK_CONV)) {
968                 LBUG(); /* not expecting any blocked async locks yet */
969                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
970                            "lock, returning");
971                 ldlm_lock_dump(D_OTHER, lock, 0);
972                 ldlm_reprocess_all(lock->l_resource);
973                 RETURN(0);
974         }
975
976         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
977
978         stripe = ll_lock_to_stripe_offset(inode, lock);
979         if (stripe < 0)
980                 goto iput;
981
982         if (lock->l_lvb_len) {
983                 struct lov_stripe_md *lsm = lli->lli_smd;
984                 __u64 kms;
985                 lvb = lock->l_lvb_data;
986                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
987
988                 lock_res_and_lock(lock);
989                 ll_inode_size_lock(inode, 1);
990                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
991                 kms = ldlm_extent_shift_kms(NULL, kms);
992                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
993                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
994                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
995                 lsm->lsm_oinfo[stripe].loi_kms = kms;
996                 ll_inode_size_unlock(inode, 1);
997                 unlock_res_and_lock(lock);
998         }
999
1000 iput:
1001         iput(inode);
1002         wake_up(&lock->l_waitq);
1003
1004         ldlm_lock2handle(lock, &lockh);
1005         ldlm_lock_decref(&lockh, LCK_PR);
1006         RETURN(0);
1007 }
1008 #endif
1009
1010 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1011 {
1012         struct ptlrpc_request *req = reqp;
1013         struct inode *inode = ll_inode_from_lock(lock);
1014         struct ll_inode_info *lli;
1015         struct lov_stripe_md *lsm;
1016         struct ost_lvb *lvb;
1017         int rc, stripe;
1018         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1019         ENTRY;
1020
1021         if (inode == NULL)
1022                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1023         lli = ll_i2info(inode);
1024         if (lli == NULL)
1025                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1026         lsm = lli->lli_smd;
1027         if (lsm == NULL)
1028                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1029
1030         /* First, find out which stripe index this lock corresponds to. */
1031         stripe = ll_lock_to_stripe_offset(inode, lock);
1032         if (stripe < 0)
1033                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1034
1035         rc = lustre_pack_reply(req, 2, size, NULL);
1036         if (rc) {
1037                 CERROR("lustre_pack_reply: %d\n", rc);
1038                 GOTO(iput, rc);
1039         }
1040
1041         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1042         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1043         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1044         lvb->lvb_atime = LTIME_S(inode->i_atime);
1045         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1046
1047         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1048                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1049                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1050                    lvb->lvb_atime, lvb->lvb_ctime);
1051  iput:
1052         iput(inode);
1053
1054  out:
1055         /* These errors are normal races, so we don't want to fill the console
1056          * with messages by calling ptlrpc_error() */
1057         if (rc == -ELDLM_NO_LOCK_DATA)
1058                 lustre_pack_reply(req, 1, NULL, NULL);
1059
1060         req->rq_status = rc;
1061         return rc;
1062 }
1063
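/* Fold the merged OST lock value block attributes (size, blocks and
 * [amc]times) for all stripes into the inode, under ll_inode_size_lock(). */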
1064 static void ll_merge_lvb(struct inode *inode)
1065 {
1066         struct ll_inode_info *lli = ll_i2info(inode);
1067         struct ll_sb_info *sbi = ll_i2sbi(inode);
1068         struct ost_lvb lvb;
1069         ENTRY;
1070
1071         ll_inode_size_lock(inode, 1);
1072         inode_init_lvb(inode, &lvb);
1073         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1074         i_size_write(inode, lvb.lvb_size);
1075         inode->i_blocks = lvb.lvb_blocks;
1076         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1077         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1078         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1079         ll_inode_size_unlock(inode, 1);
1080         EXIT;
1081 }
1082
1083 int ll_local_size(struct inode *inode)
1084 {
1085         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1086         struct ll_inode_info *lli = ll_i2info(inode);
1087         struct ll_sb_info *sbi = ll_i2sbi(inode);
1088         struct lustre_handle lockh = { 0 };
1089         int flags = 0;
1090         int rc;
1091         ENTRY;
1092
1093         if (lli->lli_smd->lsm_stripe_count == 0)
1094                 RETURN(0);
1095
1096         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1097                        &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
1098         if (rc < 0)
1099                 RETURN(rc);
1100         else if (rc == 0)
1101                 RETURN(-ENODATA);
1102
1103         ll_merge_lvb(inode);
1104         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
1105         RETURN(0);
1106 }
1107
1108 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1109                      lstat_t *st)
1110 {
1111         struct lustre_handle lockh = { 0 };
1112         struct ldlm_enqueue_info einfo = { 0 };
1113         struct obd_info oinfo = { { { 0 } } };
1114         struct ost_lvb lvb;
1115         int rc;
1116
1117         ENTRY;
1118
1119         einfo.ei_type = LDLM_EXTENT;
1120         einfo.ei_mode = LCK_PR;
1121         einfo.ei_cb_bl = ll_extent_lock_callback;
1122         einfo.ei_cb_cp = ldlm_completion_ast;
1123         einfo.ei_cb_gl = ll_glimpse_callback;
1124         einfo.ei_cbdata = NULL;
1125
1126         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1127         oinfo.oi_lockh = &lockh;
1128         oinfo.oi_md = lsm;
1129         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1130
1131         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1132         if (rc == -ENOENT)
1133                 RETURN(rc);
1134         if (rc != 0) {
1135                 CERROR("obd_enqueue returned rc %d, "
1136                        "returning -EIO\n", rc);
1137                 RETURN(rc > 0 ? -EIO : rc);
1138         }
1139
1140         lov_stripe_lock(lsm);
1141         memset(&lvb, 0, sizeof(lvb));
1142         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1143         st->st_size = lvb.lvb_size;
1144         st->st_blocks = lvb.lvb_blocks;
1145         st->st_mtime = lvb.lvb_mtime;
1146         st->st_atime = lvb.lvb_atime;
1147         st->st_ctime = lvb.lvb_ctime;
1148         lov_stripe_unlock(lsm);
1149
1150         RETURN(rc);
1151 }
1152
1153 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1154  * file (because it prefers KMS over RSS when larger) */
1155 int ll_glimpse_size(struct inode *inode, int ast_flags)
1156 {
1157         struct ll_inode_info *lli = ll_i2info(inode);
1158         struct ll_sb_info *sbi = ll_i2sbi(inode);
1159         struct lustre_handle lockh = { 0 };
1160         struct ldlm_enqueue_info einfo = { 0 };
1161         struct obd_info oinfo = { { { 0 } } };
1162         int rc;
1163         ENTRY;
1164
1165         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1166                 RETURN(0);
1167
1168         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1169
1170         if (!lli->lli_smd) {
1171                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1172                 RETURN(0);
1173         }
1174
1175         /* NOTE: this looks like a DLM lock request, but it may not be one.  Due
1176          *       to the LDLM_FL_HAS_INTENT flag, this is a glimpse request that
1177          *       won't revoke any conflicting DLM locks held.  Instead,
1178          *       ll_glimpse_callback() will be called on each client
1179          *       holding a DLM lock against this file, and the resulting size
1180          *       will be returned for each stripe.  The DLM lock on [0, EOF] is
1181          *       acquired only if there were no conflicting locks. */
1182         einfo.ei_type = LDLM_EXTENT;
1183         einfo.ei_mode = LCK_PR;
1184         einfo.ei_cb_bl = ll_extent_lock_callback;
1185         einfo.ei_cb_cp = ldlm_completion_ast;
1186         einfo.ei_cb_gl = ll_glimpse_callback;
1187         einfo.ei_cbdata = inode;
1188
1189         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1190         oinfo.oi_lockh = &lockh;
1191         oinfo.oi_md = lli->lli_smd;
1192         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1193
1194         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1195         if (rc == -ENOENT)
1196                 RETURN(rc);
1197         if (rc != 0) {
1198                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1199                 RETURN(rc > 0 ? -EIO : rc);
1200         }
1201
1202         ll_merge_lvb(inode);
1203
1204         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1205                i_size_read(inode), inode->i_blocks);
1206
1207         RETURN(rc);
1208 }
1209
1210 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1211                    struct lov_stripe_md *lsm, int mode,
1212                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1213                    int ast_flags)
1214 {
1215         struct ll_sb_info *sbi = ll_i2sbi(inode);
1216         struct ost_lvb lvb;
1217         struct ldlm_enqueue_info einfo = { 0 };
1218         struct obd_info oinfo = { { { 0 } } };
1219         int rc;
1220         ENTRY;
1221
1222         LASSERT(!lustre_handle_is_used(lockh));
1223         LASSERT(lsm != NULL);
1224
1225         /* don't drop the mmapped file to LRU */
1226         if (mapping_mapped(inode->i_mapping))
1227                 ast_flags |= LDLM_FL_NO_LRU;
1228
1229         /* XXX phil: can we do this?  won't it screw the file size up? */
1230         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1231             (sbi->ll_flags & LL_SBI_NOLCK))
1232                 RETURN(0);
1233
1234         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1235                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1236
1237         einfo.ei_type = LDLM_EXTENT;
1238         einfo.ei_mode = mode;
1239         einfo.ei_cb_bl = ll_extent_lock_callback;
1240         einfo.ei_cb_cp = ldlm_completion_ast;
1241         einfo.ei_cb_gl = ll_glimpse_callback;
1242         einfo.ei_cbdata = inode;
1243
1244         oinfo.oi_policy = *policy;
1245         oinfo.oi_lockh = lockh;
1246         oinfo.oi_md = lsm;
1247         oinfo.oi_flags = ast_flags;
1248
1249         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1250         *policy = oinfo.oi_policy;
1251         if (rc > 0)
1252                 rc = -EIO;
1253
1254         ll_inode_size_lock(inode, 1);
1255         inode_init_lvb(inode, &lvb);
1256         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1257
1258         if (policy->l_extent.start == 0 &&
1259             policy->l_extent.end == OBD_OBJECT_EOF) {
1260                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1261                  * the kms under both a DLM lock and the
1262                  * ll_inode_size_lock().  If we don't get the
1263                  * ll_inode_size_lock() here we can match the DLM lock and
1264                  * reset i_size from the kms before the truncating path has
1265                  * updated the kms.  generic_file_write can then trust the
1266                  * stale i_size when doing appending writes and effectively
1267                  * cancel the result of the truncate.  Getting the
1268                  * ll_inode_size_lock() after the enqueue maintains the DLM
1269                  * -> ll_inode_size_lock() acquiring order. */
1270                 i_size_write(inode, lvb.lvb_size);
1271                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1272                        inode->i_ino, i_size_read(inode));
1273         }
1274
1275         if (rc == 0) {
1276                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1277                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1278                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1279         }
1280         ll_inode_size_unlock(inode, 1);
1281
1282         RETURN(rc);
1283 }
1284
1285 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1286                      struct lov_stripe_md *lsm, int mode,
1287                      struct lustre_handle *lockh)
1288 {
1289         struct ll_sb_info *sbi = ll_i2sbi(inode);
1290         int rc;
1291         ENTRY;
1292
1293         /* XXX phil: can we do this?  won't it screw the file size up? */
1294         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1295             (sbi->ll_flags & LL_SBI_NOLCK))
1296                 RETURN(0);
1297
1298         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1299
1300         RETURN(rc);
1301 }
1302
1303 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1304                             loff_t *ppos)
1305 {
1306         struct inode *inode = file->f_dentry->d_inode;
1307         struct ll_inode_info *lli = ll_i2info(inode);
1308         struct lov_stripe_md *lsm = lli->lli_smd;
1309         struct ll_sb_info *sbi = ll_i2sbi(inode);
1310         struct ll_lock_tree tree;
1311         struct ll_lock_tree_node *node;
1312         struct ost_lvb lvb;
1313         struct ll_ra_read bead;
1314         int rc, ra = 0;
1315         loff_t end;
1316         ssize_t retval, chunk, sum = 0;
1317
1318         __u64 kms;
1319         ENTRY;
1320         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1321                inode->i_ino, inode->i_generation, inode, count, *ppos);
1322         /* "If nbyte is 0, read() will return 0 and have no other results."
1323          *                      -- Single Unix Spec */
1324         if (count == 0)
1325                 RETURN(0);
1326
1327         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1328
1329         if (!lsm) {
1330                 /* A read on a file with no objects should return zero-filled
1331                  * buffers up to the file size (we can get non-zero sizes with
1332                  * mknod + truncate, then opening the file for read; this seems to
1333                  * be a common pattern in the NFS case).  Bug 6243 */
1334                 int notzeroed;
1335                 /* Since there are no objects on the OSTs, we have nothing to
1336                  * take a lock on, so we are forced to access inode->i_size
1337                  * unguarded */
1338
1339                 /* Read beyond end of file */
1340                 if (*ppos >= i_size_read(inode))
1341                         RETURN(0);
1342
1343                 if (count > i_size_read(inode) - *ppos)
1344                         count = i_size_read(inode) - *ppos;
1345                 /* Make sure to correctly adjust the file pos pointer for the
1346                  * EFAULT case */
1347                 notzeroed = clear_user(buf, count);
1348                 count -= notzeroed;
1349                 *ppos += count;
1350                 if (!count)
1351                         RETURN(-EFAULT);
1352                 RETURN(count);
1353         }
1354
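        /* The read is performed in chunks: when ll_max_rw_chunk is set, each
         * pass through the loop below locks and reads at most one stripe (and
         * no more than ll_max_rw_chunk bytes), then jumps back to "repeat"
         * until the request is satisfied or a short read occurs. */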
1355 repeat:
1356         if (sbi->ll_max_rw_chunk != 0) {
1357                 /* first, find the end of the current stripe */
1358                 end = *ppos;
1359                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1360                                 (obd_off *)&end);
1361
1362                 /* clamp the end if it is beyond the request */
1363                 if (end > *ppos + count - 1)
1364                         end = *ppos + count - 1;
1365
1366                 /* and chunk shouldn't be too large even if striping is wide */
1367                 if (end - *ppos > sbi->ll_max_rw_chunk)
1368                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1369         } else {
1370                 end = *ppos + count - 1;
1371         }
1372
1373         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1374         if (IS_ERR(node)){
1375                 GOTO(out, retval = PTR_ERR(node));
1376         }
1377
1378         tree.lt_fd = LUSTRE_FPRIVATE(file);
1379         rc = ll_tree_lock(&tree, node, buf, count,
1380                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1381         if (rc != 0)
1382                 GOTO(out, retval = rc);
1383
1384         ll_inode_size_lock(inode, 1);
1385         /*
1386          * Consistency guarantees: following possibilities exist for the
1387          * relation between region being read and real file size at this
1388          * moment:
1389          *
1390          *  (A): the region is completely inside of the file;
1391          *
1392          *  (B-x): x bytes of region are inside of the file, the rest is
1393          *  outside;
1394          *
1395          *  (C): the region is completely outside of the file.
1396          *
1397          * This classification is stable under the DLM lock acquired by
1398          * ll_tree_lock() above, because to change class another client would
1399          * have to take a DLM lock conflicting with ours. Also, any updates to
1400          * ->i_size by other threads on this client are serialized by
1401          * ll_inode_size_lock(). This guarantees that short reads are handled
1402          * correctly in the face of concurrent writes and truncates.
1403          */
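        /* For example (illustrative numbers only): with kms == 100 bytes, a
         * read of the region [90, 120) is case (A) if the real size turns out
         * to be >= 120, case (B-10) if it is still exactly 100, and case (C)
         * if the file has meanwhile been truncated to 80.  The glimpse below
         * is what distinguishes these when the region extends past kms. */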
1404         inode_init_lvb(inode, &lvb);
1405         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1406         kms = lvb.lvb_size;
1407         if (*ppos + count - 1 > kms) {
1408                 /* A glimpse is necessary to determine whether we return a
1409                  * short read (B) or some zeroes at the end of the buffer (C) */
1410                 ll_inode_size_unlock(inode, 1);
1411                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1412                 if (retval) {
1413                         ll_tree_unlock(&tree);
1414                         goto out;
1415                 }
1416         } else {
1417                 /* region is within kms and, hence, within real file size (A).
1418                  * We need to increase i_size to cover the read region so that
1419                  * generic_file_read() will do its job, but that doesn't mean
1420                  * the kms size is _correct_, it is only the _minimum_ size.
1421                  * If someone does a stat they will get the correct size which
1422                  * will always be >= the kms value here.  b=11081 */
1423                 if (i_size_read(inode) < kms)
1424                         i_size_write(inode, kms);
1425                 ll_inode_size_unlock(inode, 1);
1426         }
1427
1428         chunk = end - *ppos + 1;
1429         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1430                inode->i_ino, chunk, *ppos, i_size_read(inode));
1431
1432         /* turn off the kernel's read-ahead */
1433         file->f_ra.ra_pages = 0;
1434
1435         /* initialize read-ahead window once per syscall */
1436         if (ra == 0) {
1437                 ra = 1;
1438                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1439                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1440                 ll_ra_read_in(file, &bead);
1441         }
1442
1443         /* BUG: 5972 */
1444         file_accessed(file);
1445         retval = generic_file_read(file, buf, chunk, ppos);
1446         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1447
1448         ll_tree_unlock(&tree);
1449
1450         if (retval > 0) {
1451                 buf += retval;
1452                 count -= retval;
1453                 sum += retval;
1454                 if (retval == chunk && count > 0)
1455                         goto repeat;
1456         }
1457
1458  out:
1459         if (ra != 0)
1460                 ll_ra_read_ex(file, &bead);
1461         retval = (sum > 0) ? sum : retval;
1462         RETURN(retval);
1463 }
1464
1465 /*
1466  * Write to a file (through the page cache).
1467  */
1468 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1469                              loff_t *ppos)
1470 {
1471         struct inode *inode = file->f_dentry->d_inode;
1472         struct ll_sb_info *sbi = ll_i2sbi(inode);
1473         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1474         struct ll_lock_tree tree;
1475         struct ll_lock_tree_node *node;
1476         loff_t maxbytes = ll_file_maxbytes(inode);
1477         loff_t lock_start, lock_end, end;
1478         ssize_t retval, chunk, sum = 0;
1479         int rc;
1480         ENTRY;
1481
1482         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1483                inode->i_ino, inode->i_generation, inode, count, *ppos);
1484
1485         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1486
1487         /* POSIX, but surprised the VFS doesn't check this already */
1488         if (count == 0)
1489                 RETURN(0);
1490
1491         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1492          * called on the file, don't fail the below assertion (bug 2388). */
1493         if (file->f_flags & O_LOV_DELAY_CREATE &&
1494             ll_i2info(inode)->lli_smd == NULL)
1495                 RETURN(-EBADF);
1496
1497         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1498
1499         down(&ll_i2info(inode)->lli_write_sem);
1500
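        /* As in ll_file_read(), the write proceeds in chunks: when
         * ll_max_rw_chunk is set, each pass of the loop below writes at most
         * one stripe-sized chunk (capped at ll_max_rw_chunk bytes) under its
         * own PW extent lock, repeating until the whole request has been
         * written or a short write occurs.  O_APPEND writes instead lock
         * [0, EOF] so that i_size is stable. */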
1501 repeat:
1502         chunk = 0; /* just to fix gcc's warning */
1503         end = *ppos + count - 1;
1504
1505         if (file->f_flags & O_APPEND) {
1506                 lock_start = 0;
1507                 lock_end = OBD_OBJECT_EOF;
1508         } else if (sbi->ll_max_rw_chunk != 0) {
1509                 /* first, find the end of the current stripe */
1510                 end = *ppos;
1511                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1512                                 (obd_off *)&end);
1513
1514                 /* clamp the end if it is beyond the request */
1515                 if (end > *ppos + count - 1)
1516                         end = *ppos + count - 1;
1517
1518                 /* and chunk shouldn't be too large even if striping is wide */
1519                 if (end - *ppos > sbi->ll_max_rw_chunk)
1520                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1521                 lock_start = *ppos;
1522                 lock_end = end;
1523         } else {
1524                 lock_start = *ppos;
1525                 lock_end = *ppos + count - 1;
1526         }
1527         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1528
1529         if (IS_ERR(node))
1530                 GOTO(out, retval = PTR_ERR(node));
1531
1532         tree.lt_fd = LUSTRE_FPRIVATE(file);
1533         rc = ll_tree_lock(&tree, node, buf, count,
1534                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1535         if (rc != 0)
1536                 GOTO(out, retval = rc);
1537
1538         /* This is ok; generic_file_write will overwrite this under i_sem if
1539          * it races with a local truncate, it just makes our maxbytes checking
1540          * easier.  The i_size value gets updated in ll_extent_lock() as a
1541          * consequence of the [0,EOF] extent lock we requested above. */
1542         if (file->f_flags & O_APPEND) {
1543                 *ppos = i_size_read(inode);
1544                 end = *ppos + count - 1;
1545         }
1546
1547         if (*ppos >= maxbytes) {
1548                 send_sig(SIGXFSZ, current, 0);
1549                 GOTO(out_unlock, retval = -EFBIG);
1550         }
1551         if (*ppos + count > maxbytes)
1552                 count = maxbytes - *ppos;
1553
1554         /* generic_file_write handles O_APPEND after getting i_mutex */
1555         chunk = end - *ppos + 1;
1556         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1557                inode->i_ino, chunk, *ppos);
1558         retval = generic_file_write(file, buf, chunk, ppos);
1559         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1560
1561 out_unlock:
1562         ll_tree_unlock(&tree);
1563
1564 out:
1565         if (retval > 0) {
1566                 buf += retval;
1567                 count -= retval;
1568                 sum += retval;
1569                 if (retval == chunk && count > 0)
1570                         goto repeat;
1571         }
1572
1573         up(&ll_i2info(inode)->lli_write_sem);
1574
1575         retval = (sum > 0) ? sum : retval;
1576         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1577                            retval > 0 ? retval : 0);
1578         RETURN(retval);
1579 }
1580
1581 /*
1582  * Send file content (through pagecache) somewhere with helper
1583  */
1584 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1585                                 read_actor_t actor, void *target)
1586 {
1587         struct inode *inode = in_file->f_dentry->d_inode;
1588         struct ll_inode_info *lli = ll_i2info(inode);
1589         struct lov_stripe_md *lsm = lli->lli_smd;
1590         struct ll_lock_tree tree;
1591         struct ll_lock_tree_node *node;
1592         struct ost_lvb lvb;
1593         struct ll_ra_read bead;
1594         int rc;
1595         ssize_t retval;
1596         __u64 kms;
1597         ENTRY;
1598         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1599                inode->i_ino, inode->i_generation, inode, count, *ppos);
1600
1601         /* "If nbyte is 0, read() will return 0 and have no other results."
1602          *                      -- Single Unix Spec */
1603         if (count == 0)
1604                 RETURN(0);
1605
1606         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1607         /* turn off the kernel's read-ahead */
1608         in_file->f_ra.ra_pages = 0;
1609
1610         /* File with no objects, nothing to lock */
1611         if (!lsm)
1612                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1613
1614         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1615         if (IS_ERR(node))
1616                 RETURN(PTR_ERR(node));
1617
1618         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1619         rc = ll_tree_lock(&tree, node, NULL, count,
1620                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1621         if (rc != 0)
1622                 RETURN(rc);
1623
1624         ll_inode_size_lock(inode, 1);
1625         /*
1626          * Consistency guarantees: following possibilities exist for the
1627          * relation between region being read and real file size at this
1628          * moment:
1629          *
1630          *  (A): the region is completely inside of the file;
1631          *
1632          *  (B-x): x bytes of region are inside of the file, the rest is
1633          *  outside;
1634          *
1635          *  (C): the region is completely outside of the file.
1636          *
1637          * This classification is stable under the DLM lock acquired by
1638          * ll_tree_lock() above, because to change class another client would
1639          * have to take a DLM lock conflicting with ours. Also, any updates to
1640          * ->i_size by other threads on this client are serialized by
1641          * ll_inode_size_lock(). This guarantees that short reads are handled
1642          * correctly in the face of concurrent writes and truncates.
1643          */
1644         inode_init_lvb(inode, &lvb);
1645         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1646         kms = lvb.lvb_size;
1647         if (*ppos + count - 1 > kms) {
1648                 /* A glimpse is necessary to determine whether we return a
1649                  * short read (B) or some zeroes at the end of the buffer (C) */
1650                 ll_inode_size_unlock(inode, 1);
1651                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1652                 if (retval)
1653                         goto out;
1654         } else {
1655                 /* region is within kms and, hence, within real file size (A) */
1656                 i_size_write(inode, kms);
1657                 ll_inode_size_unlock(inode, 1);
1658         }
1659
1660         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1661                inode->i_ino, count, *ppos, i_size_read(inode));
1662
1663         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1664         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1665         ll_ra_read_in(in_file, &bead);
1666         /* BUG: 5972 */
1667         file_accessed(in_file);
1668         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1669         ll_ra_read_ex(in_file, &bead);
1670
1671  out:
1672         ll_tree_unlock(&tree);
1673         RETURN(retval);
1674 }
1675
1676 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1677                                unsigned long arg)
1678 {
1679         struct ll_inode_info *lli = ll_i2info(inode);
1680         struct obd_export *exp = ll_i2dtexp(inode);
1681         struct ll_recreate_obj ucreatp;
1682         struct obd_trans_info oti = { 0 };
1683         struct obdo *oa = NULL;
1684         int lsm_size;
1685         int rc = 0;
1686         struct lov_stripe_md *lsm, *lsm2;
1687         ENTRY;
1688
1689         if (!capable (CAP_SYS_ADMIN))
1690                 RETURN(-EPERM);
1691
1692         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1693                             sizeof(struct ll_recreate_obj));
1694         if (rc) {
1695                 RETURN(-EFAULT);
1696         }
1697         OBDO_ALLOC(oa);
1698         if (oa == NULL)
1699                 RETURN(-ENOMEM);
1700
1701         down(&lli->lli_size_sem);
1702         lsm = lli->lli_smd;
1703         if (lsm == NULL)
1704                 GOTO(out, rc = -ENOENT);
1705         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1706                    (lsm->lsm_stripe_count));
1707
1708         OBD_ALLOC(lsm2, lsm_size);
1709         if (lsm2 == NULL)
1710                 GOTO(out, rc = -ENOMEM);
1711
1712         oa->o_id = ucreatp.lrc_id;
1713         oa->o_gr = ucreatp.lrc_group;
1714         oa->o_nlink = ucreatp.lrc_ost_idx;
1715         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1716         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1717         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1718                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1719
1720         oti.oti_objid = NULL;
1721         memcpy(lsm2, lsm, lsm_size);
1722         rc = obd_create(exp, oa, &lsm2, &oti);
1723
1724         OBD_FREE(lsm2, lsm_size);
1725         GOTO(out, rc);
1726 out:
1727         up(&lli->lli_size_sem);
1728         OBDO_FREE(oa);
1729         return rc;
1730 }
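/* A minimal userspace sketch of driving the handler above (illustrative only;
 * error handling is omitted and the open flags are an assumption):
 *
 *      int fd = open("/mnt/lustre/victim", O_RDWR);
 *      struct ll_recreate_obj recreate = {
 *              .lrc_id      = <object id>,
 *              .lrc_group   = <object group>,
 *              .lrc_ost_idx = <ost index>,
 *      };
 *      ioctl(fd, LL_IOC_RECREATE_OBJ, &recreate);
 *
 * The caller must have CAP_SYS_ADMIN, as checked above. */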
1731
1732 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1733                              int flags, struct lov_user_md *lum, int lum_size)
1734 {
1735         struct ll_inode_info *lli = ll_i2info(inode);
1736         struct lov_stripe_md *lsm;
1737         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1738         int rc = 0;
1739         ENTRY;
1740
1741         down(&lli->lli_size_sem);
1742         lsm = lli->lli_smd;
1743         if (lsm) {
1744                 up(&lli->lli_size_sem);
1745                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1746                        inode->i_ino);
1747                 RETURN(-EEXIST);
1748         }
1749
1750         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1751         if (rc)
1752                 GOTO(out, rc);
1753         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1754                 GOTO(out_req_free, rc = -ENOENT);
1755         rc = oit.d.lustre.it_status;
1756         if (rc < 0)
1757                 GOTO(out_req_free, rc);
1758
1759         ll_release_openhandle(file->f_dentry, &oit);
1760
1761  out:
1762         up(&lli->lli_size_sem);
1763         ll_intent_release(&oit);
1764         RETURN(rc);
1765 out_req_free:
1766         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1767         goto out;
1768 }
1769
1770 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1771                              struct lov_mds_md **lmmp, int *lmm_size, 
1772                              struct ptlrpc_request **request)
1773 {
1774         struct ll_sb_info *sbi = ll_i2sbi(inode);
1775         struct mdt_body  *body;
1776         struct lov_mds_md *lmm = NULL;
1777         struct ptlrpc_request *req = NULL;
1778         struct obd_capa *oc;
1779         int rc, lmmsize;
1780
1781         rc = ll_get_max_mdsize(sbi, &lmmsize);
1782         if (rc)
1783                 RETURN(rc);
1784
1785         oc = ll_mdscapa_get(inode);
1786         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1787                              oc, filename, strlen(filename) + 1,
1788                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1789         capa_put(oc);
1790         if (rc < 0) {
1791                 CDEBUG(D_INFO, "md_getattr_name failed "
1792                        "on %s: rc %d\n", filename, rc);
1793                 GOTO(out, rc);
1794         }
1795
1796         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1797         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1798         /* swabbed by mdc_getattr_name */
1799         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1800
1801         lmmsize = body->eadatasize;
1802
1803         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1804                         lmmsize == 0) {
1805                 GOTO(out, rc = -ENODATA);
1806         }
1807
1808         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1809         LASSERT(lmm != NULL);
1810         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1811
1812         /*
1813          * This is coming from the MDS, so is probably in
1814          * little endian.  We convert it to host endian before
1815          * passing it to userspace.
1816          */
1817         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1818                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1819                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1820         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1821                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1822         }
1823
1824         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1825                 struct lov_stripe_md *lsm;
1826                 struct lov_user_md_join *lmj;
1827                 int lmj_size, i, aindex = 0;
1828
1829                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1830                 if (rc < 0)
1831                         GOTO(out, rc = -ENOMEM);
1832                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1833                 if (rc)
1834                         GOTO(out_free_memmd, rc);
1835
1836                 lmj_size = sizeof(struct lov_user_md_join) +
1837                            lsm->lsm_stripe_count *
1838                            sizeof(struct lov_user_ost_data_join);
1839                 OBD_ALLOC(lmj, lmj_size);
1840                 if (!lmj)
1841                         GOTO(out_free_memmd, rc = -ENOMEM);
1842
1843                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
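                /* Walk the stripe objects, advancing through the extent array
                 * in parallel, to rebuild the lov_user_md_join that is handed
                 * back to userspace. */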
1844                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1845                         struct lov_extent *lex =
1846                                 &lsm->lsm_array->lai_ext_array[aindex];
1847
1848                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1849                                 aindex ++;
1850                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1851                                         LPU64" len %d\n", aindex, i,
1852                                         lex->le_start, (int)lex->le_len);
1853                         lmj->lmm_objects[i].l_extent_start =
1854                                 lex->le_start;
1855
1856                         if ((int)lex->le_len == -1)
1857                                 lmj->lmm_objects[i].l_extent_end = -1;
1858                         else
1859                                 lmj->lmm_objects[i].l_extent_end =
1860                                         lex->le_start + lex->le_len;
1861                         lmj->lmm_objects[i].l_object_id =
1862                                 lsm->lsm_oinfo[i]->loi_id;
1863                         lmj->lmm_objects[i].l_object_gr =
1864                                 lsm->lsm_oinfo[i]->loi_gr;
1865                         lmj->lmm_objects[i].l_ost_gen =
1866                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1867                         lmj->lmm_objects[i].l_ost_idx =
1868                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1869                 }
1870                 lmm = (struct lov_mds_md *)lmj;
1871                 lmmsize = lmj_size;
1872 out_free_memmd:
1873                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1874         }
1875 out:
1876         *lmmp = lmm;
1877         *lmm_size = lmmsize;
1878         *request = req;
1879         return rc;
1880 }
1881
1882 static int ll_lov_setea(struct inode *inode, struct file *file,
1883                             unsigned long arg)
1884 {
1885         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1886         struct lov_user_md  *lump;
1887         int lum_size = sizeof(struct lov_user_md) +
1888                        sizeof(struct lov_user_ost_data);
1889         int rc;
1890         ENTRY;
1891
1892         if (!capable (CAP_SYS_ADMIN))
1893                 RETURN(-EPERM);
1894
1895         OBD_ALLOC(lump, lum_size);
1896         if (lump == NULL) {
1897                 RETURN(-ENOMEM);
1898         }
1899         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1900         if (rc) {
1901                 OBD_FREE(lump, lum_size);
1902                 RETURN(-EFAULT);
1903         }
1904
1905         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1906
1907         OBD_FREE(lump, lum_size);
1908         RETURN(rc);
1909 }
1910
1911 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1912                             unsigned long arg)
1913 {
1914         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1915         int rc;
1916         int flags = FMODE_WRITE;
1917         ENTRY;
1918
1919         /* Bug 1152: copy properly when this is no longer true */
1920         LASSERT(sizeof(lum) == sizeof(*lump));
1921         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1922         rc = copy_from_user(&lum, lump, sizeof(lum));
1923         if (rc)
1924                 RETURN(-EFAULT);
1925
1926         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1927         if (rc == 0) {
1928                  put_user(0, &lump->lmm_stripe_count);
1929                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1930                                     0, ll_i2info(inode)->lli_smd, lump);
1931         }
1932         RETURN(rc);
1933 }
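/* Illustrative userspace usage of the ioctl handled above (a sketch only; the
 * lmm_magic and lmm_stripe_size fields are assumptions not shown in this
 * function):
 *
 *      struct lov_user_md lum = {
 *              .lmm_magic        = LOV_USER_MAGIC,
 *              .lmm_stripe_size  = 1 << 20,
 *              .lmm_stripe_count = 4,
 *      };
 *      ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum);
 *
 * On success the handler writes the resulting striping back into the same
 * buffer via LL_IOC_LOV_GETSTRIPE. */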
1934
1935 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1936 {
1937         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1938
1939         if (!lsm)
1940                 RETURN(-ENODATA);
1941
1942         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1943                             (void *)arg);
1944 }
1945
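/* LL_IOC_GROUP_LOCK: take a group (GID-keyed) extent lock over the whole file
 * [0, EOF].  While it is held the fd ignores ordinary extent locking
 * (LL_FILE_IGNORE_LOCK), and the lock handle and gid are remembered in the
 * file descriptor for the matching LL_IOC_GROUP_UNLOCK. */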
1946 static int ll_get_grouplock(struct inode *inode, struct file *file,
1947                             unsigned long arg)
1948 {
1949         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1950         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1951                                                     .end = OBD_OBJECT_EOF}};
1952         struct lustre_handle lockh = { 0 };
1953         struct ll_inode_info *lli = ll_i2info(inode);
1954         struct lov_stripe_md *lsm = lli->lli_smd;
1955         int flags = 0, rc;
1956         ENTRY;
1957
1958         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1959                 RETURN(-EINVAL);
1960         }
1961
1962         policy.l_extent.gid = arg;
1963         if (file->f_flags & O_NONBLOCK)
1964                 flags = LDLM_FL_BLOCK_NOWAIT;
1965
1966         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1967         if (rc)
1968                 RETURN(rc);
1969
1970         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1971         fd->fd_gid = arg;
1972         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1973
1974         RETURN(0);
1975 }
1976
1977 static int ll_put_grouplock(struct inode *inode, struct file *file,
1978                             unsigned long arg)
1979 {
1980         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1981         struct ll_inode_info *lli = ll_i2info(inode);
1982         struct lov_stripe_md *lsm = lli->lli_smd;
1983         int rc;
1984         ENTRY;
1985
1986         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1987                 /* Ugh, it's already unlocked. */
1988                 RETURN(-EINVAL);
1989         }
1990
1991         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1992                 RETURN(-EINVAL);
1993
1994         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1995
1996         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1997         if (rc)
1998                 RETURN(rc);
1999
2000         fd->fd_gid = 0;
2001         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2002
2003         RETURN(0);
2004 }
2005
2006 static int join_sanity_check(struct inode *head, struct inode *tail)
2007 {
2008         ENTRY;
2009         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2010                 CERROR("server does not support join\n");
2011                 RETURN(-EINVAL);
2012         }
2013         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2014                 CERROR("tail ino %lu and head ino %lu must be regular files\n",
2015                        tail->i_ino, head->i_ino);
2016                 RETURN(-EINVAL);
2017         }
2018         if (head->i_ino == tail->i_ino) {
2019                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2020                 RETURN(-EINVAL);
2021         }
2022         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2023                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
2024                 RETURN(-EINVAL);
2025         }
2026         RETURN(0);
2027 }
2028
2029 static int join_file(struct inode *head_inode, struct file *head_filp,
2030                      struct file *tail_filp)
2031 {
2032         struct dentry *tail_dentry = tail_filp->f_dentry;
2033         struct lookup_intent oit = {.it_op = IT_OPEN,
2034                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2035         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2036                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2037
2038         struct lustre_handle lockh;
2039         struct md_op_data *op_data;
2040         int    rc;
2041         loff_t data;
2042         ENTRY;
2043
2044         tail_dentry = tail_filp->f_dentry;
2045
2046         data = i_size_read(head_inode);
2047         op_data = ll_prep_md_op_data(NULL, head_inode,
2048                                      tail_dentry->d_parent->d_inode,
2049                                      tail_dentry->d_name.name,
2050                                      tail_dentry->d_name.len, 0,
2051                                      LUSTRE_OPC_ANY, &data);
2052         if (IS_ERR(op_data))
2053                 RETURN(PTR_ERR(op_data));
2054
2055         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2056                          op_data, &lockh, NULL, 0, 0);
2057
2058         ll_finish_md_op_data(op_data);
2059         if (rc < 0)
2060                 GOTO(out, rc);
2061
2062         rc = oit.d.lustre.it_status;
2063
2064         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2065                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2066                 ptlrpc_req_finished((struct ptlrpc_request *)
2067                                     oit.d.lustre.it_data);
2068                 GOTO(out, rc);
2069         }
2070
2071         if (oit.d.lustre.it_lock_mode) { /* If we got a lock, release it
2072                                            * right away */
2073                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2074                 oit.d.lustre.it_lock_mode = 0;
2075         }
2076         ll_release_openhandle(head_filp->f_dentry, &oit);
2077 out:
2078         ll_intent_release(&oit);
2079         RETURN(rc);
2080 }
2081
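/* Join the file named by filename_tail onto the end of 'head'.  Both inodes
 * are extent-locked over [0, EOF] in a fixed inode-number order to avoid
 * deadlocks, the sanity checks above are applied, and the join itself is
 * requested from the MDS via an O_JOIN_FILE open intent.  On success the
 * head's cached striping (lli_smd) is freed so that a stale layout is not
 * reused. */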
2082 static int ll_file_join(struct inode *head, struct file *filp,
2083                         char *filename_tail)
2084 {
2085         struct inode *tail = NULL, *first = NULL, *second = NULL;
2086         struct dentry *tail_dentry;
2087         struct file *tail_filp, *first_filp, *second_filp;
2088         struct ll_lock_tree first_tree, second_tree;
2089         struct ll_lock_tree_node *first_node, *second_node;
2090         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2091         int rc = 0, cleanup_phase = 0;
2092         ENTRY;
2093
2094         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2095                head->i_ino, head->i_generation, head, filename_tail);
2096
2097         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2098         if (IS_ERR(tail_filp)) {
2099                 CERROR("cannot open tail file %s\n", filename_tail);
2100                 rc = PTR_ERR(tail_filp);
2101                 GOTO(cleanup, rc);
2102         }
2103         tail = igrab(tail_filp->f_dentry->d_inode);
2104
2105         tlli = ll_i2info(tail);
2106         tail_dentry = tail_filp->f_dentry;
2107         LASSERT(tail_dentry);
2108         cleanup_phase = 1;
2109
2110         /* reorder the inodes to get a consistent lock ordering */
2111         first = head->i_ino > tail->i_ino ? head : tail;
2112         second = head->i_ino > tail->i_ino ? tail : head;
2113         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2114         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2115
2116         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2117                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2118         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2119         if (IS_ERR(first_node)){
2120                 rc = PTR_ERR(first_node);
2121                 GOTO(cleanup, rc);
2122         }
2123         first_tree.lt_fd = first_filp->private_data;
2124         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2125         if (rc != 0)
2126                 GOTO(cleanup, rc);
2127         cleanup_phase = 2;
2128
2129         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2130         if (IS_ERR(second_node)){
2131                 rc = PTR_ERR(second_node);
2132                 GOTO(cleanup, rc);
2133         }
2134         second_tree.lt_fd = second_filp->private_data;
2135         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2136         if (rc != 0)
2137                 GOTO(cleanup, rc);
2138         cleanup_phase = 3;
2139
2140         rc = join_sanity_check(head, tail);
2141         if (rc)
2142                 GOTO(cleanup, rc);
2143
2144         rc = join_file(head, filp, tail_filp);
2145         if (rc)
2146                 GOTO(cleanup, rc);
2147 cleanup:
2148         switch (cleanup_phase) {
2149         case 3:
2150                 ll_tree_unlock(&second_tree);
2151                 obd_cancel_unused(ll_i2dtexp(second),
2152                                   ll_i2info(second)->lli_smd, 0, NULL);
2153         case 2:
2154                 ll_tree_unlock(&first_tree);
2155                 obd_cancel_unused(ll_i2dtexp(first),
2156                                   ll_i2info(first)->lli_smd, 0, NULL);
2157         case 1:
2158                 filp_close(tail_filp, 0);
2159                 if (tail)
2160                         iput(tail);
2161                 if (head && rc == 0) {
2162                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2163                                        &hlli->lli_smd);
2164                         hlli->lli_smd = NULL;
2165                 }
2166         case 0:
2167                 break;
2168         default:
2169                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2170                 LBUG();
2171         }
2172         RETURN(rc);
2173 }
2174
2175 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2176 {
2177         struct inode *inode = dentry->d_inode;
2178         struct obd_client_handle *och;
2179         int rc;
2180         ENTRY;
2181
2182         LASSERT(inode);
2183
2184         /* Root ? Do nothing. */
2185         if (dentry->d_inode->i_sb->s_root == dentry)
2186                 RETURN(0);
2187
2188         /* No open handle to close? Move away */
2189         if (!it_disposition(it, DISP_OPEN_OPEN))
2190                 RETURN(0);
2191
2192         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2193
2194         OBD_ALLOC(och, sizeof(*och));
2195         if (!och)
2196                 GOTO(out, rc = -ENOMEM);
2197
2198         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2199                     ll_i2info(inode), it, och);
2200
2201         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2202                                        inode, och);
2203  out:
2204         /* this one is in place of ll_file_open */
2205         ptlrpc_req_finished(it->d.lustre.it_data);
2206         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2207         RETURN(rc);
2208 }
2209
2210 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2211                   unsigned long arg)
2212 {
2213         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2214         int flags;
2215         ENTRY;
2216
2217         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2218                inode->i_generation, inode, cmd);
2219         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2220
2221         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2222         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2223                 RETURN(-ENOTTY);
2224
2225         switch(cmd) {
2226         case LL_IOC_GETFLAGS:
2227                 /* Get the current value of the file flags */
2228                 return put_user(fd->fd_flags, (int *)arg);
2229         case LL_IOC_SETFLAGS:
2230         case LL_IOC_CLRFLAGS:
2231                 /* Set or clear specific file flags */
2232                 /* XXX This probably needs checks to ensure the flags are
2233                  *     not abused, and to handle any flag side effects.
2234                  */
2235                 if (get_user(flags, (int *) arg))
2236                         RETURN(-EFAULT);
2237
2238                 if (cmd == LL_IOC_SETFLAGS) {
2239                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2240                             !(file->f_flags & O_DIRECT)) {
2241                                 CERROR("%s: unable to disable locking on "
2242                                        "non-O_DIRECT file\n", current->comm);
2243                                 RETURN(-EINVAL);
2244                         }
2245
2246                         fd->fd_flags |= flags;
2247                 } else {
2248                         fd->fd_flags &= ~flags;
2249                 }
2250                 RETURN(0);
2251         case LL_IOC_LOV_SETSTRIPE:
2252                 RETURN(ll_lov_setstripe(inode, file, arg));
2253         case LL_IOC_LOV_SETEA:
2254                 RETURN(ll_lov_setea(inode, file, arg));
2255         case LL_IOC_LOV_GETSTRIPE:
2256                 RETURN(ll_lov_getstripe(inode, arg));
2257         case LL_IOC_RECREATE_OBJ:
2258                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2259         case EXT3_IOC_GETFLAGS:
2260         case EXT3_IOC_SETFLAGS:
2261                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2262         case EXT3_IOC_GETVERSION_OLD:
2263         case EXT3_IOC_GETVERSION:
2264                 RETURN(put_user(inode->i_generation, (int *)arg));
2265         case LL_IOC_JOIN: {
2266                 char *ftail;
2267                 int rc;
2268
2269                 ftail = getname((const char *)arg);
2270                 if (IS_ERR(ftail))
2271                         RETURN(PTR_ERR(ftail));
2272                 rc = ll_file_join(inode, file, ftail);
2273                 putname(ftail);
2274                 RETURN(rc);
2275         }
2276         case LL_IOC_GROUP_LOCK:
2277                 RETURN(ll_get_grouplock(inode, file, arg));
2278         case LL_IOC_GROUP_UNLOCK:
2279                 RETURN(ll_put_grouplock(inode, file, arg));
2280         case IOC_OBD_STATFS:
2281                 RETURN(ll_obd_statfs(inode, (void *)arg));
2282
2283         /* We need to special case any other ioctls we want to handle,
2284          * to send them to the MDS/OST as appropriate and to properly
2285          * network encode the arg field.
2286         case EXT3_IOC_SETVERSION_OLD:
2287         case EXT3_IOC_SETVERSION:
2288         */
2289         case LL_IOC_FLUSHCTX:
2290                 RETURN(ll_flush_ctx(inode));
2291         case LL_IOC_GETFACL: {
2292                 struct rmtacl_ioctl_data ioc;
2293
2294                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2295                         RETURN(-EFAULT);
2296
2297                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2298         }
2299         case LL_IOC_SETFACL: {
2300                 struct rmtacl_ioctl_data ioc;
2301
2302                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2303                         RETURN(-EFAULT);
2304
2305                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2306         }
2307         default: {
2308                 int err;
2309
2310                 if (LLIOC_STOP == 
2311                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2312                         RETURN(err);
2313
2314                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2315                                      (void *)arg));
2316         }
2317         }
2318 }
2319
2320 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2321 {
2322         struct inode *inode = file->f_dentry->d_inode;
2323         struct ll_inode_info *lli = ll_i2info(inode);
2324         struct lov_stripe_md *lsm = lli->lli_smd;
2325         loff_t retval;
2326         ENTRY;
2327         retval = offset + ((origin == 2) ? i_size_read(inode) :
2328                            (origin == 1) ? file->f_pos : 0);
2329         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2330                inode->i_ino, inode->i_generation, inode, retval, retval,
2331                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2332         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2333
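        /* SEEK_END needs an up-to-date file size, so glimpse the OST objects
         * first (without blocking if the file was opened O_NONBLOCK) before
         * reading i_size under the size lock. */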
2334         if (origin == 2) { /* SEEK_END */
2335                 int nonblock = 0, rc;
2336
2337                 if (file->f_flags & O_NONBLOCK)
2338                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2339
2340                 if (lsm != NULL) {
2341                         rc = ll_glimpse_size(inode, nonblock);
2342                         if (rc != 0)
2343                                 RETURN(rc);
2344                 }
2345
2346                 ll_inode_size_lock(inode, 0);
2347                 offset += i_size_read(inode);
2348                 ll_inode_size_unlock(inode, 0);
2349         } else if (origin == 1) { /* SEEK_CUR */
2350                 offset += file->f_pos;
2351         }
2352
2353         retval = -EINVAL;
2354         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2355                 if (offset != file->f_pos) {
2356                         file->f_pos = offset;
2357 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2358                         file->f_reada = 0;
2359                         file->f_version = ++event;
2360 #endif
2361                 }
2362                 retval = offset;
2363         }
2364         
2365         RETURN(retval);
2366 }
2367
2368 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2369 {
2370         struct inode *inode = dentry->d_inode;
2371         struct ll_inode_info *lli = ll_i2info(inode);
2372         struct lov_stripe_md *lsm = lli->lli_smd;
2373         struct ptlrpc_request *req;
2374         struct obd_capa *oc;
2375         int rc, err;
2376         ENTRY;
2377         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2378                inode->i_generation, inode);
2379         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2380
2381         /* fsync's caller has already called _fdata{sync,write}, we want
2382          * that IO to finish before calling the osc and mdc sync methods */
2383         rc = filemap_fdatawait(inode->i_mapping);
2384
2385         /* catch async errors that were recorded back when async writeback
2386          * failed for pages in this mapping. */
2387         err = lli->lli_async_rc;
2388         lli->lli_async_rc = 0;
2389         if (rc == 0)
2390                 rc = err;
2391         if (lsm) {
2392                 err = lov_test_and_clear_async_rc(lsm);
2393                 if (rc == 0)
2394                         rc = err;
2395         }
2396
2397         oc = ll_mdscapa_get(inode);
2398         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2399                       &req);
2400         capa_put(oc);
2401         if (!rc)
2402                 rc = err;
2403         if (!err)
2404                 ptlrpc_req_finished(req);
2405
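        /* For a data sync on a file with objects, also flush the whole
         * [0, EOF] range on the OSTs via obd_sync(). */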
2406         if (data && lsm) {
2407                 struct obdo *oa;
2408                 
2409                 OBDO_ALLOC(oa);
2410                 if (!oa)
2411                         RETURN(rc ? rc : -ENOMEM);
2412
2413                 oa->o_id = lsm->lsm_object_id;
2414                 oa->o_gr = lsm->lsm_object_gr;
2415                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2416                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2417                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2418                                            OBD_MD_FLGROUP);
2419
2420                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2421                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2422                                0, OBD_OBJECT_EOF, oc);
2423                 capa_put(oc);
2424                 if (!rc)
2425                         rc = err;
2426                 OBDO_FREE(oa);
2427         }
2428
2429         RETURN(rc);
2430 }
2431
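/* fcntl()/flock() locks are implemented as LDLM_FLOCK locks enqueued against
 * the MDS.  The resource name is derived from the file's FID, the requested
 * byte range and pid are carried in the flock policy data, and the lock mode
 * maps F_RDLCK to LCK_PR, F_WRLCK to LCK_PW and F_UNLCK to LCK_NL. */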
2432 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2433 {
2434         struct inode *inode = file->f_dentry->d_inode;
2435         struct ll_sb_info *sbi = ll_i2sbi(inode);
2436         struct ldlm_res_id res_id =
2437                 { .name = { fid_seq(ll_inode2fid(inode)),
2438                             fid_oid(ll_inode2fid(inode)),
2439                             fid_ver(ll_inode2fid(inode)),
2440                             LDLM_FLOCK} };
2441         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2442                 ldlm_flock_completion_ast, NULL, file_lock };
2443         struct lustre_handle lockh = {0};
2444         ldlm_policy_data_t flock;
2445         int flags = 0;
2446         int rc;
2447         ENTRY;
2448
2449         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2450                inode->i_ino, file_lock);
2451
2452         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2453  
2454         if (file_lock->fl_flags & FL_FLOCK) {
2455                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2456                 /* set missing params for flock() calls */
2457                 file_lock->fl_end = OFFSET_MAX;
2458                 file_lock->fl_pid = current->tgid;
2459         }
2460         flock.l_flock.pid = file_lock->fl_pid;
2461         flock.l_flock.start = file_lock->fl_start;
2462         flock.l_flock.end = file_lock->fl_end;
2463
2464         switch (file_lock->fl_type) {
2465         case F_RDLCK:
2466                 einfo.ei_mode = LCK_PR;
2467                 break;
2468         case F_UNLCK:
2469                 /* An unlock request may or may not have any relation to
2470                  * existing locks so we may not be able to pass a lock handle
2471                  * via a normal ldlm_lock_cancel() request. The request may even
2472                  * unlock a byte range in the middle of an existing lock. In
2473                  * order to process an unlock request we need all of the same
2474                  * information that is given with a normal read or write record
2475                  * lock request. To avoid creating another ldlm unlock (cancel)
2476                  * message we'll treat a LCK_NL flock request as an unlock. */
2477                 einfo.ei_mode = LCK_NL;
2478                 break;
2479         case F_WRLCK:
2480                 einfo.ei_mode = LCK_PW;
2481                 break;
2482         default:
2483                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2484                 LBUG();
2485         }
2486
2487         switch (cmd) {
2488         case F_SETLKW:
2489 #ifdef F_SETLKW64
2490         case F_SETLKW64:
2491 #endif
2492                 flags = 0;
2493                 break;
2494         case F_SETLK:
2495 #ifdef F_SETLK64
2496         case F_SETLK64:
2497 #endif
2498                 flags = LDLM_FL_BLOCK_NOWAIT;
2499                 break;
2500         case F_GETLK:
2501 #ifdef F_GETLK64
2502         case F_GETLK64:
2503 #endif
2504                 flags = LDLM_FL_TEST_LOCK;
2505                 /* Save the old mode so that if the mode in the lock changes we
2506                  * can decrement the appropriate reader or writer refcount. */
2507                 file_lock->fl_type = einfo.ei_mode;
2508                 break;
2509         default:
2510                 CERROR("unknown fcntl lock command: %d\n", cmd);
2511                 LBUG();
2512         }
2513
2514         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2515                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2516                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2517
2518         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2519                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2520         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2521                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2522 #ifdef HAVE_F_OP_FLOCK
2523         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2524             !(flags & LDLM_FL_TEST_LOCK))
2525                 posix_lock_file_wait(file, file_lock);
2526 #endif
2527
2528         RETURN(rc);
2529 }
2530
2531 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2532 {
2533         ENTRY;
2534
2535         RETURN(-ENOSYS);
2536 }
2537
2538 int ll_have_md_lock(struct inode *inode, __u64 bits)
2539 {
2540         struct lustre_handle lockh;
2541         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2542         struct lu_fid *fid;
2543         int flags;
2544         ENTRY;
2545
2546         if (!inode)
2547                RETURN(0);
2548
2549         fid = &ll_i2info(inode)->lli_fid;
2550         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2551
2552         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2553         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2554                           LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2555                 RETURN(1);
2556         }
2557
2558         RETURN(0);
2559 }
2560
2561 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2562         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2563                               * and return success */
2564                 inode->i_nlink = 0;
2565                 /* This path cannot be hit for regular files except in
2566                  * the case of obscure races, so there is no need to
2567                  * validate the size. */
2568                 if (!S_ISREG(inode->i_mode) &&
2569                     !S_ISDIR(inode->i_mode))
2570                         return 0;
2571         }
2572
2573         if (rc) {
2574                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2575                 return -abs(rc);
2576
2577         }
2578
2579         return 0;
2580 }
2581
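/* Revalidate the attributes of dentry's inode.  If the MDS supports
 * OBD_CONNECT_ATTRFID we issue a getattr-by-fid intent lock; otherwise, when
 * no MDS_INODELOCK_UPDATE lock is cached, a plain md_getattr() is used.
 * Regular files with objects finish with a glimpse so the size is current. */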
2582 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2583 {
2584         struct inode *inode = dentry->d_inode;
2585         struct ptlrpc_request *req = NULL;
2586         struct ll_sb_info *sbi;
2587         struct obd_export *exp;
2588         int rc;
2589         ENTRY;
2590
2591         if (!inode) {
2592                 CERROR("REPORT THIS LINE TO PETER\n");
2593                 RETURN(0);
2594         }
2595         sbi = ll_i2sbi(inode);
2596
2597         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2598                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2599
2600         exp = ll_i2mdexp(inode);
2601
2602         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2603                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2604                 struct md_op_data *op_data;
2605
2606                 /* Call getattr by fid, so do not provide name at all. */
2607                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2608                                              dentry->d_inode, NULL, 0, 0,
2609                                              LUSTRE_OPC_ANY, NULL);
2610                 if (IS_ERR(op_data))
2611                         RETURN(PTR_ERR(op_data));
2612
2613                 oit.it_flags |= O_CHECK_STALE;
2614                 rc = md_intent_lock(exp, op_data, NULL, 0,
2615                                     /* we are not interested in name
2616                                        based lookup */
2617                                     &oit, 0, &req,
2618                                     ll_md_blocking_ast, 0);
2619                 ll_finish_md_op_data(op_data);
2620                 oit.it_flags &= ~O_CHECK_STALE;
2621                 if (rc < 0) {
2622                         rc = ll_inode_revalidate_fini(inode, rc);
2623                         GOTO (out, rc);
2624                 }
2625
2626                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2627                 if (rc != 0) {
2628                         ll_intent_release(&oit);
2629                         GOTO(out, rc);
2630                 }
2631
2632                 /* Unlinked? Unhash the dentry so it is not picked up later by
2633                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2634                    here, in order to preserve get_cwd functionality on 2.6.
2635                    Bug 10503 */
2636                 if (!dentry->d_inode->i_nlink) {
2637                         spin_lock(&dcache_lock);
2638                         ll_drop_dentry(dentry);
2639                         spin_unlock(&dcache_lock);
2640                 }
2641
2642                 ll_lookup_finish_locks(&oit, dentry);
2643         } else if (!ll_have_md_lock(dentry->d_inode,
2644                                     MDS_INODELOCK_UPDATE)) {
2645                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2646                 obd_valid valid = OBD_MD_FLGETATTR;
2647                 struct obd_capa *oc;
2648                 int ealen = 0;
2649
2650                 if (S_ISREG(inode->i_mode)) {
2651                         rc = ll_get_max_mdsize(sbi, &ealen);
2652                         if (rc)
2653                                 RETURN(rc);
2654                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2655                 }
2656                 /* When OBD_CONNECT_ATTRFID is not supported, we can't find a
2657                  * capa for this inode, because we only keep the capas of
2658                  * directories fresh. */
2659                 oc = ll_mdscapa_get(inode);
2660                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2661                                 ealen, &req);
2662                 capa_put(oc);
2663                 if (rc) {
2664                         rc = ll_inode_revalidate_fini(inode, rc);
2665                         RETURN(rc);
2666                 }
2667
2668                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2669                                    NULL);
2670                 if (rc)
2671                         GOTO(out, rc);
2672         }
2673
2674         /* if object not yet allocated, don't validate size */
2675         if (ll_i2info(inode)->lli_smd == NULL)
2676                 GOTO(out, rc = 0);
2677
2678         /* ll_glimpse_size will prefer locally cached writes if they extend
2679          * the file */
2680         rc = ll_glimpse_size(inode, 0);
2681         EXIT;
2682 out:
2683         ptlrpc_req_finished(req);
2684         return rc;
2685 }
2686
2687 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2688                   struct lookup_intent *it, struct kstat *stat)
2689 {
2690         struct inode *inode = de->d_inode;
2691         int res = 0;
2692
2693         res = ll_inode_revalidate_it(de, it);
2694         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2695
2696         if (res)
2697                 return res;
2698
2699         stat->dev = inode->i_sb->s_dev;
2700         stat->ino = inode->i_ino;
2701         stat->mode = inode->i_mode;
2702         stat->nlink = inode->i_nlink;
2703         stat->uid = inode->i_uid;
2704         stat->gid = inode->i_gid;
2705         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2706         stat->atime = inode->i_atime;
2707         stat->mtime = inode->i_mtime;
2708         stat->ctime = inode->i_ctime;
2709 #ifdef HAVE_INODE_BLKSIZE
2710         stat->blksize = inode->i_blksize;
2711 #else
2712         stat->blksize = 1 << inode->i_blkbits;
2713 #endif
2714
2715         ll_inode_size_lock(inode, 0);
2716         stat->size = i_size_read(inode);
2717         stat->blocks = inode->i_blocks;
2718         ll_inode_size_unlock(inode, 0);
2719
2720         return 0;
2721 }
2722 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2723 {
2724         struct lookup_intent it = { .it_op = IT_GETATTR };
2725
2726         return ll_getattr_it(mnt, de, &it, stat);
2727 }
2728
2729 static
2730 int lustre_check_acl(struct inode *inode, int mask)
2731 {
2732 #ifdef CONFIG_FS_POSIX_ACL
2733         struct ll_inode_info *lli = ll_i2info(inode);
2734         struct posix_acl *acl;
2735         int rc;
2736         ENTRY;
2737
2738         spin_lock(&lli->lli_lock);
2739         acl = posix_acl_dup(lli->lli_posix_acl);
2740         spin_unlock(&lli->lli_lock);
2741
2742         if (!acl)
2743                 RETURN(-EAGAIN);
2744
2745         rc = posix_acl_permission(inode, acl, mask);
2746         posix_acl_release(acl);
2747
2748         RETURN(rc);
2749 #else
2750         return -EAGAIN;
2751 #endif
2752 }
2753
2754 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2755 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2756 {
2757         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2758                inode->i_ino, inode->i_generation, inode, mask);
2759         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2760                 return lustre_check_remote_perm(inode, mask);
2761
2762         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2763         return generic_permission(inode, mask, lustre_check_acl);
2764 }
2765 #else
2766 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2767 {
2768         int mode = inode->i_mode;
2769         int rc;
2770
2771         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2772                inode->i_ino, inode->i_generation, inode, mask);
2773
2774         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2775                 return lustre_check_remote_perm(inode, mask);
2776
2777         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2778
2779         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2780             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2781                 return -EROFS;
2782         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2783                 return -EACCES;
2784         if (current->fsuid == inode->i_uid) {
2785                 mode >>= 6;     /* owner: use the "user" permission bits */
2786         } else if (1) {         /* non-owner: try the ACL if the group bits could grant it */
2787                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2788                         goto check_groups;
2789                 rc = lustre_check_acl(inode, mask);
2790                 if (rc == -EAGAIN)
2791                         goto check_groups;
2792                 if (rc == -EACCES)
2793                         goto check_capabilities;
2794                 return rc;
2795         } else {
2796 check_groups:
2797                 if (in_group_p(inode->i_gid))
2798                         mode >>= 3;
2799         }
2800         if ((mode & mask & S_IRWXO) == mask)
2801                 return 0;
2802
2803 check_capabilities:
2804         if (!(mask & MAY_EXEC) ||
2805             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2806                 if (capable(CAP_DAC_OVERRIDE))
2807                         return 0;
2808
2809         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2810             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2811                 return 0;
2812
2813         return -EACCES;
2814 }
2815 #endif
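
/*
 * Worked example for the pre-2.6.10 fallback above (illustrative only):
 * for a regular file with mode 0640 and a MAY_READ request,
 *
 *   - the owner shifts mode right by 6, so the read bit (04) is present and
 *     access is granted;
 *   - a caller in the file's group (after lustre_check_acl() returns -EAGAIN,
 *     i.e. no ACL is cached) shifts by 3, leaving 04, so access is granted;
 *   - for any other caller only the "other" bits (here 0) are considered, so
 *     the request falls through to the CAP_DAC_OVERRIDE / CAP_DAC_READ_SEARCH
 *     checks and otherwise returns -EACCES.
 */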
2816
2817 /* -o localflock - only provides locally consistent flock locks */
2818 struct file_operations ll_file_operations = {
2819         .read           = ll_file_read,
2820         .write          = ll_file_write,
2821         .ioctl          = ll_file_ioctl,
2822         .open           = ll_file_open,
2823         .release        = ll_file_release,
2824         .mmap           = ll_file_mmap,
2825         .llseek         = ll_file_seek,
2826         .sendfile       = ll_file_sendfile,
2827         .fsync          = ll_fsync,
2828 };
2829
2830 struct file_operations ll_file_operations_flock = {
2831         .read           = ll_file_read,
2832         .write          = ll_file_write,
2833         .ioctl          = ll_file_ioctl,
2834         .open           = ll_file_open,
2835         .release        = ll_file_release,
2836         .mmap           = ll_file_mmap,
2837         .llseek         = ll_file_seek,
2838         .sendfile       = ll_file_sendfile,
2839         .fsync          = ll_fsync,
2840 #ifdef HAVE_F_OP_FLOCK
2841         .flock          = ll_file_flock,
2842 #endif
2843         .lock           = ll_file_flock
2844 };
2845
2846 /* These are for -o noflock - to return ENOSYS on flock calls */
2847 struct file_operations ll_file_operations_noflock = {
2848         .read           = ll_file_read,
2849         .write          = ll_file_write,
2850         .ioctl          = ll_file_ioctl,
2851         .open           = ll_file_open,
2852         .release        = ll_file_release,
2853         .mmap           = ll_file_mmap,
2854         .llseek         = ll_file_seek,
2855         .sendfile       = ll_file_sendfile,
2856         .fsync          = ll_fsync,
2857 #ifdef HAVE_F_OP_FLOCK
2858         .flock          = ll_file_noflock,
2859 #endif
2860         .lock           = ll_file_noflock
2861 };
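
/*
 * The three tables above differ only in how they handle file locking:
 * ll_file_operations leaves .flock/.lock unset so the VFS keeps such locks
 * local to this client (-o localflock), ll_file_operations_flock routes them
 * through ll_file_flock for cluster-coherent locking (-o flock), and
 * ll_file_operations_noflock fails them via ll_file_noflock (-o noflock).
 * A minimal sketch of how the mount flags could select a table (illustrative
 * only; field and flag names other than LL_SBI_FLOCK are assumptions here,
 * and the real selection lives in the llite superblock/inode setup code):
 */
#if 0
static void example_pick_file_operations(struct ll_sb_info *sbi)
{
        if (sbi->ll_flags & LL_SBI_FLOCK)
                sbi->ll_fop = &ll_file_operations_flock;
        else if (sbi->ll_flags & LL_SBI_LOCALFLOCK)
                sbi->ll_fop = &ll_file_operations;
        else
                sbi->ll_fop = &ll_file_operations_noflock;
}
#endif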
2862
2863 struct inode_operations ll_file_inode_operations = {
2864 #ifdef HAVE_VFS_INTENT_PATCHES
2865         .setattr_raw    = ll_setattr_raw,
2866 #endif
2867         .setattr        = ll_setattr,
2868         .truncate       = ll_truncate,
2869         .getattr        = ll_getattr,
2870         .permission     = ll_inode_permission,
2871         .setxattr       = ll_setxattr,
2872         .getxattr       = ll_getxattr,
2873         .listxattr      = ll_listxattr,
2874         .removexattr    = ll_removexattr,
2875 };
2876
2877 /* dynamic ioctl number support routines */
2878 static struct llioc_ctl_data {
2879         struct rw_semaphore ioc_sem;
2880         struct list_head    ioc_head;
2881 } llioc = {
2882         __RWSEM_INITIALIZER(llioc.ioc_sem),
2883         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2884 };
2885
2886
2887 struct llioc_data {
2888         struct list_head        iocd_list;   /* linkage into llioc.ioc_head */
2889         unsigned int            iocd_size;   /* total allocation size, for OBD_FREE */
2890         llioc_callback_t        iocd_cb;     /* callback handling the commands below */
2891         unsigned int            iocd_count;  /* number of entries in iocd_cmd[] */
2892         unsigned int            iocd_cmd[0]; /* registered ioctl command numbers */
2893 };
2894
2895 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2896 {
2897         unsigned int size;
2898         struct llioc_data *in_data = NULL;
2899         ENTRY;
2900
2901         if (cb == NULL || cmd == NULL ||
2902             count > LLIOC_MAX_CMD || count < 0)
2903                 RETURN(NULL);
2904
2905         size = sizeof(*in_data) + count * sizeof(unsigned int);
2906         OBD_ALLOC(in_data, size);
2907         if (in_data == NULL)
2908                 RETURN(NULL);
2909
2910         memset(in_data, 0, sizeof(*in_data));
2911         in_data->iocd_size = size;
2912         in_data->iocd_cb = cb;
2913         in_data->iocd_count = count;
2914         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2915
2916         down_write(&llioc.ioc_sem);
2917         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2918         up_write(&llioc.ioc_sem);
2919
2920         RETURN(in_data);
2921 }
2922
2923 void ll_iocontrol_unregister(void *magic)
2924 {
2925         struct llioc_data *tmp;
2926
2927         if (magic == NULL)
2928                 return;
2929
2930         down_write(&llioc.ioc_sem);
2931         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2932                 if (tmp == magic) {
2933                         unsigned int size = tmp->iocd_size;
2934
2935                         list_del(&tmp->iocd_list);
2936                         up_write(&llioc.ioc_sem);
2937
2938                         OBD_FREE(tmp, size);
2939                         return;
2940                 }
2941         }
2942         up_write(&llioc.ioc_sem);
2943
2944         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2945 }
2946
2947 EXPORT_SYMBOL(ll_iocontrol_register);
2948 EXPORT_SYMBOL(ll_iocontrol_unregister);
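
/*
 * Usage sketch (illustrative only): an external module can hook its own
 * ioctl command numbers into the llite ioctl path by registering a callback.
 * The callback signature follows the invocation in ll_iocontrol_call() below;
 * the command numbers and all "example_*" names here are made up.
 */
#if 0
static enum llioc_iter example_ioc_cb(struct inode *inode, struct file *file,
                                      unsigned int cmd, unsigned long arg,
                                      void *magic, int *rcp)
{
        *rcp = 0;               /* report the command's result */
        return LLIOC_STOP;      /* stop iterating, the command was handled */
}

static unsigned int example_cmds[] = { 0xC0DE0001, 0xC0DE0002 };
static void *example_magic;

static int example_register(void)
{
        example_magic = ll_iocontrol_register(example_ioc_cb,
                                              ARRAY_SIZE(example_cmds),
                                              example_cmds);
        return example_magic != NULL ? 0 : -ENOMEM;
}

static void example_unregister(void)
{
        ll_iocontrol_unregister(example_magic);
}
#endif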
2949
2950 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2951                         unsigned int cmd, unsigned long arg, int *rcp)
2952 {
2953         enum llioc_iter ret = LLIOC_CONT;
2954         struct llioc_data *data;
2955         int rc = -EINVAL, i;
2956
2957         down_read(&llioc.ioc_sem);
2958         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2959                 for (i = 0; i < data->iocd_count; i++) {
2960                         if (cmd != data->iocd_cmd[i])
2961                                 continue;
2962
2963                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2964                         break;
2965                 }
2966
2967                 if (ret == LLIOC_STOP)
2968                         break;
2969         }
2970         up_read(&llioc.ioc_sem);
2971
2972         if (rcp)
2973                 *rcp = rc;
2974         return ret;
2975 }
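
/*
 * Dispatch sketch (illustrative only, this is not the in-tree caller): an
 * ioctl handler that does not recognise a command can offer it to the
 * registered callbacks and return whatever result the handling callback
 * stored through the rcp pointer.
 */
#if 0
static int example_ioctl_fallback(struct inode *inode, struct file *file,
                                  unsigned int cmd, unsigned long arg)
{
        int rc;

        if (ll_iocontrol_call(inode, file, cmd, arg, &rc) == LLIOC_STOP)
                return rc;      /* a dynamically registered handler took it */

        return -ENOTTY;         /* nobody claimed this command */
}
#endif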