lustre/llite/file.c (fs/lustre-release.git, commit: improve handling recoverable errors)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35
36 /* also used by llite/special.c:ll_special_open() */
37 struct ll_file_data *ll_file_data_get(void)
38 {
39         struct ll_file_data *fd;
40
41         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
42         return fd;
43 }
44
45 static void ll_file_data_put(struct ll_file_data *fd)
46 {
47         if (fd != NULL)
48                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
49 }
50
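/* Copy the inode's current attributes (mode, timestamps, size, blocks and
 * flags), its I/O epoch, the open file handle @fh and an MDS capability into
 * @op_data for use in an MDS request. */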
51 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
52                           struct lustre_handle *fh)
53 {
54         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
55         op_data->op_attr.ia_mode = inode->i_mode;
56         op_data->op_attr.ia_atime = inode->i_atime;
57         op_data->op_attr.ia_mtime = inode->i_mtime;
58         op_data->op_attr.ia_ctime = inode->i_ctime;
59         op_data->op_attr.ia_size = i_size_read(inode);
60         op_data->op_attr_blocks = inode->i_blocks;
61         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
62         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
63         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
64         op_data->op_capa1 = ll_mdscapa_get(inode);
65 }
66
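/* Prepare @op_data for an MDS close of @och: the mode and timestamps are
 * always sent; for a write open on a regular file, either send size/blocks
 * directly or, if the MDS supports Size-on-MDS (OBD_CONNECT_SOM), close the
 * I/O epoch instead. */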
67 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
68                              struct obd_client_handle *och)
69 {
70         ENTRY;
71
72         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
73                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
74
75         if (!(och->och_flags & FMODE_WRITE))
76                 goto out;
77
78         if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
79             !S_ISREG(inode->i_mode))
80                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
81         else
82                 ll_epoch_close(inode, op_data, &och, 0);
83
84 out:
85         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
86         EXIT;
87 }
88
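/* Send the MDS close RPC for an open handle.  Skips md_close() on forced
 * unmount, handles the Size-on-MDS -EAGAIN case by sending a size update to
 * the MDS, and frees @och unless a DONE_WRITING request is still pending. */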
89 static int ll_close_inode_openhandle(struct obd_export *md_exp,
90                                      struct inode *inode,
91                                      struct obd_client_handle *och)
92 {
93         struct obd_export *exp = ll_i2mdexp(inode);
94         struct md_op_data *op_data;
95         struct ptlrpc_request *req = NULL;
96         struct obd_device *obd = class_exp2obd(exp);
97         int epoch_close = 1;
98         int seq_end = 0, rc;
99         ENTRY;
100
101         if (obd == NULL) {
102                 /*
 103                  * XXX: in the case of LMV, is it correct to access
104                  * ->exp_handle?
105                  */
106                 CERROR("Invalid MDC connection handle "LPX64"\n",
107                        ll_i2mdexp(inode)->exp_handle.h_cookie);
108                 GOTO(out, rc = 0);
109         }
110
111         /*
 112          * Here we check if this is a forced umount.  If so, we are called on
 113          * cancelation of the "open lock" and do not call md_close(), as it
 114          * would not succeed because the import is already deactivated.
115          */
116         if (obd->obd_force)
117                 GOTO(out, rc = 0);
118
119         OBD_ALLOC_PTR(op_data);
120         if (op_data == NULL)
121                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
122
123         ll_prepare_close(inode, op_data, och);
124         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
125         rc = md_close(md_exp, op_data, och->och_mod, &req);
126         if (rc != -EAGAIN)
127                 seq_end = 1;
128
129         if (rc == -EAGAIN) {
130                 /* This close must have the epoch closed. */
131                 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
132                 LASSERT(epoch_close);
133                 /* MDS has instructed us to obtain Size-on-MDS attribute from
 134                  * OSTs and send a setattr back to the MDS. */
135                 rc = ll_sizeonmds_update(inode, och->och_mod,
136                                          &och->och_fh, op_data->op_ioepoch);
137                 if (rc) {
138                         CERROR("inode %lu mdc Size-on-MDS update failed: "
139                                "rc = %d\n", inode->i_ino, rc);
140                         rc = 0;
141                 }
142         } else if (rc) {
143                 CERROR("inode %lu mdc close failed: rc = %d\n",
144                        inode->i_ino, rc);
145         }
146         ll_finish_md_op_data(op_data);
147
148         if (rc == 0) {
149                 rc = ll_objects_destroy(req, inode);
150                 if (rc)
151                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
152                                inode->i_ino, rc);
153         }
154
155         EXIT;
156 out:
 157
158         if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
159             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
160                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
161         } else {
162                 if (seq_end)
163                         ptlrpc_close_replay_seq(req);
164                 md_clear_open_replay_data(md_exp, och);
165                 /* Free @och if it is not waiting for DONE_WRITING. */
166                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
167                 OBD_FREE_PTR(och);
168         }
169         if (req) /* This is close request */
170                 ptlrpc_req_finished(req);
171         return rc;
172 }
173
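/* Close the cached MDS open handle for the given open @flags, unless the
 * handle still has other users. */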
174 int ll_md_real_close(struct inode *inode, int flags)
175 {
176         struct ll_inode_info *lli = ll_i2info(inode);
177         struct obd_client_handle **och_p;
178         struct obd_client_handle *och;
179         __u64 *och_usecount;
180         int rc = 0;
181         ENTRY;
182
183         if (flags & FMODE_WRITE) {
184                 och_p = &lli->lli_mds_write_och;
185                 och_usecount = &lli->lli_open_fd_write_count;
186         } else if (flags & FMODE_EXEC) {
187                 och_p = &lli->lli_mds_exec_och;
188                 och_usecount = &lli->lli_open_fd_exec_count;
189         } else {
190                 LASSERT(flags & FMODE_READ);
191                 och_p = &lli->lli_mds_read_och;
192                 och_usecount = &lli->lli_open_fd_read_count;
193         }
194
195         down(&lli->lli_och_sem);
196         if (*och_usecount) { /* There are still users of this handle, so
197                                 skip freeing it. */
198                 up(&lli->lli_och_sem);
199                 RETURN(0);
200         }
 201         och = *och_p;
202         *och_p = NULL;
203         up(&lli->lli_och_sem);
204
 205         if (och) { /* There might be a race and somebody might have already
 206                       freed this och */
207                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
208                                                inode, och);
209         }
210
211         RETURN(rc);
212 }
213
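/* Per-descriptor close: drop any group lock held by @file, decrement the
 * open count for this open mode and, unless a matching OPEN DLM lock lets us
 * skip it, do the real close on the MDS; finally release the ll_file_data. */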
214 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
215                 struct file *file)
216 {
217         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
218         struct ll_inode_info *lli = ll_i2info(inode);
219         int rc = 0;
220         ENTRY;
221
222         /* clear group lock, if present */
223         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
224                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
225                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
226                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
227                                       &fd->fd_cwlockh);
228         }
229
 230         /* Let's see if we have a good enough OPEN lock on the file and
 231            can skip talking to the MDS */
232         if (file->f_dentry->d_inode) { /* Can this ever be false? */
233                 int lockmode;
234                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
235                 struct lustre_handle lockh;
236                 struct inode *inode = file->f_dentry->d_inode;
237                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
238
239                 down(&lli->lli_och_sem);
240                 if (fd->fd_omode & FMODE_WRITE) {
241                         lockmode = LCK_CW;
242                         LASSERT(lli->lli_open_fd_write_count);
243                         lli->lli_open_fd_write_count--;
244                 } else if (fd->fd_omode & FMODE_EXEC) {
245                         lockmode = LCK_PR;
246                         LASSERT(lli->lli_open_fd_exec_count);
247                         lli->lli_open_fd_exec_count--;
248                 } else {
249                         lockmode = LCK_CR;
250                         LASSERT(lli->lli_open_fd_read_count);
251                         lli->lli_open_fd_read_count--;
252                 }
253                 up(&lli->lli_och_sem);
254
255                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
256                                    LDLM_IBITS, &policy, lockmode,
257                                    &lockh)) {
258                         rc = ll_md_real_close(file->f_dentry->d_inode,
259                                               fd->fd_omode);
260                 }
261         } else {
 262                 CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
263                        file, file->f_dentry, file->f_dentry->d_name.name);
264         }
265
266         LUSTRE_FPRIVATE(file) = NULL;
267         ll_file_data_put(fd);
268         ll_capa_close(inode);
269
270         RETURN(rc);
271 }
272
273 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
274
 275 /* While this returns an error code, the caller at fput() ignores it, so we
 276  * need to make every effort to clean up all of our state here.  Also,
 277  * applications rarely check close errors, and even if an error is returned
 278  * they will not retry the close call.
279  */
280 int ll_file_release(struct inode *inode, struct file *file)
281 {
282         struct ll_file_data *fd;
283         struct ll_sb_info *sbi = ll_i2sbi(inode);
284         struct ll_inode_info *lli = ll_i2info(inode);
285         struct lov_stripe_md *lsm = lli->lli_smd;
286         int rc;
287
288         ENTRY;
289         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
290                inode->i_generation, inode);
291
292         /* don't do anything for / */
293         if (inode->i_sb->s_root == file->f_dentry)
294                 RETURN(0);
295
296         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
297         fd = LUSTRE_FPRIVATE(file);
298         LASSERT(fd != NULL);
299
300         /* don't do anything for / */
301         if (inode->i_sb->s_root == file->f_dentry) {
302                 LUSTRE_FPRIVATE(file) = NULL;
303                 ll_file_data_put(fd);
304                 RETURN(0);
305         }
 306
307         if (lsm)
308                 lov_test_and_clear_async_rc(lsm);
309         lli->lli_async_rc = 0;
310
311         rc = ll_md_close(sbi->ll_md_exp, inode, file);
312         RETURN(rc);
313 }
314
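/* Send an IT_OPEN intent to the MDS for a dentry we already have (the NFS
 * export and patchless-kernel paths), optionally requesting an OPEN lock,
 * and update the inode from the reply. */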
315 static int ll_intent_file_open(struct file *file, void *lmm,
316                                int lmmsize, struct lookup_intent *itp)
317 {
318         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
319         struct dentry *parent = file->f_dentry->d_parent;
320         const char *name = file->f_dentry->d_name.name;
321         const int len = file->f_dentry->d_name.len;
322         struct md_op_data *op_data;
323         struct ptlrpc_request *req;
324         int rc;
325
326         if (!parent)
327                 RETURN(-ENOENT);
328
 329         /* Usually we come here only for NFSD, and we want the open lock.
 330            But we can also get here with pre-2.6.15 patchless kernels, and in
 331            that case that lock is also ok */
 332         /* We can also get here if there was a cached open handle in revalidate_it
 333          * but it disappeared while we were getting from there to ll_file_open.
 334          * But this means this file was closed and immediately opened, which
 335          * makes it a good candidate for using the OPEN lock */
 336         /* If lmmsize and lmm are not 0, we are just setting stripe info
 337          * parameters.  No need for the open lock */
338         if (!lmm && !lmmsize)
339                 itp->it_flags |= MDS_OPEN_LOCK;
340
341         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
342                                       file->f_dentry->d_inode, name, len,
343                                       O_RDWR, LUSTRE_OPC_ANY, NULL);
344         if (IS_ERR(op_data))
345                 RETURN(PTR_ERR(op_data));
346
347         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
348                             0 /*unused */, &req, ll_md_blocking_ast, 0);
349         ll_finish_md_op_data(op_data);
350         if (rc == -ESTALE) {
 351                 /* Keep a separate exit path here so we don't flood the
 352                  * log with -ESTALE error messages.
 353                  */
 354                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
355                      it_open_error(DISP_OPEN_OPEN, itp))
356                         GOTO(out, rc);
357                 ll_release_openhandle(file->f_dentry, itp);
358                 GOTO(out_stale, rc);
359         }
360
361         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
362                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
363                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
364                 GOTO(out, rc);
365         }
366
367         if (itp->d.lustre.it_lock_mode)
368                 md_set_lock_data(sbi->ll_md_exp,
369                                  &itp->d.lustre.it_lock_handle, 
370                                  file->f_dentry->d_inode);
371
372         rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
373                            NULL);
374 out:
375         ptlrpc_req_finished(itp->d.lustre.it_data);
376
377 out_stale:
378         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
379         ll_intent_drop_lock(itp);
380
381         RETURN(rc);
382 }
383
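/* Fill @och from the open reply carried in @it (file handle, FID and open
 * flags), record the I/O epoch in the inode and register the handle for open
 * replay on the MDS export. */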
384 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
385                        struct lookup_intent *it, struct obd_client_handle *och)
386 {
387         struct ptlrpc_request *req = it->d.lustre.it_data;
388         struct mdt_body *body;
389
390         LASSERT(och);
391
392         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
393         LASSERT(body != NULL);                      /* reply already checked out */
394         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
395
396         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
397         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
398         och->och_fid = lli->lli_fid;
399         och->och_flags = it->it_flags;
400         lli->lli_ioepoch = body->ioepoch;
401
402         return md_set_open_replay_data(md_exp, och, req);
403 }
404
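/* Client-side part of open: if @och is non-NULL this open created a new MDS
 * handle, so fill it from the intent reply; then attach @fd to the file and
 * initialize its readahead state. */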
405 int ll_local_open(struct file *file, struct lookup_intent *it,
406                   struct ll_file_data *fd, struct obd_client_handle *och)
407 {
408         struct inode *inode = file->f_dentry->d_inode;
409         struct ll_inode_info *lli = ll_i2info(inode);
410         ENTRY;
411
412         LASSERT(!LUSTRE_FPRIVATE(file));
413
414         LASSERT(fd != NULL);
415
416         if (och) {
417                 struct ptlrpc_request *req = it->d.lustre.it_data;
418                 struct mdt_body *body;
419                 int rc;
420
421                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
422                 if (rc)
423                         RETURN(rc);
424
425                 body = lustre_msg_buf(req->rq_repmsg,
426                                       DLM_REPLY_REC_OFF, sizeof(*body));
427
428                 if ((it->it_flags & FMODE_WRITE) &&
429                     (body->valid & OBD_MD_FLSIZE))
430                 {
431                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
432                                lli->lli_ioepoch, PFID(&lli->lli_fid));
433                 }
434         }
435
436         LUSTRE_FPRIVATE(file) = fd;
437         ll_readahead_init(inode, &fd->fd_ras);
438         fd->fd_omode = it->it_flags;
439         RETURN(0);
440 }
441
442 /* Open a file, and (for the very first open) create objects on the OSTs at
443  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
444  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
445  * lli_open_sem to ensure no other process will create objects, send the
446  * stripe MD to the MDS, or try to destroy the objects if that fails.
447  *
448  * If we already have the stripe MD locally then we don't request it in
449  * md_open(), by passing a lmm_size = 0.
450  *
451  * It is up to the application to ensure no other processes open this file
452  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
453  * used.  We might be able to avoid races of that sort by getting lli_open_sem
454  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
455  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
456  */
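/*
 * Illustrative userspace sketch (not part of this file) of the delayed object
 * creation described above, assuming the usual lustre_user.h definitions of
 * O_LOV_DELAY_CREATE, LL_IOC_LOV_SETSTRIPE and struct lov_user_md:
 *
 *      int fd = open(path, O_CREAT | O_RDWR | O_LOV_DELAY_CREATE, 0644);
 *      struct lov_user_md lum = {
 *              .lmm_magic        = LOV_USER_MAGIC,
 *              .lmm_stripe_size  = 1048576,    (1 MB stripes)
 *              .lmm_stripe_count = 4,
 *      };
 *      if (ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum) < 0)
 *              perror("LL_IOC_LOV_SETSTRIPE");
 *
 * Only once the ioctl (handled by ll_lov_setstripe()) arrives are the OST
 * objects created; without O_LOV_DELAY_CREATE they are created on the first
 * open for write, using the default striping.
 */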
457 int ll_file_open(struct inode *inode, struct file *file)
458 {
459         struct ll_inode_info *lli = ll_i2info(inode);
460         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
461                                           .it_flags = file->f_flags };
462         struct lov_stripe_md *lsm;
463         struct ptlrpc_request *req = NULL;
464         struct obd_client_handle **och_p;
465         __u64 *och_usecount;
466         struct ll_file_data *fd;
467         int rc = 0;
468         ENTRY;
469
470         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
471                inode->i_generation, inode, file->f_flags);
472
473         /* don't do anything for / */
474         if (inode->i_sb->s_root == file->f_dentry)
475                 RETURN(0);
476
477 #ifdef LUSTRE_KERNEL_VERSION
478         it = file->f_it;
479 #else
480         it = file->private_data; /* XXX: compat macro */
481         file->private_data = NULL; /* prevent ll_local_open assertion */
482 #endif
483
484         fd = ll_file_data_get();
485         if (fd == NULL)
486                 RETURN(-ENOMEM);
487
488         /* don't do anything for / */
489         if (inode->i_sb->s_root == file->f_dentry) {
490                 LUSTRE_FPRIVATE(file) = fd;
491                 RETURN(0);
492         }
493
494         if (!it || !it->d.lustre.it_disposition) {
 495                 /* Convert f_flags into an access mode.  We cannot use
 496                  * file->f_mode, because everything but the O_ACCMODE bits
 497                  * has been stripped from it */
498                 if ((oit.it_flags + 1) & O_ACCMODE)
499                         oit.it_flags++;
500                 if (file->f_flags & O_TRUNC)
501                         oit.it_flags |= FMODE_WRITE;
502
 503                 /* The kernel only calls f_op->open from dentry_open.  filp_open
 504                  * calls dentry_open after open_namei has checked permissions.
 505                  * Only nfsd_open calls dentry_open directly without checking
 506                  * permissions, which is why the code below is safe. */
507                 if (oit.it_flags & FMODE_WRITE)
508                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
509
510                 /* We do not want O_EXCL here, presumably we opened the file
511                  * already? XXX - NFS implications? */
512                 oit.it_flags &= ~O_EXCL;
513
514                 it = &oit;
515         }
516
517         /* Let's see if we have file open on MDS already. */
518         if (it->it_flags & FMODE_WRITE) {
519                 och_p = &lli->lli_mds_write_och;
520                 och_usecount = &lli->lli_open_fd_write_count;
521         } else if (it->it_flags & FMODE_EXEC) {
522                 och_p = &lli->lli_mds_exec_och;
523                 och_usecount = &lli->lli_open_fd_exec_count;
 524         } else {
525                 och_p = &lli->lli_mds_read_och;
526                 och_usecount = &lli->lli_open_fd_read_count;
527         }
 528
529         down(&lli->lli_och_sem);
530         if (*och_p) { /* Open handle is present */
531                 if (it_disposition(it, DISP_OPEN_OPEN)) {
 532                         /* Well, there's an extra open request that we do not need,
 533                            so let's close it somehow.  This will decref the request. */
534                         rc = it_open_error(DISP_OPEN_OPEN, it);
535                         if (rc) {
536                                 ll_file_data_put(fd);
537                                 GOTO(out_och_free, rc);
538                         }       
539                         ll_release_openhandle(file->f_dentry, it);
540                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
541                                              LPROC_LL_OPEN);
542                 }
543                 (*och_usecount)++;
544
545                 rc = ll_local_open(file, it, fd, NULL);
546                 if (rc) {
547                         up(&lli->lli_och_sem);
548                         ll_file_data_put(fd);
549                         RETURN(rc);
550                 }
551         } else {
552                 LASSERT(*och_usecount == 0);
553                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
554                 if (!*och_p) {
555                         ll_file_data_put(fd);
556                         GOTO(out_och_free, rc = -ENOMEM);
557                 }
558                 (*och_usecount)++;
559                 if (!it->d.lustre.it_disposition) {
560                         it->it_flags |= O_CHECK_STALE;
561                         rc = ll_intent_file_open(file, NULL, 0, it);
562                         it->it_flags &= ~O_CHECK_STALE;
563                         if (rc) {
564                                 ll_file_data_put(fd);
565                                 GOTO(out_och_free, rc);
566                         }
567
568                         /* Got some error? Release the request */
569                         if (it->d.lustre.it_status < 0) {
570                                 req = it->d.lustre.it_data;
571                                 ptlrpc_req_finished(req);
572                         }
573                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
574                                          &it->d.lustre.it_lock_handle,
575                                          file->f_dentry->d_inode);
576                 }
577                 req = it->d.lustre.it_data;
578
579                 /* md_intent_lock() didn't get a request ref if there was an
580                  * open error, so don't do cleanup on the request here
581                  * (bug 3430) */
 582                 /* XXX (green): Shouldn't we bail out on any error here, not
 583                  * just an open error? */
584                 rc = it_open_error(DISP_OPEN_OPEN, it);
585                 if (rc) {
586                         ll_file_data_put(fd);
587                         GOTO(out_och_free, rc);
588                 }
589
590                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
591                 rc = ll_local_open(file, it, fd, *och_p);
592                 if (rc) {
593                         up(&lli->lli_och_sem);
594                         ll_file_data_put(fd);
595                         GOTO(out_och_free, rc);
596                 }
597         }
598         up(&lli->lli_och_sem);
599
 600         /* Must do this outside the lli_och_sem lock to prevent a deadlock where
 601            a different kind of OPEN lock for this same inode gets cancelled
 602            by ldlm_cancel_lru */
603         if (!S_ISREG(inode->i_mode))
604                 GOTO(out, rc);
605
606         ll_capa_open(inode);
607
608         lsm = lli->lli_smd;
609         if (lsm == NULL) {
610                 if (file->f_flags & O_LOV_DELAY_CREATE ||
611                     !(file->f_mode & FMODE_WRITE)) {
612                         CDEBUG(D_INODE, "object creation was delayed\n");
613                         GOTO(out, rc);
614                 }
615         }
616         file->f_flags &= ~O_LOV_DELAY_CREATE;
617         GOTO(out, rc);
618 out:
619         ptlrpc_req_finished(req);
620         if (req)
621                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
622 out_och_free:
623         if (rc) {
624                 if (*och_p) {
625                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
626                         *och_p = NULL; /* OBD_FREE writes some magic there */
627                         (*och_usecount)--;
628                 }
629                 up(&lli->lli_och_sem);
630         }
631
632         return rc;
633 }
634
635 /* Fills the obdo with the attributes for the inode defined by lsm */
636 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
637 {
638         struct ptlrpc_request_set *set;
639         struct ll_inode_info *lli = ll_i2info(inode);
640         struct lov_stripe_md *lsm = lli->lli_smd;
641
642         struct obd_info oinfo = { { { 0 } } };
643         int rc;
644         ENTRY;
645
646         LASSERT(lsm != NULL);
647
648         oinfo.oi_md = lsm;
649         oinfo.oi_oa = obdo;
650         oinfo.oi_oa->o_id = lsm->lsm_object_id;
651         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
652         oinfo.oi_oa->o_mode = S_IFREG;
653         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
654                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
655                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
656                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
657                                OBD_MD_FLGROUP;
658         oinfo.oi_capa = ll_mdscapa_get(inode);
659
660         set = ptlrpc_prep_set();
661         if (set == NULL) {
662                 CERROR("can't allocate ptlrpc set\n");
663                 rc = -ENOMEM;
664         } else {
665                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
666                 if (rc == 0)
667                         rc = ptlrpc_set_wait(set);
668                 ptlrpc_set_destroy(set);
669         }
670         capa_put(oinfo.oi_capa);
671         if (rc)
672                 RETURN(rc);
673
674         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
675                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
676                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
677
678         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
679         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
680                lli->lli_smd->lsm_object_id, i_size_read(inode),
681                inode->i_blocks, inode->i_blksize);
682         RETURN(0);
683 }
684
685 static inline void ll_remove_suid(struct inode *inode)
686 {
687         unsigned int mode;
688
 689         /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
690         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
691
 692         /* were any of the setid bits set? */
693         mode &= inode->i_mode;
694         if (mode && !capable(CAP_FSETID)) {
695                 inode->i_mode &= ~mode;
696                 // XXX careful here - we cannot change the size
697         }
698 }
699
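/* Map an extent DLM @lock back to the index of the stripe it covers in
 * @inode's layout, and verify that the lock's resource matches that stripe's
 * object id and group.  Returns the stripe index or a negative error. */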
700 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
701 {
702         struct ll_inode_info *lli = ll_i2info(inode);
703         struct lov_stripe_md *lsm = lli->lli_smd;
704         struct obd_export *exp = ll_i2dtexp(inode);
705         struct {
706                 char name[16];
707                 struct ldlm_lock *lock;
708                 struct lov_stripe_md *lsm;
709         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
710         __u32 stripe, vallen = sizeof(stripe);
711         int rc;
712         ENTRY;
713
714         if (lsm->lsm_stripe_count == 1)
715                 GOTO(check, stripe = 0);
716
717         /* get our offset in the lov */
718         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
719         if (rc != 0) {
720                 CERROR("obd_get_info: rc = %d\n", rc);
721                 RETURN(rc);
722         }
723         LASSERT(stripe < lsm->lsm_stripe_count);
724
725 check:
 726         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0] ||
 727             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]) {
728                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
729                            lsm->lsm_oinfo[stripe]->loi_id,
730                            lsm->lsm_oinfo[stripe]->loi_gr);
731                 RETURN(-ELDLM_NO_LOCK_DATA);
732         }
733
734         RETURN(stripe);
735 }
736
 737 /* Flush the page cache for an extent as it is canceled.  When we're on an LOV,
738  * we get a lock cancellation for each stripe, so we have to map the obd's
739  * region back onto the stripes in the file that it held.
740  *
741  * No one can dirty the extent until we've finished our work and they can
742  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
743  * but other kernel actors could have pages locked.
744  *
745  * Called with the DLM lock held. */
746 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
747                               struct ldlm_lock *lock, __u32 stripe)
748 {
749         ldlm_policy_data_t tmpex;
750         unsigned long start, end, count, skip, i, j;
751         struct page *page;
752         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
753         struct lustre_handle lockh;
754         struct address_space *mapping = inode->i_mapping;
755
756         ENTRY;
757         tmpex = lock->l_policy_data;
758         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
759                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
760                i_size_read(inode));
761
 762         /* our locks are page granular thanks to osc_enqueue, so we invalidate
 763          * the whole page. */
764         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
765             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
766                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
767                            CFS_PAGE_SIZE);
768         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
769         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
770
771         count = ~0;
772         skip = 0;
773         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
774         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
775         if (lsm->lsm_stripe_count > 1) {
776                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
777                 skip = (lsm->lsm_stripe_count - 1) * count;
778                 start += start/count * skip + stripe * count;
779                 if (end != ~0)
780                         end += end/count * skip + stripe * count;
781         }
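        /* The index arithmetic above maps a page index within one stripe
         * object back to the corresponding file page index.  Hypothetical
         * example: with a stripe size of 256 pages and 3 stripes, count = 256
         * and skip = 2 * 256 = 512; for stripe 1, object page 300 (chunk 1,
         * offset 44) maps to file page 300 + (300 / 256) * 512 + 1 * 256 =
         * 1068. */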
782         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
783                 end = ~0;
784
785         i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
786             CFS_PAGE_SHIFT : 0;
787         if (i < end)
788                 end = i;
789
790         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
791                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
792                count, skip, end, discard ? " (DISCARDING)" : "");
793
 794         /* Walk through the VMAs on the inode and tear down mmap()ed pages that
 795          * intersect with the lock.  This stops immediately if there are no
 796          * mmap()ed regions of the file.  This is not efficient at all and
 797          * should be short-lived.  Eventually we'll associate mmap()ed pages
 798          * with the lock and be able to find them directly */
799         for (i = start; i <= end; i += (j + skip)) {
800                 j = min(count - (i % count), end - i + 1);
801                 LASSERT(j > 0);
802                 LASSERT(mapping);
803                 if (ll_teardown_mmaps(mapping,
804                                       (__u64)i << CFS_PAGE_SHIFT,
805                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
806                         break;
807         }
808
 809         /* This is the simplistic implementation of page eviction at
 810          * cancelation.  It is careful to handle races with other page
 811          * lockers correctly.  Fixes from bug 20 will make it more
 812          * efficient by associating locks with pages and by explicitly
 813          * batching writeback under the lock. */
814         for (i = start, j = start % count; i <= end;
815              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
816                 if (j == count) {
817                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
818                         i += skip;
819                         j = 0;
820                         if (i > end)
821                                 break;
822                 }
 823                 LASSERTF(tmpex.l_extent.start < lock->l_policy_data.l_extent.end,
824                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
825                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
826                          start, i, end);
827
828                 if (!mapping_has_pages(mapping)) {
829                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
830                         break;
831                 }
832
833                 cond_resched();
834
835                 page = find_get_page(mapping, i);
836                 if (page == NULL)
837                         continue;
838                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
839                                i, tmpex.l_extent.start);
840                 lock_page(page);
841
 842                 /* check page->mapping to guard against racing with teardown */
843                 if (!discard && clear_page_dirty_for_io(page)) {
844                         rc = ll_call_writepage(inode, page);
845                         /* either waiting for io to complete or reacquiring
846                          * the lock that the failed writepage released */
847                         lock_page(page);
848                         wait_on_page_writeback(page);
849                         if (rc != 0) {
850                                 CERROR("writepage inode %lu(%p) of page %p "
851                                        "failed: %d\n", inode->i_ino, inode,
852                                        page, rc);
853                                 if (rc == -ENOSPC)
854                                         set_bit(AS_ENOSPC, &mapping->flags);
855                                 else
856                                         set_bit(AS_EIO, &mapping->flags);
857                         }
858                 }
859
860                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
861                 /* check to see if another DLM lock covers this page b=2765 */
862                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
863                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
864                                       LDLM_FL_TEST_LOCK,
865                                       &lock->l_resource->lr_name, LDLM_EXTENT,
866                                       &tmpex, LCK_PR | LCK_PW, &lockh);
867
868                 if (rc2 <= 0 && page->mapping != NULL) {
869                         struct ll_async_page *llap = llap_cast_private(page);
870                         /* checking again to account for writeback's
871                          * lock_page() */
872                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
873                         if (llap)
874                                 ll_ra_accounting(llap, mapping);
875                         ll_truncate_complete_page(page);
876                 }
877                 unlock_page(page);
878                 page_cache_release(page);
879         }
880         LASSERTF(tmpex.l_extent.start <=
881                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
882                   lock->l_policy_data.l_extent.end + 1),
883                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
884                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
885                  start, i, end);
886         EXIT;
887 }
888
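/* Blocking/cancel AST for OSC extent locks.  On a blocking callback the lock
 * is simply cancelled; on cancelation the covered page cache range is flushed
 * via ll_pgcache_remove_extent() and the stripe's known minimum size (kms) is
 * shrunk accordingly. */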
889 static int ll_extent_lock_callback(struct ldlm_lock *lock,
890                                    struct ldlm_lock_desc *new, void *data,
891                                    int flag)
892 {
893         struct lustre_handle lockh = { 0 };
894         int rc;
895         ENTRY;
896
897         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
898                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
899                 LBUG();
900         }
901
902         switch (flag) {
903         case LDLM_CB_BLOCKING:
904                 ldlm_lock2handle(lock, &lockh);
905                 rc = ldlm_cli_cancel(&lockh);
906                 if (rc != ELDLM_OK)
907                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
908                 break;
909         case LDLM_CB_CANCELING: {
910                 struct inode *inode;
911                 struct ll_inode_info *lli;
912                 struct lov_stripe_md *lsm;
913                 int stripe;
914                 __u64 kms;
915
916                 /* This lock wasn't granted, don't try to evict pages */
917                 if (lock->l_req_mode != lock->l_granted_mode)
918                         RETURN(0);
919
920                 inode = ll_inode_from_lock(lock);
921                 if (inode == NULL)
922                         RETURN(0);
923                 lli = ll_i2info(inode);
924                 if (lli == NULL)
925                         goto iput;
926                 if (lli->lli_smd == NULL)
927                         goto iput;
928                 lsm = lli->lli_smd;
929
930                 stripe = ll_lock_to_stripe_offset(inode, lock);
931                 if (stripe < 0)
932                         goto iput;
933
934                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
935
936                 lov_stripe_lock(lsm);
937                 lock_res_and_lock(lock);
938                 kms = ldlm_extent_shift_kms(lock,
939                                             lsm->lsm_oinfo[stripe]->loi_kms);
940
941                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
942                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
943                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
944                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
945                 unlock_res_and_lock(lock);
946                 lov_stripe_unlock(lsm);
947         iput:
948                 iput(inode);
949                 break;
950         }
951         default:
952                 LBUG();
953         }
954
955         RETURN(0);
956 }
957
958 #if 0
959 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
960 {
961         /* XXX ALLOCATE - 160 bytes */
962         struct inode *inode = ll_inode_from_lock(lock);
963         struct ll_inode_info *lli = ll_i2info(inode);
964         struct lustre_handle lockh = { 0 };
965         struct ost_lvb *lvb;
966         int stripe;
967         ENTRY;
968
969         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
970                      LDLM_FL_BLOCK_CONV)) {
971                 LBUG(); /* not expecting any blocked async locks yet */
972                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
973                            "lock, returning");
974                 ldlm_lock_dump(D_OTHER, lock, 0);
975                 ldlm_reprocess_all(lock->l_resource);
976                 RETURN(0);
977         }
978
979         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
980
981         stripe = ll_lock_to_stripe_offset(inode, lock);
982         if (stripe < 0)
983                 goto iput;
984
985         if (lock->l_lvb_len) {
986                 struct lov_stripe_md *lsm = lli->lli_smd;
987                 __u64 kms;
988                 lvb = lock->l_lvb_data;
989                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
990
991                 lock_res_and_lock(lock);
992                 ll_inode_size_lock(inode, 1);
993                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
994                 kms = ldlm_extent_shift_kms(NULL, kms);
995                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
996                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
997                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
998                 lsm->lsm_oinfo[stripe].loi_kms = kms;
999                 ll_inode_size_unlock(inode, 1);
1000                 unlock_res_and_lock(lock);
1001         }
1002
1003 iput:
1004         iput(inode);
1005         wake_up(&lock->l_waitq);
1006
1007         ldlm_lock2handle(lock, &lockh);
1008         ldlm_lock_decref(&lockh, LCK_PR);
1009         RETURN(0);
1010 }
1011 #endif
1012
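/* Glimpse AST: pack this client's view of the stripe (its kms and the inode
 * timestamps) into an LVB reply so the server can report an up-to-date size
 * without revoking the locks we hold. */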
1013 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1014 {
1015         struct ptlrpc_request *req = reqp;
1016         struct inode *inode = ll_inode_from_lock(lock);
1017         struct ll_inode_info *lli;
1018         struct lov_stripe_md *lsm;
1019         struct ost_lvb *lvb;
1020         int rc, stripe;
1021         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1022         ENTRY;
1023
1024         if (inode == NULL)
1025                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1026         lli = ll_i2info(inode);
1027         if (lli == NULL)
1028                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1029         lsm = lli->lli_smd;
1030         if (lsm == NULL)
1031                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1032
1033         /* First, find out which stripe index this lock corresponds to. */
1034         stripe = ll_lock_to_stripe_offset(inode, lock);
1035         if (stripe < 0)
1036                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1037
1038         rc = lustre_pack_reply(req, 2, size, NULL);
1039         if (rc) {
1040                 CERROR("lustre_pack_reply: %d\n", rc);
1041                 GOTO(iput, rc);
1042         }
1043
1044         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1045         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1046         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1047         lvb->lvb_atime = LTIME_S(inode->i_atime);
1048         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1049
1050         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1051                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1052                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1053                    lvb->lvb_atime, lvb->lvb_ctime);
1054  iput:
1055         iput(inode);
1056
1057  out:
1058         /* These errors are normal races, so we don't want to fill the console
1059          * with messages by calling ptlrpc_error() */
1060         if (rc == -ELDLM_NO_LOCK_DATA)
1061                 lustre_pack_reply(req, 1, NULL, NULL);
1062
1063         req->rq_status = rc;
1064         return rc;
1065 }
1066
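/* Merge the per-stripe LVB data into the inode (size, blocks and timestamps)
 * under ll_inode_size_lock(). */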
1067 static void ll_merge_lvb(struct inode *inode)
1068 {
1069         struct ll_inode_info *lli = ll_i2info(inode);
1070         struct ll_sb_info *sbi = ll_i2sbi(inode);
1071         struct ost_lvb lvb;
1072         ENTRY;
1073
1074         ll_inode_size_lock(inode, 1);
1075         inode_init_lvb(inode, &lvb);
1076         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1077         i_size_write(inode, lvb.lvb_size);
1078         inode->i_blocks = lvb.lvb_blocks;
1079         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1080         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1081         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1082         ll_inode_size_unlock(inode, 1);
1083         EXIT;
1084 }
1085
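/* Try to determine the file size purely from extent locks we already hold:
 * if a [0, EOF] lock matches locally, the LVB is merged into the inode and 0
 * is returned; otherwise -ENODATA. */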
1086 int ll_local_size(struct inode *inode)
1087 {
1088         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1089         struct ll_inode_info *lli = ll_i2info(inode);
1090         struct ll_sb_info *sbi = ll_i2sbi(inode);
1091         struct lustre_handle lockh = { 0 };
1092         int flags = 0;
1093         int rc;
1094         ENTRY;
1095
1096         if (lli->lli_smd->lsm_stripe_count == 0)
1097                 RETURN(0);
1098
1099         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1100                        &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
1101         if (rc < 0)
1102                 RETURN(rc);
1103         else if (rc == 0)
1104                 RETURN(-ENODATA);
1105
1106         ll_merge_lvb(inode);
1107         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
1108         RETURN(0);
1109 }
1110
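/* Glimpse the OST attributes for an arbitrary @lsm (not necessarily attached
 * to an inode) and fill @st with the merged size, blocks and timestamps. */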
1111 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1112                      lstat_t *st)
1113 {
1114         struct lustre_handle lockh = { 0 };
1115         struct ldlm_enqueue_info einfo = { 0 };
1116         struct obd_info oinfo = { { { 0 } } };
1117         struct ost_lvb lvb;
1118         int rc;
1119
1120         ENTRY;
1121
1122         einfo.ei_type = LDLM_EXTENT;
1123         einfo.ei_mode = LCK_PR;
1124         einfo.ei_cb_bl = ll_extent_lock_callback;
1125         einfo.ei_cb_cp = ldlm_completion_ast;
1126         einfo.ei_cb_gl = ll_glimpse_callback;
1127         einfo.ei_cbdata = NULL;
1128
1129         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1130         oinfo.oi_lockh = &lockh;
1131         oinfo.oi_md = lsm;
1132         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1133
1134         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1135         if (rc == -ENOENT)
1136                 RETURN(rc);
1137         if (rc != 0) {
1138                 CERROR("obd_enqueue returned rc %d, "
1139                        "returning -EIO\n", rc);
1140                 RETURN(rc > 0 ? -EIO : rc);
1141         }
1142
1143         lov_stripe_lock(lsm);
1144         memset(&lvb, 0, sizeof(lvb));
1145         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1146         st->st_size = lvb.lvb_size;
1147         st->st_blocks = lvb.lvb_blocks;
1148         st->st_mtime = lvb.lvb_mtime;
1149         st->st_atime = lvb.lvb_atime;
1150         st->st_ctime = lvb.lvb_ctime;
1151         lov_stripe_unlock(lsm);
1152
1153         RETURN(rc);
1154 }
1155
1156 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1157  * file (because it prefers KMS over RSS when larger) */
1158 int ll_glimpse_size(struct inode *inode, int ast_flags)
1159 {
1160         struct ll_inode_info *lli = ll_i2info(inode);
1161         struct ll_sb_info *sbi = ll_i2sbi(inode);
1162         struct lustre_handle lockh = { 0 };
1163         struct ldlm_enqueue_info einfo = { 0 };
1164         struct obd_info oinfo = { { { 0 } } };
1165         int rc;
1166         ENTRY;
1167
1168         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1169                 RETURN(0);
1170
1171         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1172
1173         if (!lli->lli_smd) {
1174                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1175                 RETURN(0);
1176         }
1177
 1178         /* NOTE: this looks like a DLM lock request, but it may not be one.  Due
 1179          *       to the LDLM_FL_HAS_INTENT flag, this is a glimpse request that
 1180          *       won't revoke any conflicting DLM locks held.  Instead,
 1181          *       ll_glimpse_callback() will be called on each client
 1182          *       holding a DLM lock against this file, and the resulting size
 1183          *       will be returned for each stripe.  A DLM lock on [0, EOF] is
 1184          *       acquired only if there were no conflicting locks. */
1185         einfo.ei_type = LDLM_EXTENT;
1186         einfo.ei_mode = LCK_PR;
1187         einfo.ei_cb_bl = ll_extent_lock_callback;
1188         einfo.ei_cb_cp = ldlm_completion_ast;
1189         einfo.ei_cb_gl = ll_glimpse_callback;
1190         einfo.ei_cbdata = inode;
1191
1192         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1193         oinfo.oi_lockh = &lockh;
1194         oinfo.oi_md = lli->lli_smd;
1195         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1196
1197         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1198         if (rc == -ENOENT)
1199                 RETURN(rc);
1200         if (rc != 0) {
1201                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1202                 RETURN(rc > 0 ? -EIO : rc);
1203         }
1204
1205         ll_merge_lvb(inode);
1206
1207         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1208                i_size_read(inode), inode->i_blocks);
1209
1210         RETURN(rc);
1211 }
1212
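/* Take an extent lock of the given @mode over @policy on the file's objects,
 * then refresh the inode's timestamps (and, for a full-file [0, EOF] lock,
 * its size) from the merged per-stripe LVB data. */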
1213 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1214                    struct lov_stripe_md *lsm, int mode,
1215                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1216                    int ast_flags)
1217 {
1218         struct ll_sb_info *sbi = ll_i2sbi(inode);
1219         struct ost_lvb lvb;
1220         struct ldlm_enqueue_info einfo = { 0 };
1221         struct obd_info oinfo = { { { 0 } } };
1222         int rc;
1223         ENTRY;
1224
1225         LASSERT(!lustre_handle_is_used(lockh));
1226         LASSERT(lsm != NULL);
1227
1228         /* don't drop the mmapped file to LRU */
1229         if (mapping_mapped(inode->i_mapping))
1230                 ast_flags |= LDLM_FL_NO_LRU;
1231
1232         /* XXX phil: can we do this?  won't it screw the file size up? */
1233         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1234             (sbi->ll_flags & LL_SBI_NOLCK))
1235                 RETURN(0);
1236
1237         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1238                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1239
1240         einfo.ei_type = LDLM_EXTENT;
1241         einfo.ei_mode = mode;
1242         einfo.ei_cb_bl = ll_extent_lock_callback;
1243         einfo.ei_cb_cp = ldlm_completion_ast;
1244         einfo.ei_cb_gl = ll_glimpse_callback;
1245         einfo.ei_cbdata = inode;
1246
1247         oinfo.oi_policy = *policy;
1248         oinfo.oi_lockh = lockh;
1249         oinfo.oi_md = lsm;
1250         oinfo.oi_flags = ast_flags;
1251
1252         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
1253         *policy = oinfo.oi_policy;
1254         if (rc > 0)
1255                 rc = -EIO;
1256
1257         ll_inode_size_lock(inode, 1);
1258         inode_init_lvb(inode, &lvb);
1259         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1260
1261         if (policy->l_extent.start == 0 &&
1262             policy->l_extent.end == OBD_OBJECT_EOF) {
1263                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1264                  * the kms under both a DLM lock and the
1265                  * ll_inode_size_lock().  If we don't get the
1266                  * ll_inode_size_lock() here we can match the DLM lock and
1267                  * reset i_size from the kms before the truncating path has
1268                  * updated the kms.  generic_file_write can then trust the
1269                  * stale i_size when doing appending writes and effectively
1270                  * cancel the result of the truncate.  Getting the
1271                  * ll_inode_size_lock() after the enqueue maintains the DLM
1272                  * -> ll_inode_size_lock() acquiring order. */
1273                 i_size_write(inode, lvb.lvb_size);
1274                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1275                        inode->i_ino, i_size_read(inode));
1276         }
1277
1278         if (rc == 0) {
1279                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1280                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1281                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1282         }
1283         ll_inode_size_unlock(inode, 1);
1284
1285         RETURN(rc);
1286 }
1287
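/* Release an extent lock taken by ll_extent_lock().  A no-op when locking is
 * disabled for this file descriptor or mount (LL_FILE_IGNORE_LOCK /
 * LL_SBI_NOLCK). */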
1288 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1289                      struct lov_stripe_md *lsm, int mode,
1290                      struct lustre_handle *lockh)
1291 {
1292         struct ll_sb_info *sbi = ll_i2sbi(inode);
1293         int rc;
1294         ENTRY;
1295
1296         /* XXX phil: can we do this?  won't it screw the file size up? */
1297         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1298             (sbi->ll_flags & LL_SBI_NOLCK))
1299                 RETURN(0);
1300
1301         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1302
1303         RETURN(rc);
1304 }
1305
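/* Read path: take a PR extent lock covering the region being read (split into
 * chunks of at most ll_max_rw_chunk bytes when that limit is set), check the
 * region against the known minimum size, and let the generic read code do the
 * actual copying. */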
1306 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1307                             loff_t *ppos)
1308 {
1309         struct inode *inode = file->f_dentry->d_inode;
1310         struct ll_inode_info *lli = ll_i2info(inode);
1311         struct lov_stripe_md *lsm = lli->lli_smd;
1312         struct ll_sb_info *sbi = ll_i2sbi(inode);
1313         struct ll_lock_tree tree;
1314         struct ll_lock_tree_node *node;
1315         struct ost_lvb lvb;
1316         struct ll_ra_read bead;
1317         int rc, ra = 0;
1318         loff_t end;
1319         ssize_t retval, chunk, sum = 0;
1320
1321         __u64 kms;
1322         ENTRY;
1323         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1324                inode->i_ino, inode->i_generation, inode, count, *ppos);
1325         /* "If nbyte is 0, read() will return 0 and have no other results."
1326          *                      -- Single Unix Spec */
1327         if (count == 0)
1328                 RETURN(0);
1329
1330         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1331
1332         if (!lsm) {
 1333                 /* A read on a file with no objects should return zero-filled
 1334                  * buffers up to the file size (we can get non-zero sizes with
 1335                  * mknod + truncate, then opening the file for read; this seems
 1336                  * to be a common pattern in the NFS case).  Bug 6243 */
1337                 int notzeroed;
1338                 /* Since there are no objects on OSTs, we have nothing to get
1339                  * lock on and so we are forced to access inode->i_size
1340                  * unguarded */
1341
1342                 /* Read beyond end of file */
1343                 if (*ppos >= i_size_read(inode))
1344                         RETURN(0);
1345
1346                 if (count > i_size_read(inode) - *ppos)
1347                         count = i_size_read(inode) - *ppos;
1348                 /* Make sure to correctly adjust the file pos pointer for
1349                  * EFAULT case */
1350                 notzeroed = clear_user(buf, count);
1351                 count -= notzeroed;
1352                 *ppos += count;
1353                 if (!count)
1354                         RETURN(-EFAULT);
1355                 RETURN(count);
1356         }
1357
1358 repeat:
1359         if (sbi->ll_max_rw_chunk != 0) {
1360                 /* first, determine the end of the current stripe */
1361                 end = *ppos;
1362                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1363                                 (obd_off *)&end);
1364
1365                 /* clamp the end if it extends beyond the request */
1366                 if (end > *ppos + count - 1)
1367                         end = *ppos + count - 1;
1368
1369                 /* and chunk shouldn't be too large even if striping is wide */
1370                 if (end - *ppos > sbi->ll_max_rw_chunk)
1371                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1372         } else {
1373                 end = *ppos + count - 1;
1374         }
1375
1376         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1377         if (IS_ERR(node)) {
1378                 GOTO(out, retval = PTR_ERR(node));
1379         }
1380
1381         tree.lt_fd = LUSTRE_FPRIVATE(file);
1382         rc = ll_tree_lock(&tree, node, buf, count,
1383                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1384         if (rc != 0)
1385                 GOTO(out, retval = rc);
1386
1387         ll_inode_size_lock(inode, 1);
1388         /*
1389          * Consistency guarantees: following possibilities exist for the
1390          * relation between region being read and real file size at this
1391          * moment:
1392          *
1393          *  (A): the region is completely inside of the file;
1394          *
1395          *  (B-x): x bytes of region are inside of the file, the rest is
1396          *  outside;
1397          *
1398          *  (C): the region is completely outside of the file.
1399          *
1400          * This classification is stable under DLM lock acquired by
1401          * ll_tree_lock() above, because to change class, other client has to
1402          * take DLM lock conflicting with our lock. Also, any updates to
1403          * ->i_size by other threads on this client are serialized by
1404          * ll_inode_size_lock(). This guarantees that short reads are handled
1405          * correctly in the face of concurrent writes and truncates.
1406          */
1407         inode_init_lvb(inode, &lvb);
1408         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1409         kms = lvb.lvb_size;
1410         if (*ppos + count - 1 > kms) {
1411                 /* A glimpse is necessary to determine whether we return a
1412                  * short read (B) or some zeroes at the end of the buffer (C) */
1413                 ll_inode_size_unlock(inode, 1);
1414                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1415                 if (retval) {
1416                         ll_tree_unlock(&tree);
1417                         goto out;
1418                 }
1419         } else {
1420                 /* region is within kms and, hence, within real file size (A).
1421                  * We need to increase i_size to cover the read region so that
1422                  * generic_file_read() will do its job, but that doesn't mean
1423                  * the kms size is _correct_, it is only the _minimum_ size.
1424                  * If someone does a stat they will get the correct size which
1425                  * will always be >= the kms value here.  b=11081 */
1426                 if (i_size_read(inode) < kms)
1427                         i_size_write(inode, kms);
1428                 ll_inode_size_unlock(inode, 1);
1429         }
1430
1431         chunk = end - *ppos + 1;
1432         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1433                inode->i_ino, chunk, *ppos, i_size_read(inode));
1434
1435         /* turn off the kernel's read-ahead */
1436 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1437         file->f_ramax = 0;
1438 #else
1439         file->f_ra.ra_pages = 0;
1440 #endif
1441         /* initialize read-ahead window once per syscall */
1442         if (ra == 0) {
1443                 ra = 1;
1444                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1445                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1446                 ll_ra_read_in(file, &bead);
1447         }
1448
1449         /* BUG: 5972 */
1450         file_accessed(file);
1451         retval = generic_file_read(file, buf, chunk, ppos);
1452         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1453
1454         ll_tree_unlock(&tree);
1455
1456         if (retval > 0) {
1457                 buf += retval;
1458                 count -= retval;
1459                 sum += retval;
1460                 if (retval == chunk && count > 0)
1461                         goto repeat;
1462         }
1463
1464  out:
1465         if (ra != 0)
1466                 ll_ra_read_ex(file, &bead);
1467         retval = (sum > 0) ? sum : retval;
1468         RETURN(retval);
1469 }
1470
1471 /*
1472  * Write to a file (through the page cache).
1473  */
1474 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1475                              loff_t *ppos)
1476 {
1477         struct inode *inode = file->f_dentry->d_inode;
1478         struct ll_sb_info *sbi = ll_i2sbi(inode);
1479         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1480         struct ll_lock_tree tree;
1481         struct ll_lock_tree_node *node;
1482         loff_t maxbytes = ll_file_maxbytes(inode);
1483         loff_t lock_start, lock_end, end;
1484         ssize_t retval, chunk, sum = 0;
1485         int rc;
1486         ENTRY;
1487
1488         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1489                inode->i_ino, inode->i_generation, inode, count, *ppos);
1490
1491         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1492
1493         /* POSIX, but surprised the VFS doesn't check this already */
1494         if (count == 0)
1495                 RETURN(0);
1496
1497         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1498          * called on the file, don't fail the below assertion (bug 2388). */
1499         if (file->f_flags & O_LOV_DELAY_CREATE &&
1500             ll_i2info(inode)->lli_smd == NULL)
1501                 RETURN(-EBADF);
1502
1503         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1504
1505         down(&ll_i2info(inode)->lli_write_sem);
1506
1507 repeat:
1508         chunk = 0; /* just to fix gcc's warning */
1509         end = *ppos + count - 1;
1510
1511         if (file->f_flags & O_APPEND) {
1512                 lock_start = 0;
1513                 lock_end = OBD_OBJECT_EOF;
1514         } else if (sbi->ll_max_rw_chunk != 0) {
1515                 /* first, determine the end of the current stripe */
1516                 end = *ppos;
1517                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1518                                 (obd_off *)&end);
1519
1520                 /* clamp the end if it extends beyond the request */
1521                 if (end > *ppos + count - 1)
1522                         end = *ppos + count - 1;
1523
1524                 /* and chunk shouldn't be too large even if striping is wide */
1525                 if (end - *ppos > sbi->ll_max_rw_chunk)
1526                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1527                 lock_start = *ppos;
1528                 lock_end = end;
1529         } else {
1530                 lock_start = *ppos;
1531                 lock_end = *ppos + count - 1;
1532         }
1533         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1534
1535         if (IS_ERR(node))
1536                 GOTO(out, retval = PTR_ERR(node));
1537
1538         tree.lt_fd = LUSTRE_FPRIVATE(file);
1539         rc = ll_tree_lock(&tree, node, buf, count,
1540                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1541         if (rc != 0)
1542                 GOTO(out, retval = rc);
1543
1544         /* This is OK; generic_file_write will overwrite this under i_sem if
1545          * it races with a local truncate.  It just makes our maxbytes checking
1546          * easier.  The i_size value gets updated in ll_extent_lock() as a
1547          * consequence of the [0,EOF] extent lock we requested above. */
1548         if (file->f_flags & O_APPEND) {
1549                 *ppos = i_size_read(inode);
1550                 end = *ppos + count - 1;
1551         }
1552
1553         if (*ppos >= maxbytes) {
1554                 send_sig(SIGXFSZ, current, 0);
1555                 GOTO(out_unlock, retval = -EFBIG);
1556         }
1557         if (*ppos + count > maxbytes)
1558                 count = maxbytes - *ppos;
1559
1560         /* generic_file_write handles O_APPEND after getting i_mutex */
1561         chunk = end - *ppos + 1;
1562         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1563                inode->i_ino, chunk, *ppos);
1564         retval = generic_file_write(file, buf, chunk, ppos);
1565         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1566
1567 out_unlock:
1568         ll_tree_unlock(&tree);
1569
1570 out:
1571         if (retval > 0) {
1572                 buf += retval;
1573                 count -= retval;
1574                 sum += retval;
1575                 if (retval == chunk && count > 0)
1576                         goto repeat;
1577         }
1578
1579         up(&ll_i2info(inode)->lli_write_sem);
1580
1581         retval = (sum > 0) ? sum : retval;
1582         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1583                            retval > 0 ? retval : 0);
1584         RETURN(retval);
1585 }
1586
1587 /*
1588  * Send file content (through pagecache) somewhere with helper
1589  */
1590 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1591 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1592                                 read_actor_t actor, void *target)
1593 {
1594         struct inode *inode = in_file->f_dentry->d_inode;
1595         struct ll_inode_info *lli = ll_i2info(inode);
1596         struct lov_stripe_md *lsm = lli->lli_smd;
1597         struct ll_lock_tree tree;
1598         struct ll_lock_tree_node *node;
1599         struct ost_lvb lvb;
1600         struct ll_ra_read bead;
1601         int rc;
1602         ssize_t retval;
1603         __u64 kms;
1604         ENTRY;
1605         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1606                inode->i_ino, inode->i_generation, inode, count, *ppos);
1607
1608         /* "If nbyte is 0, read() will return 0 and have no other results."
1609          *                      -- Single Unix Spec */
1610         if (count == 0)
1611                 RETURN(0);
1612
1613         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1614         /* turn off the kernel's read-ahead */
1615         in_file->f_ra.ra_pages = 0;
1616
1617         /* File with no objects, nothing to lock */
1618         if (!lsm)
1619                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1620
1621         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1622         if (IS_ERR(node))
1623                 RETURN(PTR_ERR(node));
1624
1625         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1626         rc = ll_tree_lock(&tree, node, NULL, count,
1627                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1628         if (rc != 0)
1629                 RETURN(rc);
1630
1631         ll_inode_size_lock(inode, 1);
1632         /*
1633          * Consistency guarantees: following possibilities exist for the
1634          * relation between region being read and real file size at this
1635          * moment:
1636          *
1637          *  (A): the region is completely inside of the file;
1638          *
1639          *  (B-x): x bytes of region are inside of the file, the rest is
1640          *  outside;
1641          *
1642          *  (C): the region is completely outside of the file.
1643          *
1644          * This classification is stable under DLM lock acquired by
1645          * ll_tree_lock() above, because to change class, other client has to
1646          * take DLM lock conflicting with our lock. Also, any updates to
1647          * ->i_size by other threads on this client are serialized by
1648          * ll_inode_size_lock(). This guarantees that short reads are handled
1649          * correctly in the face of concurrent writes and truncates.
1650          */
1651         inode_init_lvb(inode, &lvb);
1652         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1653         kms = lvb.lvb_size;
1654         if (*ppos + count - 1 > kms) {
1655                 /* A glimpse is necessary to determine whether we return a
1656                  * short read (B) or some zeroes at the end of the buffer (C) */
1657                 ll_inode_size_unlock(inode, 1);
1658                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1659                 if (retval)
1660                         goto out;
1661         } else {
1662                 /* region is within kms and, hence, within real file size (A) */
1663                 i_size_write(inode, kms);
1664                 ll_inode_size_unlock(inode, 1);
1665         }
1666
1667         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1668                inode->i_ino, count, *ppos, i_size_read(inode));
1669
1670         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1671         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1672         ll_ra_read_in(in_file, &bead);
1673         /* BUG: 5972 */
1674         file_accessed(in_file);
1675         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1676         ll_ra_read_ex(in_file, &bead);
1677
1678  out:
1679         ll_tree_unlock(&tree);
1680         RETURN(retval);
1681 }
1682 #endif
1683
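/*
 * Handler for LL_IOC_RECREATE_OBJ: ask the OST to re-create a missing
 * object of this file, as described by the user-supplied ll_recreate_obj
 * (object id, group and OST index).  Restricted to CAP_SYS_ADMIN.
 */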
1684 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1685                                unsigned long arg)
1686 {
1687         struct ll_inode_info *lli = ll_i2info(inode);
1688         struct obd_export *exp = ll_i2dtexp(inode);
1689         struct ll_recreate_obj ucreatp;
1690         struct obd_trans_info oti = { 0 };
1691         struct obdo *oa = NULL;
1692         int lsm_size;
1693         int rc = 0;
1694         struct lov_stripe_md *lsm, *lsm2;
1695         ENTRY;
1696
1697         if (!capable (CAP_SYS_ADMIN))
1698                 RETURN(-EPERM);
1699
1700         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1701                             sizeof(struct ll_recreate_obj));
1702         if (rc) {
1703                 RETURN(-EFAULT);
1704         }
1705         OBDO_ALLOC(oa);
1706         if (oa == NULL)
1707                 RETURN(-ENOMEM);
1708
1709         down(&lli->lli_size_sem);
1710         lsm = lli->lli_smd;
1711         if (lsm == NULL)
1712                 GOTO(out, rc = -ENOENT);
1713         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1714                    (lsm->lsm_stripe_count));
1715
1716         OBD_ALLOC(lsm2, lsm_size);
1717         if (lsm2 == NULL)
1718                 GOTO(out, rc = -ENOMEM);
1719
1720         oa->o_id = ucreatp.lrc_id;
1721         oa->o_gr = ucreatp.lrc_group;
1722         oa->o_nlink = ucreatp.lrc_ost_idx;
1723         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1724         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1725         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1726                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1727
1728         oti.oti_objid = NULL;
1729         memcpy(lsm2, lsm, lsm_size);
1730         rc = obd_create(exp, oa, &lsm2, &oti);
1731
1732         OBD_FREE(lsm2, lsm_size);
1733         GOTO(out, rc);
1734 out:
1735         up(&lli->lli_size_sem);
1736         OBDO_FREE(oa);
1737         return rc;
1738 }
1739
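/*
 * Apply the striping described by @lum to a file that does not yet have
 * stripe metadata: the file is re-opened with an IT_OPEN intent that
 * carries @lum, and the resulting open handle is closed again right away.
 * Fails with -EEXIST if a layout already exists.
 */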
1740 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1741                              int flags, struct lov_user_md *lum, int lum_size)
1742 {
1743         struct ll_inode_info *lli = ll_i2info(inode);
1744         struct lov_stripe_md *lsm;
1745         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1746         int rc = 0;
1747         ENTRY;
1748
1749         down(&lli->lli_size_sem);
1750         lsm = lli->lli_smd;
1751         if (lsm) {
1752                 up(&lli->lli_size_sem);
1753                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1754                        inode->i_ino);
1755                 RETURN(-EEXIST);
1756         }
1757
1758         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1759         if (rc)
1760                 GOTO(out, rc);
1761         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1762                 GOTO(out_req_free, rc = -ENOENT);
1763         rc = oit.d.lustre.it_status;
1764         if (rc < 0)
1765                 GOTO(out_req_free, rc);
1766
1767         ll_release_openhandle(file->f_dentry, &oit);
1768
1769  out:
1770         up(&lli->lli_size_sem);
1771         ll_intent_release(&oit);
1772         RETURN(rc);
1773 out_req_free:
1774         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1775         goto out;
1776 }
1777
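/*
 * Fetch the LOV EA of @filename (an entry under @inode) from the MDS via
 * md_getattr_name().  The EA is converted to host endianness if needed and,
 * for joined files (LOV_MAGIC_JOIN), expanded into a lov_user_md_join with
 * per-stripe extents.  On success *lmmp/*lmm_size describe the result and
 * *request holds the reply, to be released by the caller with
 * ptlrpc_req_finished().
 */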
1778 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1779                              struct lov_mds_md **lmmp, int *lmm_size, 
1780                              struct ptlrpc_request **request)
1781 {
1782         struct ll_sb_info *sbi = ll_i2sbi(inode);
1783         struct mdt_body  *body;
1784         struct lov_mds_md *lmm = NULL;
1785         struct ptlrpc_request *req = NULL;
1786         struct obd_capa *oc;
1787         int rc, lmmsize;
1788
1789         rc = ll_get_max_mdsize(sbi, &lmmsize);
1790         if (rc)
1791                 RETURN(rc);
1792
1793         oc = ll_mdscapa_get(inode);
1794         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1795                              oc, filename, strlen(filename) + 1,
1796                              OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1797         capa_put(oc);
1798         if (rc < 0) {
1799                 CDEBUG(D_INFO, "md_getattr_name failed "
1800                        "on %s: rc %d\n", filename, rc);
1801                 GOTO(out, rc);
1802         }
1803
1804         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1805         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1806         /* swabbed by mdc_getattr_name */
1807         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1808
1809         lmmsize = body->eadatasize;
1810
1811         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1812                         lmmsize == 0) {
1813                 GOTO(out, rc = -ENODATA);
1814         }
1815
1816         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1817         LASSERT(lmm != NULL);
1818         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1819
1820         /*
1821          * This is coming from the MDS, so is probably in
1822          * little endian.  We convert it to host endian before
1823          * passing it to userspace.
1824          */
1825         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1826                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1827                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1828         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1829                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1830         }
1831
1832         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1833                 struct lov_stripe_md *lsm;
1834                 struct lov_user_md_join *lmj;
1835                 int lmj_size, i, aindex = 0;
1836
1837                 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1838                 if (rc < 0)
1839                         GOTO(out, rc = -ENOMEM);
1840                 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1841                 if (rc)
1842                         GOTO(out_free_memmd, rc);
1843
1844                 lmj_size = sizeof(struct lov_user_md_join) +
1845                            lsm->lsm_stripe_count *
1846                            sizeof(struct lov_user_ost_data_join);
1847                 OBD_ALLOC(lmj, lmj_size);
1848                 if (!lmj)
1849                         GOTO(out_free_memmd, rc = -ENOMEM);
1850
1851                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1852                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1853                         struct lov_extent *lex =
1854                                 &lsm->lsm_array->lai_ext_array[aindex];
1855
1856                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1857                                 aindex++;
1858                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1859                                         LPU64" len %d\n", aindex, i,
1860                                         lex->le_start, (int)lex->le_len);
1861                         lmj->lmm_objects[i].l_extent_start =
1862                                 lex->le_start;
1863
1864                         if ((int)lex->le_len == -1)
1865                                 lmj->lmm_objects[i].l_extent_end = -1;
1866                         else
1867                                 lmj->lmm_objects[i].l_extent_end =
1868                                         lex->le_start + lex->le_len;
1869                         lmj->lmm_objects[i].l_object_id =
1870                                 lsm->lsm_oinfo[i]->loi_id;
1871                         lmj->lmm_objects[i].l_object_gr =
1872                                 lsm->lsm_oinfo[i]->loi_gr;
1873                         lmj->lmm_objects[i].l_ost_gen =
1874                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1875                         lmj->lmm_objects[i].l_ost_idx =
1876                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1877                 }
1878                 lmm = (struct lov_mds_md *)lmj;
1879                 lmmsize = lmj_size;
1880 out_free_memmd:
1881                 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1882         }
1883 out:
1884         *lmmp = lmm;
1885         *lmm_size = lmmsize;
1886         *request = req;
1887         return rc;
1888 }
1889
1890 static int ll_lov_setea(struct inode *inode, struct file *file,
1891                             unsigned long arg)
1892 {
1893         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1894         struct lov_user_md  *lump;
1895         int lum_size = sizeof(struct lov_user_md) +
1896                        sizeof(struct lov_user_ost_data);
1897         int rc;
1898         ENTRY;
1899
1900         if (!capable (CAP_SYS_ADMIN))
1901                 RETURN(-EPERM);
1902
1903         OBD_ALLOC(lump, lum_size);
1904         if (lump == NULL) {
1905                 RETURN(-ENOMEM);
1906         }
1907         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1908         if (rc) {
1909                 OBD_FREE(lump, lum_size);
1910                 RETURN(-EFAULT);
1911         }
1912
1913         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1914
1915         OBD_FREE(lump, lum_size);
1916         RETURN(rc);
1917 }
1918
1919 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1920                             unsigned long arg)
1921 {
1922         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1923         int rc;
1924         int flags = FMODE_WRITE;
1925         ENTRY;
1926
1927         /* Bug 1152: copy properly when this is no longer true */
1928         LASSERT(sizeof(lum) == sizeof(*lump));
1929         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1930         rc = copy_from_user(&lum, lump, sizeof(lum));
1931         if (rc)
1932                 RETURN(-EFAULT);
1933
1934         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1935         if (rc == 0) {
1936                  put_user(0, &lump->lmm_stripe_count);
1937                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1938                                     0, ll_i2info(inode)->lli_smd, lump);
1939         }
1940         RETURN(rc);
1941 }
1942
1943 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1944 {
1945         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1946
1947         if (!lsm)
1948                 RETURN(-ENODATA);
1949
1950         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1951                             (void *)arg);
1952 }
1953
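/*
 * Handler for LL_IOC_GROUP_LOCK: take a group (LCK_GROUP) extent lock over
 * the whole file [0, OBD_OBJECT_EOF] using the group id passed in @arg, and
 * remember the handle in the file descriptor so LL_IOC_GROUP_UNLOCK can
 * drop it later.  While the group lock is held the fd also ignores ordinary
 * extent locking (LL_FILE_IGNORE_LOCK).
 *
 * Rough userspace usage (sketch only; header inclusion and error handling
 * are left to the caller and are not prescribed by this file):
 *
 *      int gid = 1234;
 *      ioctl(fd, LL_IOC_GROUP_LOCK, gid);
 *      ... shared I/O with other holders of the same gid ...
 *      ioctl(fd, LL_IOC_GROUP_UNLOCK, gid);
 */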
1954 static int ll_get_grouplock(struct inode *inode, struct file *file,
1955                             unsigned long arg)
1956 {
1957         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1958         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1959                                                     .end = OBD_OBJECT_EOF}};
1960         struct lustre_handle lockh = { 0 };
1961         struct ll_inode_info *lli = ll_i2info(inode);
1962         struct lov_stripe_md *lsm = lli->lli_smd;
1963         int flags = 0, rc;
1964         ENTRY;
1965
1966         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1967                 RETURN(-EINVAL);
1968         }
1969
1970         policy.l_extent.gid = arg;
1971         if (file->f_flags & O_NONBLOCK)
1972                 flags = LDLM_FL_BLOCK_NOWAIT;
1973
1974         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1975         if (rc)
1976                 RETURN(rc);
1977
1978         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1979         fd->fd_gid = arg;
1980         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1981
1982         RETURN(0);
1983 }
1984
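/*
 * Handler for LL_IOC_GROUP_UNLOCK: drop the group lock taken by
 * ll_get_grouplock().  Returns -EINVAL if no group lock is held on this
 * file descriptor or if @arg does not match the gid it was taken with.
 */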
1985 static int ll_put_grouplock(struct inode *inode, struct file *file,
1986                             unsigned long arg)
1987 {
1988         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1989         struct ll_inode_info *lli = ll_i2info(inode);
1990         struct lov_stripe_md *lsm = lli->lli_smd;
1991         int rc;
1992         ENTRY;
1993
1994         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1995                 /* Ugh, it's already unlocked. */
1996                 RETURN(-EINVAL);
1997         }
1998
1999         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2000                 RETURN(-EINVAL);
2001
2002         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2003
2004         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2005         if (rc)
2006                 RETURN(rc);
2007
2008         fd->fd_gid = 0;
2009         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2010
2011         RETURN(0);
2012 }
2013
2014 static int join_sanity_check(struct inode *head, struct inode *tail)
2015 {
2016         ENTRY;
2017         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2018                 CERROR("server does not support join\n");
2019                 RETURN(-EINVAL);
2020         }
2021         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2022                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
2023                        head->i_ino, tail->i_ino);
2024                 RETURN(-EINVAL);
2025         }
2026         if (head->i_ino == tail->i_ino) {
2027                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
2028                 RETURN(-EINVAL);
2029         }
2030         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2031                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
2032                 RETURN(-EINVAL);
2033         }
2034         RETURN(0);
2035 }
2036
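/*
 * Ask the MDS to join the tail file onto @head_inode: an IT_OPEN intent
 * with O_JOIN_FILE is enqueued against the head, carrying the current head
 * size as intent data.  Any lock returned with the intent is dropped
 * immediately and the resulting open handle is closed again.
 */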
2037 static int join_file(struct inode *head_inode, struct file *head_filp,
2038                      struct file *tail_filp)
2039 {
2040         struct dentry *tail_dentry = tail_filp->f_dentry;
2041         struct lookup_intent oit = {.it_op = IT_OPEN,
2042                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
2043         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
2044                 ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
2045
2046         struct lustre_handle lockh;
2047         struct md_op_data *op_data;
2048         int    rc;
2049         loff_t data;
2050         ENTRY;
2051
2052         tail_dentry = tail_filp->f_dentry;
2053
2054         data = i_size_read(head_inode);
2055         op_data = ll_prep_md_op_data(NULL, head_inode,
2056                                      tail_dentry->d_parent->d_inode,
2057                                      tail_dentry->d_name.name,
2058                                      tail_dentry->d_name.len, 0,
2059                                      LUSTRE_OPC_ANY, &data);
2060         if (IS_ERR(op_data))
2061                 RETURN(PTR_ERR(op_data));
2062
2063         rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
2064                          op_data, &lockh, NULL, 0, 0);
2065
2066         ll_finish_md_op_data(op_data);
2067         if (rc < 0)
2068                 GOTO(out, rc);
2069
2070         rc = oit.d.lustre.it_status;
2071
2072         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2073                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2074                 ptlrpc_req_finished((struct ptlrpc_request *)
2075                                     oit.d.lustre.it_data);
2076                 GOTO(out, rc);
2077         }
2078
2079         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2080                                            * away */
2081                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2082                 oit.d.lustre.it_lock_mode = 0;
2083         }
2084         ll_release_openhandle(head_filp->f_dentry, &oit);
2085 out:
2086         ll_intent_release(&oit);
2087         RETURN(rc);
2088 }
2089
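/*
 * Handler for LL_IOC_JOIN: append the file named @filename_tail to @head.
 * The tail is opened, both files are locked [0, EOF] with LCK_EX in
 * inode-number order so the locking order stays consistent, the request is
 * sanity checked and sent to the MDS via join_file(), and on success the
 * cached stripe MD of the head is dropped (presumably so the new, joined
 * layout is fetched again later).  Cleanup is staged through cleanup_phase.
 */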
2090 static int ll_file_join(struct inode *head, struct file *filp,
2091                         char *filename_tail)
2092 {
2093         struct inode *tail = NULL, *first = NULL, *second = NULL;
2094         struct dentry *tail_dentry;
2095         struct file *tail_filp, *first_filp, *second_filp;
2096         struct ll_lock_tree first_tree, second_tree;
2097         struct ll_lock_tree_node *first_node, *second_node;
2098         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2099         int rc = 0, cleanup_phase = 0;
2100         ENTRY;
2101
2102         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2103                head->i_ino, head->i_generation, head, filename_tail);
2104
2105         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2106         if (IS_ERR(tail_filp)) {
2107                 CERROR("cannot open tail file %s\n", filename_tail);
2108                 rc = PTR_ERR(tail_filp);
2109                 GOTO(cleanup, rc);
2110         }
2111         tail = igrab(tail_filp->f_dentry->d_inode);
2112
2113         tlli = ll_i2info(tail);
2114         tail_dentry = tail_filp->f_dentry;
2115         LASSERT(tail_dentry);
2116         cleanup_phase = 1;
2117
2118         /* reorder the inodes so locks are taken in a consistent order */
2119         first = head->i_ino > tail->i_ino ? head : tail;
2120         second = head->i_ino > tail->i_ino ? tail : head;
2121         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2122         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2123
2124         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2125                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2126         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2127         if (IS_ERR(first_node)) {
2128                 rc = PTR_ERR(first_node);
2129                 GOTO(cleanup, rc);
2130         }
2131         first_tree.lt_fd = first_filp->private_data;
2132         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2133         if (rc != 0)
2134                 GOTO(cleanup, rc);
2135         cleanup_phase = 2;
2136
2137         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2138         if (IS_ERR(second_node)) {
2139                 rc = PTR_ERR(second_node);
2140                 GOTO(cleanup, rc);
2141         }
2142         second_tree.lt_fd = second_filp->private_data;
2143         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2144         if (rc != 0)
2145                 GOTO(cleanup, rc);
2146         cleanup_phase = 3;
2147
2148         rc = join_sanity_check(head, tail);
2149         if (rc)
2150                 GOTO(cleanup, rc);
2151
2152         rc = join_file(head, filp, tail_filp);
2153         if (rc)
2154                 GOTO(cleanup, rc);
2155 cleanup:
2156         switch (cleanup_phase) {
2157         case 3:
2158                 ll_tree_unlock(&second_tree);
2159                 obd_cancel_unused(ll_i2dtexp(second),
2160                                   ll_i2info(second)->lli_smd, 0, NULL);
2161         case 2:
2162                 ll_tree_unlock(&first_tree);
2163                 obd_cancel_unused(ll_i2dtexp(first),
2164                                   ll_i2info(first)->lli_smd, 0, NULL);
2165         case 1:
2166                 filp_close(tail_filp, 0);
2167                 if (tail)
2168                         iput(tail);
2169                 if (head && rc == 0) {
2170                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2171                                        &hlli->lli_smd);
2172                         hlli->lli_smd = NULL;
2173                 }
2174         case 0:
2175                 break;
2176         default:
2177                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2178                 LBUG();
2179         }
2180         RETURN(rc);
2181 }
2182
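/*
 * Close the MDS open handle that was created as a by-product of an intent
 * (DISP_OPEN_OPEN), e.g. an open done only to install a stripe EA.  Does
 * nothing for the root dentry or when the intent did not open anything.
 */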
2183 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2184 {
2185         struct inode *inode = dentry->d_inode;
2186         struct obd_client_handle *och;
2187         int rc;
2188         ENTRY;
2189
2190         LASSERT(inode);
2191
2192         /* Root ? Do nothing. */
2193         if (dentry->d_inode->i_sb->s_root == dentry)
2194                 RETURN(0);
2195
2196         /* No open handle to close? Move away */
2197         if (!it_disposition(it, DISP_OPEN_OPEN))
2198                 RETURN(0);
2199
2200         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2201
2202         OBD_ALLOC(och, sizeof(*och));
2203         if (!och)
2204                 GOTO(out, rc = -ENOMEM);
2205
2206         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2207                     ll_i2info(inode), it, och);
2208
2209         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2210                                        inode, och);
2211  out:
2212         /* this one is in place of ll_file_open */
2213         ptlrpc_req_finished(it->d.lustre.it_data);
2214         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2215         RETURN(rc);
2216 }
2217
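/*
 * Dispatch Lustre-specific ioctls on regular files.  tty ioctls are
 * rejected with -ENOTTY; commands that are not handled here are offered to
 * any registered ll_iocontrol_call() handlers and finally forwarded to
 * obd_iocontrol() on the data export.
 */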
2218 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2219                   unsigned long arg)
2220 {
2221         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2222         int flags;
2223         ENTRY;
2224
2225         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2226                inode->i_generation, inode, cmd);
2227         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2228
2229         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2230         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2231                 RETURN(-ENOTTY);
2232
2233         switch(cmd) {
2234         case LL_IOC_GETFLAGS:
2235                 /* Get the current value of the file flags */
2236                 return put_user(fd->fd_flags, (int *)arg);
2237         case LL_IOC_SETFLAGS:
2238         case LL_IOC_CLRFLAGS:
2239                 /* Set or clear specific file flags */
2240                 /* XXX This probably needs checks to ensure the flags are
2241                  *     not abused, and to handle any flag side effects.
2242                  */
2243                 if (get_user(flags, (int *) arg))
2244                         RETURN(-EFAULT);
2245
2246                 if (cmd == LL_IOC_SETFLAGS) {
2247                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2248                             !(file->f_flags & O_DIRECT)) {
2249                                 CERROR("%s: unable to disable locking on "
2250                                        "non-O_DIRECT file\n", current->comm);
2251                                 RETURN(-EINVAL);
2252                         }
2253
2254                         fd->fd_flags |= flags;
2255                 } else {
2256                         fd->fd_flags &= ~flags;
2257                 }
2258                 RETURN(0);
2259         case LL_IOC_LOV_SETSTRIPE:
2260                 RETURN(ll_lov_setstripe(inode, file, arg));
2261         case LL_IOC_LOV_SETEA:
2262                 RETURN(ll_lov_setea(inode, file, arg));
2263         case LL_IOC_LOV_GETSTRIPE:
2264                 RETURN(ll_lov_getstripe(inode, arg));
2265         case LL_IOC_RECREATE_OBJ:
2266                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2267         case EXT3_IOC_GETFLAGS:
2268         case EXT3_IOC_SETFLAGS:
2269                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2270         case EXT3_IOC_GETVERSION_OLD:
2271         case EXT3_IOC_GETVERSION:
2272                 RETURN(put_user(inode->i_generation, (int *)arg));
2273         case LL_IOC_JOIN: {
2274                 char *ftail;
2275                 int rc;
2276
2277                 ftail = getname((const char *)arg);
2278                 if (IS_ERR(ftail))
2279                         RETURN(PTR_ERR(ftail));
2280                 rc = ll_file_join(inode, file, ftail);
2281                 putname(ftail);
2282                 RETURN(rc);
2283         }
2284         case LL_IOC_GROUP_LOCK:
2285                 RETURN(ll_get_grouplock(inode, file, arg));
2286         case LL_IOC_GROUP_UNLOCK:
2287                 RETURN(ll_put_grouplock(inode, file, arg));
2288         case IOC_OBD_STATFS:
2289                 RETURN(ll_obd_statfs(inode, (void *)arg));
2290
2291         /* We need to special case any other ioctls we want to handle,
2292          * to send them to the MDS/OST as appropriate and to properly
2293          * network encode the arg field.
2294         case EXT3_IOC_SETVERSION_OLD:
2295         case EXT3_IOC_SETVERSION:
2296         */
2297         case LL_IOC_FLUSHCTX:
2298                 RETURN(ll_flush_ctx(inode));
2299         case LL_IOC_GETFACL: {
2300                 struct rmtacl_ioctl_data ioc;
2301
2302                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2303                         RETURN(-EFAULT);
2304
2305                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2306         }
2307         case LL_IOC_SETFACL: {
2308                 struct rmtacl_ioctl_data ioc;
2309
2310                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2311                         RETURN(-EFAULT);
2312
2313                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2314         }
2315         default: {
2316                 int err;
2317
2318                 if (LLIOC_STOP == 
2319                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2320                         RETURN(err);
2321
2322                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2323                                      (void *)arg));
2324         }
2325         }
2326 }
2327
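/*
 * llseek: for SEEK_END the file size is first refreshed with a glimpse of
 * the OSTs (unless the file has no objects), then the resulting offset is
 * validated against ll_file_maxbytes() before f_pos is updated.
 */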
2328 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2329 {
2330         struct inode *inode = file->f_dentry->d_inode;
2331         struct ll_inode_info *lli = ll_i2info(inode);
2332         struct lov_stripe_md *lsm = lli->lli_smd;
2333         loff_t retval;
2334         ENTRY;
2335         retval = offset + ((origin == 2) ? i_size_read(inode) :
2336                            (origin == 1) ? file->f_pos : 0);
2337         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2338                inode->i_ino, inode->i_generation, inode, retval, retval,
2339                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2340         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2341
2342         if (origin == 2) { /* SEEK_END */
2343                 int nonblock = 0, rc;
2344
2345                 if (file->f_flags & O_NONBLOCK)
2346                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2347
2348                 if (lsm != NULL) {
2349                         rc = ll_glimpse_size(inode, nonblock);
2350                         if (rc != 0)
2351                                 RETURN(rc);
2352                 }
2353
2354                 ll_inode_size_lock(inode, 0);
2355                 offset += i_size_read(inode);
2356                 ll_inode_size_unlock(inode, 0);
2357         } else if (origin == 1) { /* SEEK_CUR */
2358                 offset += file->f_pos;
2359         }
2360
2361         retval = -EINVAL;
2362         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2363                 if (offset != file->f_pos) {
2364                         file->f_pos = offset;
2365 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2366                         file->f_reada = 0;
2367                         file->f_version = ++event;
2368 #endif
2369                 }
2370                 retval = offset;
2371         }
2372         
2373         RETURN(retval);
2374 }
2375
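/*
 * fsync: wait for already-submitted page I/O, fold in any asynchronous
 * write errors recorded against the inode or its stripes, sync the inode
 * on the MDS via md_sync(), and, when @data is set and the file has
 * objects, also flush [0, OBD_OBJECT_EOF] on the OSTs with obd_sync().
 */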
2376 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2377 {
2378         struct inode *inode = dentry->d_inode;
2379         struct ll_inode_info *lli = ll_i2info(inode);
2380         struct lov_stripe_md *lsm = lli->lli_smd;
2381         struct ptlrpc_request *req;
2382         struct obd_capa *oc;
2383         int rc, err;
2384         ENTRY;
2385         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2386                inode->i_generation, inode);
2387         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2388
2389         /* fsync's caller has already called _fdata{sync,write}; we want
2390          * that I/O to finish before calling the OSC and MDC sync methods */
2391         rc = filemap_fdatawait(inode->i_mapping);
2392
2393         /* catch async errors that were recorded back when async writeback
2394          * failed for pages in this mapping. */
2395         err = lli->lli_async_rc;
2396         lli->lli_async_rc = 0;
2397         if (rc == 0)
2398                 rc = err;
2399         if (lsm) {
2400                 err = lov_test_and_clear_async_rc(lsm);
2401                 if (rc == 0)
2402                         rc = err;
2403         }
2404
2405         oc = ll_mdscapa_get(inode);
2406         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2407                       &req);
2408         capa_put(oc);
2409         if (!rc)
2410                 rc = err;
2411         if (!err)
2412                 ptlrpc_req_finished(req);
2413
2414         if (data && lsm) {
2415                 struct obdo *oa;
2416                 
2417                 OBDO_ALLOC(oa);
2418                 if (!oa)
2419                         RETURN(rc ? rc : -ENOMEM);
2420
2421                 oa->o_id = lsm->lsm_object_id;
2422                 oa->o_gr = lsm->lsm_object_gr;
2423                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2424                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2425                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2426                                            OBD_MD_FLGROUP);
2427
2428                 oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2429                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2430                                0, OBD_OBJECT_EOF, oc);
2431                 capa_put(oc);
2432                 if (!rc)
2433                         rc = err;
2434                 OBDO_FREE(oa);
2435         }
2436
2437         RETURN(rc);
2438 }
2439
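/*
 * fcntl/flock locking: the request is translated into an LDLM_FLOCK
 * enqueue against the MDS resource of this inode.  F_RDLCK maps to LCK_PR,
 * F_WRLCK to LCK_PW and F_UNLCK to LCK_NL (unlock); F_SETLK requests are
 * non-blocking (LDLM_FL_BLOCK_NOWAIT) and F_GETLK only tests for conflicts
 * (LDLM_FL_TEST_LOCK).
 */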
2440 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2441 {
2442         struct inode *inode = file->f_dentry->d_inode;
2443         struct ll_sb_info *sbi = ll_i2sbi(inode);
2444         struct ldlm_res_id res_id =
2445                 { .name = { fid_seq(ll_inode2fid(inode)),
2446                             fid_oid(ll_inode2fid(inode)),
2447                             fid_ver(ll_inode2fid(inode)),
2448                             LDLM_FLOCK} };
2449         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2450                 ldlm_flock_completion_ast, NULL, file_lock };
2451         struct lustre_handle lockh = {0};
2452         ldlm_policy_data_t flock;
2453         int flags = 0;
2454         int rc;
2455         ENTRY;
2456
2457         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2458                inode->i_ino, file_lock);
2459
2460         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2461  
2462         if (file_lock->fl_flags & FL_FLOCK) {
2463                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2464                 /* set missing params for flock() calls */
2465                 file_lock->fl_end = OFFSET_MAX;
2466                 file_lock->fl_pid = current->tgid;
2467         }
2468         flock.l_flock.pid = file_lock->fl_pid;
2469         flock.l_flock.start = file_lock->fl_start;
2470         flock.l_flock.end = file_lock->fl_end;
2471
2472         switch (file_lock->fl_type) {
2473         case F_RDLCK:
2474                 einfo.ei_mode = LCK_PR;
2475                 break;
2476         case F_UNLCK:
2477                 /* An unlock request may or may not have any relation to
2478                  * existing locks so we may not be able to pass a lock handle
2479                  * via a normal ldlm_lock_cancel() request. The request may even
2480                  * unlock a byte range in the middle of an existing lock. In
2481                  * order to process an unlock request we need all of the same
2482                  * information that is given with a normal read or write record
2483                  * lock request. To avoid creating another ldlm unlock (cancel)
2484                  * message we'll treat a LCK_NL flock request as an unlock. */
2485                 einfo.ei_mode = LCK_NL;
2486                 break;
2487         case F_WRLCK:
2488                 einfo.ei_mode = LCK_PW;
2489                 break;
2490         default:
2491                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2492                 LBUG();
2493         }
2494
2495         switch (cmd) {
2496         case F_SETLKW:
2497 #ifdef F_SETLKW64
2498         case F_SETLKW64:
2499 #endif
2500                 flags = 0;
2501                 break;
2502         case F_SETLK:
2503 #ifdef F_SETLK64
2504         case F_SETLK64:
2505 #endif
2506                 flags = LDLM_FL_BLOCK_NOWAIT;
2507                 break;
2508         case F_GETLK:
2509 #ifdef F_GETLK64
2510         case F_GETLK64:
2511 #endif
2512                 flags = LDLM_FL_TEST_LOCK;
2513                 /* Save the old mode so that if the mode in the lock changes we
2514                  * can decrement the appropriate reader or writer refcount. */
2515                 file_lock->fl_type = einfo.ei_mode;
2516                 break;
2517         default:
2518                 CERROR("unknown fcntl lock command: %d\n", cmd);
2519                 LBUG();
2520         }
2521
2522         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2523                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2524                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2525
2526         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
2527                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2528         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2529                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2530 #ifdef HAVE_F_OP_FLOCK
2531         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2532             !(flags & LDLM_FL_TEST_LOCK))
2533                 posix_lock_file_wait(file, file_lock);
2534 #endif
2535
2536         RETURN(rc);
2537 }
2538
2539 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2540 {
2541         ENTRY;
2542
2543         RETURN(-ENOSYS);
2544 }
2545
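/*
 * Return 1 if this client already holds a granted MDS inodebits lock
 * covering @bits on @inode (in CR, CW or PR mode).  LDLM_FL_TEST_LOCK is
 * used, so no reference is taken on a matched lock.
 */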
2546 int ll_have_md_lock(struct inode *inode, __u64 bits)
2547 {
2548         struct lustre_handle lockh;
2549         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2550         struct lu_fid *fid;
2551         int flags;
2552         ENTRY;
2553
2554         if (!inode)
2555                RETURN(0);
2556
2557         fid = &ll_i2info(inode)->lli_fid;
2558         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2559
2560         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2561         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2562                           LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2563                 RETURN(1);
2564         }
2565
2566         RETURN(0);
2567 }
2568
2569 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2570         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2571                               * and return success */
2572                 inode->i_nlink = 0;
2573                 /* This path cannot be hit for regular files unless in
2574                  * case of obscure races, so no need to validate
2575                  * size. */
2576                 if (!S_ISREG(inode->i_mode) &&
2577                     !S_ISDIR(inode->i_mode))
2578                         return 0;
2579         }
2580
2581         if (rc) {
2582                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2583                 return -abs(rc);
2584
2585         }
2586
2587         return 0;
2588 }
2589
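/*
 * Revalidate the attributes of @dentry's inode.  If the server supports
 * OBD_CONNECT_ATTRFID this is done with an IT_GETATTR intent lock by FID
 * (unhashing the dentry if the file turns out to be unlinked); otherwise a
 * plain md_getattr() is issued unless an UPDATE inodebits lock is already
 * cached.  Finally the file size is refreshed with a glimpse when the file
 * has stripe objects.
 */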
2590 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2591 {
2592         struct inode *inode = dentry->d_inode;
2593         struct ptlrpc_request *req = NULL;
2594         struct ll_sb_info *sbi;
2595         struct obd_export *exp;
2596         int rc;
2597         ENTRY;
2598
2599         if (!inode) {
2600                 CERROR("REPORT THIS LINE TO PETER\n");
2601                 RETURN(0);
2602         }
2603         sbi = ll_i2sbi(inode);
2604
2605         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2606                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2607 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2608         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
2609 #endif
2610
2611         exp = ll_i2mdexp(inode);
2612
2613         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2614                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2615                 struct md_op_data *op_data;
2616
2617                 /* Call getattr by fid, so do not provide name at all. */
2618                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2619                                              dentry->d_inode, NULL, 0, 0,
2620                                              LUSTRE_OPC_ANY, NULL);
2621                 if (IS_ERR(op_data))
2622                         RETURN(PTR_ERR(op_data));
2623
2624                 oit.it_flags |= O_CHECK_STALE;
2625                 rc = md_intent_lock(exp, op_data, NULL, 0,
2626                                     /* we are not interested in name
2627                                        based lookup */
2628                                     &oit, 0, &req,
2629                                     ll_md_blocking_ast, 0);
2630                 ll_finish_md_op_data(op_data);
2631                 oit.it_flags &= ~O_CHECK_STALE;
2632                 if (rc < 0) {
2633                         rc = ll_inode_revalidate_fini(inode, rc);
2634                         GOTO (out, rc);
2635                 }
2636
2637                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2638                 if (rc != 0) {
2639                         ll_intent_release(&oit);
2640                         GOTO(out, rc);
2641                 }
2642
2643                 /* Unlinked? Unhash dentry, so it is not picked up later by
2644                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2645                    here to preserve get_cwd functionality on 2.6.
2646                    Bug 10503 */
2647                 if (!dentry->d_inode->i_nlink) {
2648                         spin_lock(&dcache_lock);
2649                         ll_drop_dentry(dentry);
2650                         spin_unlock(&dcache_lock);
2651                 }
2652
2653                 ll_lookup_finish_locks(&oit, dentry);
2654         } else if (!ll_have_md_lock(dentry->d_inode,
2655                                     MDS_INODELOCK_UPDATE)) {
2656                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2657                 obd_valid valid = OBD_MD_FLGETATTR;
2658                 struct obd_capa *oc;
2659                 int ealen = 0;
2660
2661                 if (S_ISREG(inode->i_mode)) {
2662                         rc = ll_get_max_mdsize(sbi, &ealen);
2663                         if (rc)
2664                                 RETURN(rc);
2665                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2666                 }
2667                 /* When OBD_CONNECT_ATTRFID is not supported we cannot find a
2668                  * capa for this inode, because we only keep the capas of
2669                  * directories fresh. */
2670                 oc = ll_mdscapa_get(inode);
2671                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2672                                 ealen, &req);
2673                 capa_put(oc);
2674                 if (rc) {
2675                         rc = ll_inode_revalidate_fini(inode, rc);
2676                         RETURN(rc);
2677                 }
2678
2679                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2680                                    NULL);
2681                 if (rc)
2682                         GOTO(out, rc);
2683         }
2684
2685         /* if object not yet allocated, don't validate size */
2686         if (ll_i2info(inode)->lli_smd == NULL)
2687                 GOTO(out, rc = 0);
2688
2689         /* ll_glimpse_size will prefer locally cached writes if they extend
2690          * the file */
2691         rc = ll_glimpse_size(inode, 0);
2692         EXIT;
2693 out:
2694         ptlrpc_req_finished(req);
2695         return rc;
2696 }
2697
2698 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2699 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2700                   struct lookup_intent *it, struct kstat *stat)
2701 {
2702         struct inode *inode = de->d_inode;
2703         int res = 0;
2704
2705         res = ll_inode_revalidate_it(de, it);
2706         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2707
2708         if (res)
2709                 return res;
2710
2711         stat->dev = inode->i_sb->s_dev;
2712         stat->ino = inode->i_ino;
2713         stat->mode = inode->i_mode;
2714         stat->nlink = inode->i_nlink;
2715         stat->uid = inode->i_uid;
2716         stat->gid = inode->i_gid;
2717         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2718         stat->atime = inode->i_atime;
2719         stat->mtime = inode->i_mtime;
2720         stat->ctime = inode->i_ctime;
2721 #ifdef HAVE_INODE_BLKSIZE
2722         stat->blksize = inode->i_blksize;
2723 #else
2724         stat->blksize = 1 << inode->i_blkbits;
2725 #endif
2726
2727         ll_inode_size_lock(inode, 0);
2728         stat->size = i_size_read(inode);
2729         stat->blocks = inode->i_blocks;
2730         ll_inode_size_unlock(inode, 0);
2731
2732         return 0;
2733 }
2734 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2735 {
2736         struct lookup_intent it = { .it_op = IT_GETATTR };
2737
2738         return ll_getattr_it(mnt, de, &it, stat);
2739 }
2740 #endif
2741
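/*
 * Check the POSIX ACL cached on the inode.  A return of -EAGAIN means no
 * ACL is available and the caller should fall back to the ordinary mode-bit
 * checks; this is the convention generic_permission() expects from its
 * check_acl callback on 2.6.10+ kernels.
 */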
2742 static
2743 int lustre_check_acl(struct inode *inode, int mask)
2744 {
2745 #ifdef CONFIG_FS_POSIX_ACL
2746         struct ll_inode_info *lli = ll_i2info(inode);
2747         struct posix_acl *acl;
2748         int rc;
2749         ENTRY;
2750
2751         spin_lock(&lli->lli_lock);
2752         acl = posix_acl_dup(lli->lli_posix_acl);
2753         spin_unlock(&lli->lli_lock);
2754
2755         if (!acl)
2756                 RETURN(-EAGAIN);
2757
2758         rc = posix_acl_permission(inode, acl, mask);
2759         posix_acl_release(acl);
2760
2761         RETURN(rc);
2762 #else
2763         return -EAGAIN;
2764 #endif
2765 }
2766
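/*
 * ->permission() methods.  On 2.6.10 and later the generic VFS helper is
 * used with lustre_check_acl() supplied as the ACL callback; older kernels
 * get an open-coded version of the usual owner/group/other, ACL and
 * capability checks.  Remote clients take neither path and defer to
 * lustre_check_remote_perm() instead.
 */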
2767 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2768 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2769 {
2770         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2771                inode->i_ino, inode->i_generation, inode, mask);
2772         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2773                 return lustre_check_remote_perm(inode, mask);
2774
2775         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2776         return generic_permission(inode, mask, lustre_check_acl);
2777 }
2778 #else
2779 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2780 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2781 #else
2782 int ll_inode_permission(struct inode *inode, int mask)
2783 #endif
2784 {
2785         int mode = inode->i_mode;
2786         int rc;
2787
2788         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2789                inode->i_ino, inode->i_generation, inode, mask);
2790
2791         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2792                 return lustre_check_remote_perm(inode, mask);
2793
2794         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2795
2796         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2797             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2798                 return -EROFS;
2799         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2800                 return -EACCES;
2801         if (current->fsuid == inode->i_uid) {
2802                 mode >>= 6;
2803         } else if (1) {
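                /* Unlike the stock generic_permission(), the ACL branch is
                 * taken unconditionally here; the plain group check below is
                 * reached only through the check_groups goto. */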
2804                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2805                         goto check_groups;
2806                 rc = lustre_check_acl(inode, mask);
2807                 if (rc == -EAGAIN)
2808                         goto check_groups;
2809                 if (rc == -EACCES)
2810                         goto check_capabilities;
2811                 return rc;
2812         } else {
2813 check_groups:
2814                 if (in_group_p(inode->i_gid))
2815                         mode >>= 3;
2816         }
2817         if ((mode & mask & S_IRWXO) == mask)
2818                 return 0;
2819
2820 check_capabilities:
2821         if (!(mask & MAY_EXEC) ||
2822             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2823                 if (capable(CAP_DAC_OVERRIDE))
2824                         return 0;
2825
2826         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2827             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2828                 return 0;
2829
2830         return -EACCES;
2831 }
2832 #endif
2833
2834 /* -o localflock - only provides locally consistent flock locks */
2835 struct file_operations ll_file_operations = {
2836         .read           = ll_file_read,
2837         .write          = ll_file_write,
2838         .ioctl          = ll_file_ioctl,
2839         .open           = ll_file_open,
2840         .release        = ll_file_release,
2841         .mmap           = ll_file_mmap,
2842         .llseek         = ll_file_seek,
2843 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2844         .sendfile       = ll_file_sendfile,
2845 #endif
2846         .fsync          = ll_fsync,
2847 };
2848
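/* -o flock - flock and POSIX lock requests are handled by ll_file_flock() */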
2849 struct file_operations ll_file_operations_flock = {
2850         .read           = ll_file_read,
2851         .write          = ll_file_write,
2852         .ioctl          = ll_file_ioctl,
2853         .open           = ll_file_open,
2854         .release        = ll_file_release,
2855         .mmap           = ll_file_mmap,
2856         .llseek         = ll_file_seek,
2857 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2858         .sendfile       = ll_file_sendfile,
2859 #endif
2860         .fsync          = ll_fsync,
2861 #ifdef HAVE_F_OP_FLOCK
2862         .flock          = ll_file_flock,
2863 #endif
2864         .lock           = ll_file_flock
2865 };
2866
2867 /* These are for -o noflock - to return ENOSYS on flock calls */
2868 struct file_operations ll_file_operations_noflock = {
2869         .read           = ll_file_read,
2870         .write          = ll_file_write,
2871         .ioctl          = ll_file_ioctl,
2872         .open           = ll_file_open,
2873         .release        = ll_file_release,
2874         .mmap           = ll_file_mmap,
2875         .llseek         = ll_file_seek,
2876 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2877         .sendfile       = ll_file_sendfile,
2878 #endif
2879         .fsync          = ll_fsync,
2880 #ifdef HAVE_F_OP_FLOCK
2881         .flock          = ll_file_noflock,
2882 #endif
2883         .lock           = ll_file_noflock
2884 };
2885
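/* inode methods used for regular Lustre files */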
2886 struct inode_operations ll_file_inode_operations = {
2887 #ifdef LUSTRE_KERNEL_VERSION
2888         .setattr_raw    = ll_setattr_raw,
2889 #endif
2890         .setattr        = ll_setattr,
2891         .truncate       = ll_truncate,
2892 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2893         .getattr        = ll_getattr,
2894 #else
2895         .revalidate_it  = ll_inode_revalidate_it,
2896 #endif
2897         .permission     = ll_inode_permission,
2898         .setxattr       = ll_setxattr,
2899         .getxattr       = ll_getxattr,
2900         .listxattr      = ll_listxattr,
2901         .removexattr    = ll_removexattr,
2902 };
2903
2904 /* dynamic ioctl number support routines */
2905 static struct llioc_ctl_data {
2906         struct rw_semaphore ioc_sem;
2907         struct list_head    ioc_head;
2908 } llioc = {
2909         __RWSEM_INITIALIZER(llioc.ioc_sem),
2910         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2911 };
2912
2913
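/* One dynamic ioctl registration: the callback plus the command numbers it
 * handles.  iocd_cmd[] is allocated inline after the structure, and
 * iocd_size records the total allocation size so it can be freed later. */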
2914 struct llioc_data {
2915         struct list_head        iocd_list;
2916         unsigned int            iocd_size;
2917         llioc_callback_t        iocd_cb;
2918         unsigned int            iocd_count;
2919         unsigned int            iocd_cmd[0];
2920 };
2921
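/* Register @count ioctl command numbers from @cmd with callback @cb.
 * Returns an opaque cookie to pass to ll_iocontrol_unregister(), or NULL on
 * invalid arguments or allocation failure.  The command list is copied, so
 * the caller may free @cmd after this returns. */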
2922 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2923 {
2924         unsigned int size;
2925         struct llioc_data *in_data = NULL;
2926         ENTRY;
2927
2928         if (cb == NULL || cmd == NULL ||
2929             count > LLIOC_MAX_CMD || count < 0)
2930                 RETURN(NULL);
2931
2932         size = sizeof(*in_data) + count * sizeof(unsigned int);
2933         OBD_ALLOC(in_data, size);
2934         if (in_data == NULL)
2935                 RETURN(NULL);
2936
2937         memset(in_data, 0, sizeof(*in_data));
2938         in_data->iocd_size = size;
2939         in_data->iocd_cb = cb;
2940         in_data->iocd_count = count;
2941         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2942
2943         down_write(&llioc.ioc_sem);
2944         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2945         up_write(&llioc.ioc_sem);
2946
2947         RETURN(in_data);
2948 }
2949
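/* Drop a registration made by ll_iocontrol_register().  @magic is the cookie
 * that call returned; passing NULL is a no-op, and an unknown cookie only
 * triggers a console warning. */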
2950 void ll_iocontrol_unregister(void *magic)
2951 {
2952         struct llioc_data *tmp;
2953
2954         if (magic == NULL)
2955                 return;
2956
2957         down_write(&llioc.ioc_sem);
2958         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2959                 if (tmp == magic) {
2960                         unsigned int size = tmp->iocd_size;
2961
2962                         list_del(&tmp->iocd_list);
2963                         up_write(&llioc.ioc_sem);
2964
2965                         OBD_FREE(tmp, size);
2966                         return;
2967                 }
2968         }
2969         up_write(&llioc.ioc_sem);
2970
2971         CWARN("didn't find an iocontrol registration block with magic: %p\n",
2972               magic);
2972 }
2973
2974 EXPORT_SYMBOL(ll_iocontrol_register);
2975 EXPORT_SYMBOL(ll_iocontrol_unregister);
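/*
 * Example of how a module might use ll_iocontrol_register() and
 * ll_iocontrol_unregister().  This is only an illustrative sketch:
 * "vendor_ioc_cb", "vendor_cmds" and the command numbers are made up, and
 * the callback signature is inferred from the invocation in
 * ll_iocontrol_call() below; the llioc_callback_t typedef is authoritative.
 *
 *      static enum llioc_iter vendor_ioc_cb(struct inode *inode,
 *                                           struct file *file,
 *                                           unsigned int cmd,
 *                                           unsigned long arg,
 *                                           void *magic, int *rcp)
 *      {
 *              *rcp = 0;
 *              return LLIOC_STOP;
 *      }
 *
 *      static unsigned int vendor_cmds[] = { 0xff01, 0xff02 };
 *      static void *vendor_cookie;
 *
 *      vendor_cookie = ll_iocontrol_register(vendor_ioc_cb, 2, vendor_cmds);
 *      ...
 *      ll_iocontrol_unregister(vendor_cookie);
 */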
2976
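/* Dispatch @cmd to the registered handlers: every block whose command list
 * contains @cmd has its callback invoked, in registration order, until one
 * returns LLIOC_STOP.  The status from the last callback run is passed back
 * through @rcp; it stays -EINVAL if no callback matched. */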
2977 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2978                         unsigned int cmd, unsigned long arg, int *rcp)
2979 {
2980         enum llioc_iter ret = LLIOC_CONT;
2981         struct llioc_data *data;
2982         int rc = -EINVAL, i;
2983
2984         down_read(&llioc.ioc_sem);
2985         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2986                 for (i = 0; i < data->iocd_count; i++) {
2987                         if (cmd != data->iocd_cmd[i])
2988                                 continue;
2989
2990                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2991                         break;
2992                 }
2993
2994                 if (ret == LLIOC_STOP)
2995                         break;
2996         }
2997         up_read(&llioc.ioc_sem);
2998
2999         if (rcp)
3000                 *rcp = rc;
3001         return ret;
3002 }