Whamcloud - gitweb
LU-2156 debug: quiet overly verbose console errors
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84 }
85
86 /**
87  * Closes the IO epoch and packs all the attributes into @op_data for
88  * the CLOSE rpc.
89  */
90 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
91                              struct obd_client_handle *och)
92 {
93         ENTRY;
94
95         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
96                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
97
98         if (!(och->och_flags & FMODE_WRITE))
99                 goto out;
100
101         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
102                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
103         else
104                 ll_ioepoch_close(inode, op_data, &och, 0);
105
106 out:
107         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
108         ll_prep_md_op_data(op_data, inode, NULL, NULL,
109                            0, 0, LUSTRE_OPC_ANY, NULL);
110         EXIT;
111 }
112
113 static int ll_close_inode_openhandle(struct obd_export *md_exp,
114                                      struct inode *inode,
115                                      struct obd_client_handle *och)
116 {
117         struct obd_export *exp = ll_i2mdexp(inode);
118         struct md_op_data *op_data;
119         struct ptlrpc_request *req = NULL;
120         struct obd_device *obd = class_exp2obd(exp);
121         int epoch_close = 1;
122         int rc;
123         ENTRY;
124
125         if (obd == NULL) {
126                 /*
127                  * XXX: in case of LMV, is this correct to access
128                  * ->exp_handle?
129                  */
130                 CERROR("Invalid MDC connection handle "LPX64"\n",
131                        ll_i2mdexp(inode)->exp_handle.h_cookie);
132                 GOTO(out, rc = 0);
133         }
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc == -EAGAIN) {
143                 /* This close must have the epoch closed. */
144                 LASSERT(epoch_close);
145                 /* MDS has instructed us to obtain Size-on-MDS attribute from
146                  * OSTs and send setattr to back to MDS. */
147                 rc = ll_som_update(inode, op_data);
148                 if (rc) {
149                         CERROR("inode %lu mdc Size-on-MDS update failed: "
150                                "rc = %d\n", inode->i_ino, rc);
151                         rc = 0;
152                 }
153         } else if (rc) {
154                 CERROR("inode %lu mdc close failed: rc = %d\n",
155                        inode->i_ino, rc);
156         }
157         ll_finish_md_op_data(op_data);
158
159         if (rc == 0) {
160                 rc = ll_objects_destroy(req, inode);
161                 if (rc)
162                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
163                                inode->i_ino, rc);
164         }
165
166         EXIT;
167 out:
168
169         if (exp_connect_som(exp) && !epoch_close &&
170             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
171                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
172         } else {
173                 md_clear_open_replay_data(md_exp, och);
174                 /* Free @och if it is not waiting for DONE_WRITING. */
175                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
176                 OBD_FREE_PTR(och);
177         }
178         if (req) /* This is close request */
179                 ptlrpc_req_finished(req);
180         return rc;
181 }
182
183 int ll_md_real_close(struct inode *inode, int flags)
184 {
185         struct ll_inode_info *lli = ll_i2info(inode);
186         struct obd_client_handle **och_p;
187         struct obd_client_handle *och;
188         __u64 *och_usecount;
189         int rc = 0;
190         ENTRY;
191
192         if (flags & FMODE_WRITE) {
193                 och_p = &lli->lli_mds_write_och;
194                 och_usecount = &lli->lli_open_fd_write_count;
195         } else if (flags & FMODE_EXEC) {
196                 och_p = &lli->lli_mds_exec_och;
197                 och_usecount = &lli->lli_open_fd_exec_count;
198         } else {
199                 LASSERT(flags & FMODE_READ);
200                 och_p = &lli->lli_mds_read_och;
201                 och_usecount = &lli->lli_open_fd_read_count;
202         }
203
204         mutex_lock(&lli->lli_och_mutex);
205         if (*och_usecount) { /* There are still users of this handle, so
206                                 skip freeing it. */
207                 mutex_unlock(&lli->lli_och_mutex);
208                 RETURN(0);
209         }
210         och=*och_p;
211         *och_p = NULL;
212         mutex_unlock(&lli->lli_och_mutex);
213
214         if (och) { /* There might be a race and somebody have freed this och
215                       already */
216                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
217                                                inode, och);
218         }
219
220         RETURN(rc);
221 }
222
223 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
224                 struct file *file)
225 {
226         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
227         struct ll_inode_info *lli = ll_i2info(inode);
228         int rc = 0;
229         ENTRY;
230
231         /* clear group lock, if present */
232         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
233                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
234
235         /* Let's see if we have good enough OPEN lock on the file and if
236            we can skip talking to MDS */
237         if (file->f_dentry->d_inode) { /* Can this ever be false? */
238                 int lockmode;
239                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
240                 struct lustre_handle lockh;
241                 struct inode *inode = file->f_dentry->d_inode;
242                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
243
244                 mutex_lock(&lli->lli_och_mutex);
245                 if (fd->fd_omode & FMODE_WRITE) {
246                         lockmode = LCK_CW;
247                         LASSERT(lli->lli_open_fd_write_count);
248                         lli->lli_open_fd_write_count--;
249                 } else if (fd->fd_omode & FMODE_EXEC) {
250                         lockmode = LCK_PR;
251                         LASSERT(lli->lli_open_fd_exec_count);
252                         lli->lli_open_fd_exec_count--;
253                 } else {
254                         lockmode = LCK_CR;
255                         LASSERT(lli->lli_open_fd_read_count);
256                         lli->lli_open_fd_read_count--;
257                 }
258                 mutex_unlock(&lli->lli_och_mutex);
259
260                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
261                                    LDLM_IBITS, &policy, lockmode,
262                                    &lockh)) {
263                         rc = ll_md_real_close(file->f_dentry->d_inode,
264                                               fd->fd_omode);
265                 }
266         } else {
267                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
268                        file, file->f_dentry, file->f_dentry->d_name.name);
269         }
270
271         LUSTRE_FPRIVATE(file) = NULL;
272         ll_file_data_put(fd);
273         ll_capa_close(inode);
274
275         RETURN(rc);
276 }
277
278 /* While this returns an error code, fput() the caller does not, so we need
279  * to make every effort to clean up all of our state here.  Also, applications
280  * rarely check close errors and even if an error is returned they will not
281  * re-try the close call.
282  */
283 int ll_file_release(struct inode *inode, struct file *file)
284 {
285         struct ll_file_data *fd;
286         struct ll_sb_info *sbi = ll_i2sbi(inode);
287         struct ll_inode_info *lli = ll_i2info(inode);
288         int rc;
289         ENTRY;
290
291         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
292                inode->i_generation, inode);
293
294 #ifdef CONFIG_FS_POSIX_ACL
295         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
296             inode == inode->i_sb->s_root->d_inode) {
297                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
298
299                 LASSERT(fd != NULL);
300                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
301                         fd->fd_flags &= ~LL_FILE_RMTACL;
302                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
303                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
304                 }
305         }
306 #endif
307
308         if (inode->i_sb->s_root != file->f_dentry)
309                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
310         fd = LUSTRE_FPRIVATE(file);
311         LASSERT(fd != NULL);
312
313         /* The last ref on @file, maybe not the the owner pid of statahead.
314          * Different processes can open the same dir, "ll_opendir_key" means:
315          * it is me that should stop the statahead thread. */
316         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
317             lli->lli_opendir_pid != 0)
318                 ll_stop_statahead(inode, lli->lli_opendir_key);
319
320         if (inode->i_sb->s_root == file->f_dentry) {
321                 LUSTRE_FPRIVATE(file) = NULL;
322                 ll_file_data_put(fd);
323                 RETURN(0);
324         }
325
326         if (!S_ISDIR(inode->i_mode)) {
327                 lov_read_and_clear_async_rc(lli->lli_clob);
328                 lli->lli_async_rc = 0;
329         }
330
331         rc = ll_md_close(sbi->ll_md_exp, inode, file);
332
333         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
334                 libcfs_debug_dumplog();
335
336         RETURN(rc);
337 }
338
339 static int ll_intent_file_open(struct file *file, void *lmm,
340                                int lmmsize, struct lookup_intent *itp)
341 {
342         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
343         struct dentry *parent = file->f_dentry->d_parent;
344         const char *name = file->f_dentry->d_name.name;
345         const int len = file->f_dentry->d_name.len;
346         struct md_op_data *op_data;
347         struct ptlrpc_request *req;
348         __u32 opc = LUSTRE_OPC_ANY;
349         int rc;
350         ENTRY;
351
352         if (!parent)
353                 RETURN(-ENOENT);
354
355         /* Usually we come here only for NFSD, and we want open lock.
356            But we can also get here with pre 2.6.15 patchless kernels, and in
357            that case that lock is also ok */
358         /* We can also get here if there was cached open handle in revalidate_it
359          * but it disappeared while we were getting from there to ll_file_open.
360          * But this means this file was closed and immediatelly opened which
361          * makes a good candidate for using OPEN lock */
362         /* If lmmsize & lmm are not 0, we are just setting stripe info
363          * parameters. No need for the open lock */
364         if (lmm == NULL && lmmsize == 0) {
365                 itp->it_flags |= MDS_OPEN_LOCK;
366                 if (itp->it_flags & FMODE_WRITE)
367                         opc = LUSTRE_OPC_CREATE;
368         }
369
370         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
371                                       file->f_dentry->d_inode, name, len,
372                                       O_RDWR, opc, NULL);
373         if (IS_ERR(op_data))
374                 RETURN(PTR_ERR(op_data));
375
376         itp->it_flags |= MDS_OPEN_BY_FID;
377         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
378                             0 /*unused */, &req, ll_md_blocking_ast, 0);
379         ll_finish_md_op_data(op_data);
380         if (rc == -ESTALE) {
381                 /* reason for keep own exit path - don`t flood log
382                 * with messages with -ESTALE errors.
383                 */
384                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
385                      it_open_error(DISP_OPEN_OPEN, itp))
386                         GOTO(out, rc);
387                 ll_release_openhandle(file->f_dentry, itp);
388                 GOTO(out, rc);
389         }
390
391         if (it_disposition(itp, DISP_LOOKUP_NEG))
392                 GOTO(out, rc = -ENOENT);
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
401         if (!rc && itp->d.lustre.it_lock_mode)
402                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
403                                  itp, NULL);
404
405 out:
406         ptlrpc_req_finished(itp->d.lustre.it_data);
407         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
408         ll_intent_drop_lock(itp);
409
410         RETURN(rc);
411 }
412
413 /**
414  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
415  * not believe attributes if a few ioepoch holders exist. Attributes for
416  * previous ioepoch if new one is opened are also skipped by MDS.
417  */
418 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
419 {
420         if (ioepoch && lli->lli_ioepoch != ioepoch) {
421                 lli->lli_ioepoch = ioepoch;
422                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
423                        ioepoch, PFID(&lli->lli_fid));
424         }
425 }
426
427 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
428                        struct lookup_intent *it, struct obd_client_handle *och)
429 {
430         struct ptlrpc_request *req = it->d.lustre.it_data;
431         struct mdt_body *body;
432
433         LASSERT(och);
434
435         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
436         LASSERT(body != NULL);                      /* reply already checked out */
437
438         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
439         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
440         och->och_fid = lli->lli_fid;
441         och->och_flags = it->it_flags;
442         ll_ioepoch_open(lli, body->ioepoch);
443
444         return md_set_open_replay_data(md_exp, och, req);
445 }
446
447 int ll_local_open(struct file *file, struct lookup_intent *it,
448                   struct ll_file_data *fd, struct obd_client_handle *och)
449 {
450         struct inode *inode = file->f_dentry->d_inode;
451         struct ll_inode_info *lli = ll_i2info(inode);
452         ENTRY;
453
454         LASSERT(!LUSTRE_FPRIVATE(file));
455
456         LASSERT(fd != NULL);
457
458         if (och) {
459                 struct ptlrpc_request *req = it->d.lustre.it_data;
460                 struct mdt_body *body;
461                 int rc;
462
463                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
464                 if (rc)
465                         RETURN(rc);
466
467                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
468                 if ((it->it_flags & FMODE_WRITE) &&
469                     (body->valid & OBD_MD_FLSIZE))
470                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
471                                lli->lli_ioepoch, PFID(&lli->lli_fid));
472         }
473
474         LUSTRE_FPRIVATE(file) = fd;
475         ll_readahead_init(inode, &fd->fd_ras);
476         fd->fd_omode = it->it_flags;
477         RETURN(0);
478 }
479
480 /* Open a file, and (for the very first open) create objects on the OSTs at
481  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
482  * creation or open until ll_lov_setstripe() ioctl is called.
483  *
484  * If we already have the stripe MD locally then we don't request it in
485  * md_open(), by passing a lmm_size = 0.
486  *
487  * It is up to the application to ensure no other processes open this file
488  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
489  * used.  We might be able to avoid races of that sort by getting lli_open_sem
490  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
491  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
492  */
493 int ll_file_open(struct inode *inode, struct file *file)
494 {
495         struct ll_inode_info *lli = ll_i2info(inode);
496         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
497                                           .it_flags = file->f_flags };
498         struct obd_client_handle **och_p = NULL;
499         __u64 *och_usecount = NULL;
500         struct ll_file_data *fd;
501         int rc = 0, opendir_set = 0;
502         ENTRY;
503
504         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
505                inode->i_generation, inode, file->f_flags);
506
507         it = file->private_data; /* XXX: compat macro */
508         file->private_data = NULL; /* prevent ll_local_open assertion */
509
510         fd = ll_file_data_get();
511         if (fd == NULL)
512                 GOTO(out_och_free, rc = -ENOMEM);
513
514         fd->fd_file = file;
515         if (S_ISDIR(inode->i_mode)) {
516                 spin_lock(&lli->lli_sa_lock);
517                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
518                     lli->lli_opendir_pid == 0) {
519                         lli->lli_opendir_key = fd;
520                         lli->lli_opendir_pid = cfs_curproc_pid();
521                         opendir_set = 1;
522                 }
523                 spin_unlock(&lli->lli_sa_lock);
524         }
525
526         if (inode->i_sb->s_root == file->f_dentry) {
527                 LUSTRE_FPRIVATE(file) = fd;
528                 RETURN(0);
529         }
530
531         if (!it || !it->d.lustre.it_disposition) {
532                 /* Convert f_flags into access mode. We cannot use file->f_mode,
533                  * because everything but O_ACCMODE mask was stripped from
534                  * there */
535                 if ((oit.it_flags + 1) & O_ACCMODE)
536                         oit.it_flags++;
537                 if (file->f_flags & O_TRUNC)
538                         oit.it_flags |= FMODE_WRITE;
539
540                 /* kernel only call f_op->open in dentry_open.  filp_open calls
541                  * dentry_open after call to open_namei that checks permissions.
542                  * Only nfsd_open call dentry_open directly without checking
543                  * permissions and because of that this code below is safe. */
544                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
545                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
546
547                 /* We do not want O_EXCL here, presumably we opened the file
548                  * already? XXX - NFS implications? */
549                 oit.it_flags &= ~O_EXCL;
550
551                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
552                  * created if necessary, then "IT_CREAT" should be set to keep
553                  * consistent with it */
554                 if (oit.it_flags & O_CREAT)
555                         oit.it_op |= IT_CREAT;
556
557                 it = &oit;
558         }
559
560 restart:
561         /* Let's see if we have file open on MDS already. */
562         if (it->it_flags & FMODE_WRITE) {
563                 och_p = &lli->lli_mds_write_och;
564                 och_usecount = &lli->lli_open_fd_write_count;
565         } else if (it->it_flags & FMODE_EXEC) {
566                 och_p = &lli->lli_mds_exec_och;
567                 och_usecount = &lli->lli_open_fd_exec_count;
568          } else {
569                 och_p = &lli->lli_mds_read_och;
570                 och_usecount = &lli->lli_open_fd_read_count;
571         }
572
573         mutex_lock(&lli->lli_och_mutex);
574         if (*och_p) { /* Open handle is present */
575                 if (it_disposition(it, DISP_OPEN_OPEN)) {
576                         /* Well, there's extra open request that we do not need,
577                            let's close it somehow. This will decref request. */
578                         rc = it_open_error(DISP_OPEN_OPEN, it);
579                         if (rc) {
580                                 mutex_unlock(&lli->lli_och_mutex);
581                                 GOTO(out_openerr, rc);
582                         }
583
584                         ll_release_openhandle(file->f_dentry, it);
585                 }
586                 (*och_usecount)++;
587
588                 rc = ll_local_open(file, it, fd, NULL);
589                 if (rc) {
590                         (*och_usecount)--;
591                         mutex_unlock(&lli->lli_och_mutex);
592                         GOTO(out_openerr, rc);
593                 }
594         } else {
595                 LASSERT(*och_usecount == 0);
596                 if (!it->d.lustre.it_disposition) {
597                         /* We cannot just request lock handle now, new ELC code
598                            means that one of other OPEN locks for this file
599                            could be cancelled, and since blocking ast handler
600                            would attempt to grab och_mutex as well, that would
601                            result in a deadlock */
602                         mutex_unlock(&lli->lli_och_mutex);
603                         it->it_create_mode |= M_CHECK_STALE;
604                         rc = ll_intent_file_open(file, NULL, 0, it);
605                         it->it_create_mode &= ~M_CHECK_STALE;
606                         if (rc)
607                                 GOTO(out_openerr, rc);
608
609                         goto restart;
610                 }
611                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
612                 if (!*och_p)
613                         GOTO(out_och_free, rc = -ENOMEM);
614
615                 (*och_usecount)++;
616
617                 /* md_intent_lock() didn't get a request ref if there was an
618                  * open error, so don't do cleanup on the request here
619                  * (bug 3430) */
620                 /* XXX (green): Should not we bail out on any error here, not
621                  * just open error? */
622                 rc = it_open_error(DISP_OPEN_OPEN, it);
623                 if (rc)
624                         GOTO(out_och_free, rc);
625
626                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
627
628                 rc = ll_local_open(file, it, fd, *och_p);
629                 if (rc)
630                         GOTO(out_och_free, rc);
631         }
632         mutex_unlock(&lli->lli_och_mutex);
633         fd = NULL;
634
635         /* Must do this outside lli_och_mutex lock to prevent deadlock where
636            different kind of OPEN lock for this same inode gets cancelled
637            by ldlm_cancel_lru */
638         if (!S_ISREG(inode->i_mode))
639                 GOTO(out_och_free, rc);
640
641         ll_capa_open(inode);
642
643         if (!lli->lli_has_smd) {
644                 if (file->f_flags & O_LOV_DELAY_CREATE ||
645                     !(file->f_mode & FMODE_WRITE)) {
646                         CDEBUG(D_INODE, "object creation was delayed\n");
647                         GOTO(out_och_free, rc);
648                 }
649         }
650         file->f_flags &= ~O_LOV_DELAY_CREATE;
651         GOTO(out_och_free, rc);
652
653 out_och_free:
654         if (rc) {
655                 if (och_p && *och_p) {
656                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
657                         *och_p = NULL; /* OBD_FREE writes some magic there */
658                         (*och_usecount)--;
659                 }
660                 mutex_unlock(&lli->lli_och_mutex);
661
662 out_openerr:
663                 if (opendir_set != 0)
664                         ll_stop_statahead(inode, lli->lli_opendir_key);
665                 if (fd != NULL)
666                         ll_file_data_put(fd);
667         } else {
668                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
669         }
670
671         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
672                 ptlrpc_req_finished(it->d.lustre.it_data);
673                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
674         }
675
676         return rc;
677 }
678
679 /* Fills the obdo with the attributes for the lsm */
680 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
681                           struct obd_capa *capa, struct obdo *obdo,
682                           __u64 ioepoch, int sync)
683 {
684         struct ptlrpc_request_set *set;
685         struct obd_info            oinfo = { { { 0 } } };
686         int                        rc;
687
688         ENTRY;
689
690         LASSERT(lsm != NULL);
691
692         oinfo.oi_md = lsm;
693         oinfo.oi_oa = obdo;
694         oinfo.oi_oa->o_id = lsm->lsm_object_id;
695         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
696         oinfo.oi_oa->o_mode = S_IFREG;
697         oinfo.oi_oa->o_ioepoch = ioepoch;
698         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
699                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
700                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
701                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
702                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
703                                OBD_MD_FLDATAVERSION;
704         oinfo.oi_capa = capa;
705         if (sync) {
706                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
707                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
708         }
709
710         set = ptlrpc_prep_set();
711         if (set == NULL) {
712                 CERROR("can't allocate ptlrpc set\n");
713                 rc = -ENOMEM;
714         } else {
715                 rc = obd_getattr_async(exp, &oinfo, set);
716                 if (rc == 0)
717                         rc = ptlrpc_set_wait(set);
718                 ptlrpc_set_destroy(set);
719         }
720         if (rc == 0)
721                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
722                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
723                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
724                                          OBD_MD_FLDATAVERSION);
725         RETURN(rc);
726 }
727
728 /**
729   * Performs the getattr on the inode and updates its fields.
730   * If @sync != 0, perform the getattr under the server-side lock.
731   */
732 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
733                      __u64 ioepoch, int sync)
734 {
735         struct obd_capa      *capa = ll_mdscapa_get(inode);
736         struct lov_stripe_md *lsm;
737         int rc;
738         ENTRY;
739
740         lsm = ccc_inode_lsm_get(inode);
741         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
742                             capa, obdo, ioepoch, sync);
743         capa_put(capa);
744         if (rc == 0) {
745                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
746                 CDEBUG(D_INODE,
747                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
748                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
749                        (unsigned long long)inode->i_blocks,
750                        (unsigned long)ll_inode_blksize(inode));
751         }
752         ccc_inode_lsm_put(inode, lsm);
753         RETURN(rc);
754 }
755
756 int ll_merge_lvb(struct inode *inode)
757 {
758         struct ll_inode_info *lli = ll_i2info(inode);
759         struct ll_sb_info *sbi = ll_i2sbi(inode);
760         struct lov_stripe_md *lsm;
761         struct ost_lvb lvb;
762         int rc = 0;
763
764         ENTRY;
765
766         lsm = ccc_inode_lsm_get(inode);
767         ll_inode_size_lock(inode);
768         inode_init_lvb(inode, &lvb);
769
770         /* merge timestamps the most resently obtained from mds with
771            timestamps obtained from osts */
772         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
773         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
774         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
775         if (lsm != NULL) {
776                 rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
777                 cl_isize_write_nolock(inode, lvb.lvb_size);
778
779                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
780                                 PFID(&lli->lli_fid), lvb.lvb_size);
781                 inode->i_blocks = lvb.lvb_blocks;
782         }
783         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
784         LTIME_S(inode->i_atime) = lvb.lvb_atime;
785         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
786         ll_inode_size_unlock(inode);
787         ccc_inode_lsm_put(inode, lsm);
788
789         RETURN(rc);
790 }
791
792 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
793                      lstat_t *st)
794 {
795         struct obdo obdo = { 0 };
796         int rc;
797
798         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
799         if (rc == 0) {
800                 st->st_size   = obdo.o_size;
801                 st->st_blocks = obdo.o_blocks;
802                 st->st_mtime  = obdo.o_mtime;
803                 st->st_atime  = obdo.o_atime;
804                 st->st_ctime  = obdo.o_ctime;
805         }
806         return rc;
807 }
808
809 void ll_io_init(struct cl_io *io, const struct file *file, int write)
810 {
811         struct inode *inode = file->f_dentry->d_inode;
812
813         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
814         if (write) {
815                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
816                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
817         }
818         io->ci_obj     = ll_i2info(inode)->lli_clob;
819         io->ci_lockreq = CILR_MAYBE;
820         if (ll_file_nolock(file)) {
821                 io->ci_lockreq = CILR_NEVER;
822                 io->ci_no_srvlock = 1;
823         } else if (file->f_flags & O_APPEND) {
824                 io->ci_lockreq = CILR_MANDATORY;
825         }
826 }
827
828 static ssize_t
829 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
830                    struct file *file, enum cl_io_type iot,
831                    loff_t *ppos, size_t count)
832 {
833         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
834         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
835         struct cl_io         *io;
836         ssize_t               result;
837         ENTRY;
838
839         io = ccc_env_thread_io(env);
840         ll_io_init(io, file, iot == CIT_WRITE);
841
842         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
843                 struct vvp_io *vio = vvp_env_io(env);
844                 struct ccc_io *cio = ccc_env_io(env);
845                 int write_mutex_locked = 0;
846
847                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
848                 vio->cui_io_subtype = args->via_io_subtype;
849
850                 switch (vio->cui_io_subtype) {
851                 case IO_NORMAL:
852                         cio->cui_iov = args->u.normal.via_iov;
853                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
854                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
855 #ifndef HAVE_FILE_WRITEV
856                         cio->cui_iocb = args->u.normal.via_iocb;
857 #endif
858                         if ((iot == CIT_WRITE) &&
859                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
860                                 if (mutex_lock_interruptible(&lli->
861                                                                lli_write_mutex))
862                                         GOTO(out, result = -ERESTARTSYS);
863                                 write_mutex_locked = 1;
864                         } else if (iot == CIT_READ) {
865                                 down_read(&lli->lli_trunc_sem);
866                         }
867                         break;
868                 case IO_SENDFILE:
869                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
870                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
871                         break;
872                 case IO_SPLICE:
873                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
874                         vio->u.splice.cui_flags = args->u.splice.via_flags;
875                         break;
876                 default:
877                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
878                         LBUG();
879                 }
880                 result = cl_io_loop(env, io);
881                 if (write_mutex_locked)
882                         mutex_unlock(&lli->lli_write_mutex);
883                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
884                         up_read(&lli->lli_trunc_sem);
885         } else {
886                 /* cl_io_rw_init() handled IO */
887                 result = io->ci_result;
888         }
889
890         if (io->ci_nob > 0) {
891                 result = io->ci_nob;
892                 *ppos = io->u.ci_wr.wr.crw_pos;
893         }
894         GOTO(out, result);
895 out:
896         cl_io_fini(env, io);
897
898         if (iot == CIT_READ) {
899                 if (result >= 0)
900                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
901                                            LPROC_LL_READ_BYTES, result);
902         } else if (iot == CIT_WRITE) {
903                 if (result >= 0) {
904                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
905                                            LPROC_LL_WRITE_BYTES, result);
906                         fd->fd_write_failed = false;
907                 } else {
908                         fd->fd_write_failed = true;
909                 }
910         }
911
912         return result;
913 }
914
915
916 /*
917  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
918  */
919 static int ll_file_get_iov_count(const struct iovec *iov,
920                                  unsigned long *nr_segs, size_t *count)
921 {
922         size_t cnt = 0;
923         unsigned long seg;
924
925         for (seg = 0; seg < *nr_segs; seg++) {
926                 const struct iovec *iv = &iov[seg];
927
928                 /*
929                  * If any segment has a negative length, or the cumulative
930                  * length ever wraps negative then return -EINVAL.
931                  */
932                 cnt += iv->iov_len;
933                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
934                         return -EINVAL;
935                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
936                         continue;
937                 if (seg == 0)
938                         return -EFAULT;
939                 *nr_segs = seg;
940                 cnt -= iv->iov_len;   /* This segment is no good */
941                 break;
942         }
943         *count = cnt;
944         return 0;
945 }
946
947 #ifdef HAVE_FILE_READV
948 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
949                               unsigned long nr_segs, loff_t *ppos)
950 {
951         struct lu_env      *env;
952         struct vvp_io_args *args;
953         size_t              count;
954         ssize_t             result;
955         int                 refcheck;
956         ENTRY;
957
958         result = ll_file_get_iov_count(iov, &nr_segs, &count);
959         if (result)
960                 RETURN(result);
961
962         env = cl_env_get(&refcheck);
963         if (IS_ERR(env))
964                 RETURN(PTR_ERR(env));
965
966         args = vvp_env_args(env, IO_NORMAL);
967         args->u.normal.via_iov = (struct iovec *)iov;
968         args->u.normal.via_nrsegs = nr_segs;
969
970         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
971         cl_env_put(env, &refcheck);
972         RETURN(result);
973 }
974
975 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
976                             loff_t *ppos)
977 {
978         struct lu_env *env;
979         struct iovec  *local_iov;
980         ssize_t        result;
981         int            refcheck;
982         ENTRY;
983
984         env = cl_env_get(&refcheck);
985         if (IS_ERR(env))
986                 RETURN(PTR_ERR(env));
987
988         local_iov = &vvp_env_info(env)->vti_local_iov;
989         local_iov->iov_base = (void __user *)buf;
990         local_iov->iov_len = count;
991         result = ll_file_readv(file, local_iov, 1, ppos);
992         cl_env_put(env, &refcheck);
993         RETURN(result);
994 }
995
996 #else
997 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
998                                 unsigned long nr_segs, loff_t pos)
999 {
1000         struct lu_env      *env;
1001         struct vvp_io_args *args;
1002         size_t              count;
1003         ssize_t             result;
1004         int                 refcheck;
1005         ENTRY;
1006
1007         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1008         if (result)
1009                 RETURN(result);
1010
1011         env = cl_env_get(&refcheck);
1012         if (IS_ERR(env))
1013                 RETURN(PTR_ERR(env));
1014
1015         args = vvp_env_args(env, IO_NORMAL);
1016         args->u.normal.via_iov = (struct iovec *)iov;
1017         args->u.normal.via_nrsegs = nr_segs;
1018         args->u.normal.via_iocb = iocb;
1019
1020         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1021                                     &iocb->ki_pos, count);
1022         cl_env_put(env, &refcheck);
1023         RETURN(result);
1024 }
1025
1026 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1027                             loff_t *ppos)
1028 {
1029         struct lu_env *env;
1030         struct iovec  *local_iov;
1031         struct kiocb  *kiocb;
1032         ssize_t        result;
1033         int            refcheck;
1034         ENTRY;
1035
1036         env = cl_env_get(&refcheck);
1037         if (IS_ERR(env))
1038                 RETURN(PTR_ERR(env));
1039
1040         local_iov = &vvp_env_info(env)->vti_local_iov;
1041         kiocb = &vvp_env_info(env)->vti_kiocb;
1042         local_iov->iov_base = (void __user *)buf;
1043         local_iov->iov_len = count;
1044         init_sync_kiocb(kiocb, file);
1045         kiocb->ki_pos = *ppos;
1046         kiocb->ki_left = count;
1047
1048         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1049         *ppos = kiocb->ki_pos;
1050
1051         cl_env_put(env, &refcheck);
1052         RETURN(result);
1053 }
1054 #endif
1055
1056 /*
1057  * Write to a file (through the page cache).
1058  */
1059 #ifdef HAVE_FILE_WRITEV
1060 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1061                               unsigned long nr_segs, loff_t *ppos)
1062 {
1063         struct lu_env      *env;
1064         struct vvp_io_args *args;
1065         size_t              count;
1066         ssize_t             result;
1067         int                 refcheck;
1068         ENTRY;
1069
1070         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1071         if (result)
1072                 RETURN(result);
1073
1074         env = cl_env_get(&refcheck);
1075         if (IS_ERR(env))
1076                 RETURN(PTR_ERR(env));
1077
1078         args = vvp_env_args(env, IO_NORMAL);
1079         args->u.normal.via_iov = (struct iovec *)iov;
1080         args->u.normal.via_nrsegs = nr_segs;
1081
1082         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1083         cl_env_put(env, &refcheck);
1084         RETURN(result);
1085 }
1086
1087 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1088                              loff_t *ppos)
1089 {
1090         struct lu_env    *env;
1091         struct iovec     *local_iov;
1092         ssize_t           result;
1093         int               refcheck;
1094         ENTRY;
1095
1096         env = cl_env_get(&refcheck);
1097         if (IS_ERR(env))
1098                 RETURN(PTR_ERR(env));
1099
1100         local_iov = &vvp_env_info(env)->vti_local_iov;
1101         local_iov->iov_base = (void __user *)buf;
1102         local_iov->iov_len = count;
1103
1104         result = ll_file_writev(file, local_iov, 1, ppos);
1105         cl_env_put(env, &refcheck);
1106         RETURN(result);
1107 }
1108
1109 #else /* AIO stuff */
1110 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1111                                  unsigned long nr_segs, loff_t pos)
1112 {
1113         struct lu_env      *env;
1114         struct vvp_io_args *args;
1115         size_t              count;
1116         ssize_t             result;
1117         int                 refcheck;
1118         ENTRY;
1119
1120         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1121         if (result)
1122                 RETURN(result);
1123
1124         env = cl_env_get(&refcheck);
1125         if (IS_ERR(env))
1126                 RETURN(PTR_ERR(env));
1127
1128         args = vvp_env_args(env, IO_NORMAL);
1129         args->u.normal.via_iov = (struct iovec *)iov;
1130         args->u.normal.via_nrsegs = nr_segs;
1131         args->u.normal.via_iocb = iocb;
1132
1133         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1134                                   &iocb->ki_pos, count);
1135         cl_env_put(env, &refcheck);
1136         RETURN(result);
1137 }
1138
1139 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1140                              loff_t *ppos)
1141 {
1142         struct lu_env *env;
1143         struct iovec  *local_iov;
1144         struct kiocb  *kiocb;
1145         ssize_t        result;
1146         int            refcheck;
1147         ENTRY;
1148
1149         env = cl_env_get(&refcheck);
1150         if (IS_ERR(env))
1151                 RETURN(PTR_ERR(env));
1152
1153         local_iov = &vvp_env_info(env)->vti_local_iov;
1154         kiocb = &vvp_env_info(env)->vti_kiocb;
1155         local_iov->iov_base = (void __user *)buf;
1156         local_iov->iov_len = count;
1157         init_sync_kiocb(kiocb, file);
1158         kiocb->ki_pos = *ppos;
1159         kiocb->ki_left = count;
1160
1161         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1162         *ppos = kiocb->ki_pos;
1163
1164         cl_env_put(env, &refcheck);
1165         RETURN(result);
1166 }
1167 #endif
1168
1169
1170 #ifdef HAVE_KERNEL_SENDFILE
1171 /*
1172  * Send file content (through pagecache) somewhere with helper
1173  */
1174 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1175                                 read_actor_t actor, void *target)
1176 {
1177         struct lu_env      *env;
1178         struct vvp_io_args *args;
1179         ssize_t             result;
1180         int                 refcheck;
1181         ENTRY;
1182
1183         env = cl_env_get(&refcheck);
1184         if (IS_ERR(env))
1185                 RETURN(PTR_ERR(env));
1186
1187         args = vvp_env_args(env, IO_SENDFILE);
1188         args->u.sendfile.via_target = target;
1189         args->u.sendfile.via_actor = actor;
1190
1191         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1192         cl_env_put(env, &refcheck);
1193         RETURN(result);
1194 }
1195 #endif
1196
1197 #ifdef HAVE_KERNEL_SPLICE_READ
1198 /*
1199  * Send file content (through pagecache) somewhere with helper
1200  */
1201 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1202                                    struct pipe_inode_info *pipe, size_t count,
1203                                    unsigned int flags)
1204 {
1205         struct lu_env      *env;
1206         struct vvp_io_args *args;
1207         ssize_t             result;
1208         int                 refcheck;
1209         ENTRY;
1210
1211         env = cl_env_get(&refcheck);
1212         if (IS_ERR(env))
1213                 RETURN(PTR_ERR(env));
1214
1215         args = vvp_env_args(env, IO_SPLICE);
1216         args->u.splice.via_pipe = pipe;
1217         args->u.splice.via_flags = flags;
1218
1219         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1220         cl_env_put(env, &refcheck);
1221         RETURN(result);
1222 }
1223 #endif
1224
1225 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1226                            obd_count ost_idx)
1227 {
1228         struct obd_export *exp = ll_i2dtexp(inode);
1229         struct obd_trans_info oti = { 0 };
1230         struct obdo *oa = NULL;
1231         int lsm_size;
1232         int rc = 0;
1233         struct lov_stripe_md *lsm = NULL, *lsm2;
1234         ENTRY;
1235
1236         OBDO_ALLOC(oa);
1237         if (oa == NULL)
1238                 RETURN(-ENOMEM);
1239
1240         lsm = ccc_inode_lsm_get(inode);
1241         if (lsm == NULL)
1242                 GOTO(out, rc = -ENOENT);
1243
1244         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1245                    (lsm->lsm_stripe_count));
1246
1247         OBD_ALLOC_LARGE(lsm2, lsm_size);
1248         if (lsm2 == NULL)
1249                 GOTO(out, rc = -ENOMEM);
1250
1251         oa->o_id = id;
1252         oa->o_seq = seq;
1253         oa->o_nlink = ost_idx;
1254         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1255         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1256         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1257                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1258         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1259         memcpy(lsm2, lsm, lsm_size);
1260         ll_inode_size_lock(inode);
1261         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1262         ll_inode_size_unlock(inode);
1263
1264         OBD_FREE_LARGE(lsm2, lsm_size);
1265         GOTO(out, rc);
1266 out:
1267         ccc_inode_lsm_put(inode, lsm);
1268         OBDO_FREE(oa);
1269         return rc;
1270 }
1271
1272 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1273 {
1274         struct ll_recreate_obj ucreat;
1275         ENTRY;
1276
1277         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1278                 RETURN(-EPERM);
1279
1280         if (cfs_copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1281                                sizeof(struct ll_recreate_obj)))
1282                 RETURN(-EFAULT);
1283
1284         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1285                                ucreat.lrc_ost_idx));
1286 }
1287
1288 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1289 {
1290         struct lu_fid fid;
1291         obd_id id;
1292         obd_count ost_idx;
1293         ENTRY;
1294
1295         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1296                 RETURN(-EPERM);
1297
1298         if (cfs_copy_from_user(&fid, (struct lu_fid *)arg,
1299                                sizeof(struct lu_fid)))
1300                 RETURN(-EFAULT);
1301
1302         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1303         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1304         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1305 }
1306
1307 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1308                              int flags, struct lov_user_md *lum, int lum_size)
1309 {
1310         struct lov_stripe_md *lsm = NULL;
1311         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1312         int rc = 0;
1313         ENTRY;
1314
1315         lsm = ccc_inode_lsm_get(inode);
1316         if (lsm != NULL) {
1317                 ccc_inode_lsm_put(inode, lsm);
1318                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1319                        inode->i_ino);
1320                 RETURN(-EEXIST);
1321         }
1322
1323         ll_inode_size_lock(inode);
1324         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1325         if (rc)
1326                 GOTO(out, rc);
1327         rc = oit.d.lustre.it_status;
1328         if (rc < 0)
1329                 GOTO(out_req_free, rc);
1330
1331         ll_release_openhandle(file->f_dentry, &oit);
1332
1333  out:
1334         ll_inode_size_unlock(inode);
1335         ll_intent_release(&oit);
1336         ccc_inode_lsm_put(inode, lsm);
1337         RETURN(rc);
1338 out_req_free:
1339         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1340         goto out;
1341 }
1342
1343 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1344                              struct lov_mds_md **lmmp, int *lmm_size,
1345                              struct ptlrpc_request **request)
1346 {
1347         struct ll_sb_info *sbi = ll_i2sbi(inode);
1348         struct mdt_body  *body;
1349         struct lov_mds_md *lmm = NULL;
1350         struct ptlrpc_request *req = NULL;
1351         struct md_op_data *op_data;
1352         int rc, lmmsize;
1353
1354         rc = ll_get_max_mdsize(sbi, &lmmsize);
1355         if (rc)
1356                 RETURN(rc);
1357
1358         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1359                                      strlen(filename), lmmsize,
1360                                      LUSTRE_OPC_ANY, NULL);
1361         if (IS_ERR(op_data))
1362                 RETURN(PTR_ERR(op_data));
1363
1364         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1365         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1366         ll_finish_md_op_data(op_data);
1367         if (rc < 0) {
1368                 CDEBUG(D_INFO, "md_getattr_name failed "
1369                        "on %s: rc %d\n", filename, rc);
1370                 GOTO(out, rc);
1371         }
1372
1373         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1374         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1375
1376         lmmsize = body->eadatasize;
1377
1378         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1379                         lmmsize == 0) {
1380                 GOTO(out, rc = -ENODATA);
1381         }
1382
1383         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1384         LASSERT(lmm != NULL);
1385
1386         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1387             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1388                 GOTO(out, rc = -EPROTO);
1389         }
1390
1391         /*
1392          * This is coming from the MDS, so is probably in
1393          * little endian.  We convert it to host endian before
1394          * passing it to userspace.
1395          */
1396         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1397                 /* if function called for directory - we should
1398                  * avoid swab not existent lsm objects */
1399                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1400                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1401                         if (S_ISREG(body->mode))
1402                                 lustre_swab_lov_user_md_objects(
1403                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1404                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1405                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1406                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1407                         if (S_ISREG(body->mode))
1408                                 lustre_swab_lov_user_md_objects(
1409                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1410                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1411                 }
1412         }
1413
1414 out:
1415         *lmmp = lmm;
1416         *lmm_size = lmmsize;
1417         *request = req;
1418         return rc;
1419 }
1420
1421 static int ll_lov_setea(struct inode *inode, struct file *file,
1422                             unsigned long arg)
1423 {
1424         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1425         struct lov_user_md  *lump;
1426         int lum_size = sizeof(struct lov_user_md) +
1427                        sizeof(struct lov_user_ost_data);
1428         int rc;
1429         ENTRY;
1430
1431         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1432                 RETURN(-EPERM);
1433
1434         OBD_ALLOC_LARGE(lump, lum_size);
1435         if (lump == NULL) {
1436                 RETURN(-ENOMEM);
1437         }
1438         if (cfs_copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1439                 OBD_FREE_LARGE(lump, lum_size);
1440                 RETURN(-EFAULT);
1441         }
1442
1443         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1444
1445         OBD_FREE_LARGE(lump, lum_size);
1446         RETURN(rc);
1447 }
1448
1449 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1450                             unsigned long arg)
1451 {
1452         struct lov_user_md_v3 lumv3;
1453         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1454         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
1455         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
1456         int lum_size;
1457         int rc;
1458         int flags = FMODE_WRITE;
1459         ENTRY;
1460
1461         /* first try with v1 which is smaller than v3 */
1462         lum_size = sizeof(struct lov_user_md_v1);
1463         if (cfs_copy_from_user(lumv1, lumv1p, lum_size))
1464                 RETURN(-EFAULT);
1465
1466         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1467                 lum_size = sizeof(struct lov_user_md_v3);
1468                 if (cfs_copy_from_user(&lumv3, lumv3p, lum_size))
1469                         RETURN(-EFAULT);
1470         }
1471
1472         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1473         if (rc == 0) {
1474                 struct lov_stripe_md *lsm;
1475                 put_user(0, &lumv1p->lmm_stripe_count);
1476                 lsm = ccc_inode_lsm_get(inode);
1477                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1478                                    0, lsm, (void *)arg);
1479                 ccc_inode_lsm_put(inode, lsm);
1480         }
1481         RETURN(rc);
1482 }
1483
1484 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1485 {
1486         struct lov_stripe_md *lsm;
1487         int rc = -ENODATA;
1488         ENTRY;
1489
1490         lsm = ccc_inode_lsm_get(inode);
1491         if (lsm != NULL)
1492                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1493                                    lsm, (void *)arg);
1494         ccc_inode_lsm_put(inode, lsm);
1495         RETURN(rc);
1496 }
1497
1498 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1499 {
1500         struct ll_inode_info   *lli = ll_i2info(inode);
1501         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1502         struct ccc_grouplock    grouplock;
1503         int                     rc;
1504         ENTRY;
1505
1506         if (ll_file_nolock(file))
1507                 RETURN(-EOPNOTSUPP);
1508
1509         spin_lock(&lli->lli_lock);
1510         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1511                 CWARN("group lock already existed with gid %lu\n",
1512                       fd->fd_grouplock.cg_gid);
1513                 spin_unlock(&lli->lli_lock);
1514                 RETURN(-EINVAL);
1515         }
1516         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1517         spin_unlock(&lli->lli_lock);
1518
1519         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1520                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1521         if (rc)
1522                 RETURN(rc);
1523
1524         spin_lock(&lli->lli_lock);
1525         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1526                 spin_unlock(&lli->lli_lock);
1527                 CERROR("another thread just won the race\n");
1528                 cl_put_grouplock(&grouplock);
1529                 RETURN(-EINVAL);
1530         }
1531
1532         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1533         fd->fd_grouplock = grouplock;
1534         spin_unlock(&lli->lli_lock);
1535
1536         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1537         RETURN(0);
1538 }
1539
1540 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1541 {
1542         struct ll_inode_info   *lli = ll_i2info(inode);
1543         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1544         struct ccc_grouplock    grouplock;
1545         ENTRY;
1546
1547         spin_lock(&lli->lli_lock);
1548         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1549                 spin_unlock(&lli->lli_lock);
1550                 CWARN("no group lock held\n");
1551                 RETURN(-EINVAL);
1552         }
1553         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1554
1555         if (fd->fd_grouplock.cg_gid != arg) {
1556                 CWARN("group lock %lu doesn't match current id %lu\n",
1557                        arg, fd->fd_grouplock.cg_gid);
1558                 spin_unlock(&lli->lli_lock);
1559                 RETURN(-EINVAL);
1560         }
1561
1562         grouplock = fd->fd_grouplock;
1563         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1564         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1565         spin_unlock(&lli->lli_lock);
1566
1567         cl_put_grouplock(&grouplock);
1568         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1569         RETURN(0);
1570 }
1571
1572 /**
1573  * Close inode open handle
1574  *
1575  * \param dentry [in]     dentry which contains the inode
1576  * \param it     [in,out] intent which contains open info and result
1577  *
1578  * \retval 0     success
1579  * \retval <0    failure
1580  */
1581 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1582 {
1583         struct inode *inode = dentry->d_inode;
1584         struct obd_client_handle *och;
1585         int rc;
1586         ENTRY;
1587
1588         LASSERT(inode);
1589
1590         /* Root ? Do nothing. */
1591         if (dentry->d_inode->i_sb->s_root == dentry)
1592                 RETURN(0);
1593
1594         /* No open handle to close? Move away */
1595         if (!it_disposition(it, DISP_OPEN_OPEN))
1596                 RETURN(0);
1597
1598         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1599
1600         OBD_ALLOC(och, sizeof(*och));
1601         if (!och)
1602                 GOTO(out, rc = -ENOMEM);
1603
1604         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1605                     ll_i2info(inode), it, och);
1606
1607         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1608                                        inode, och);
1609  out:
1610         /* this one is in place of ll_file_open */
1611         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1612                 ptlrpc_req_finished(it->d.lustre.it_data);
1613                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1614         }
1615         RETURN(rc);
1616 }
1617
1618 /**
1619  * Get size for inode for which FIEMAP mapping is requested.
1620  * Make the FIEMAP get_info call and returns the result.
1621  */
1622 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1623               int num_bytes)
1624 {
1625         struct obd_export *exp = ll_i2dtexp(inode);
1626         struct lov_stripe_md *lsm = NULL;
1627         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1628         int vallen = num_bytes;
1629         int rc;
1630         ENTRY;
1631
1632         /* Checks for fiemap flags */
1633         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1634                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1635                 return -EBADR;
1636         }
1637
1638         /* Check for FIEMAP_FLAG_SYNC */
1639         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1640                 rc = filemap_fdatawrite(inode->i_mapping);
1641                 if (rc)
1642                         return rc;
1643         }
1644
1645         lsm = ccc_inode_lsm_get(inode);
1646         if (lsm == NULL)
1647                 return -ENOENT;
1648
1649         /* If the stripe_count > 1 and the application does not understand
1650          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1651          */
1652         if (lsm->lsm_stripe_count > 1 &&
1653             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1654                 GOTO(out, rc = -EOPNOTSUPP);
1655
1656         fm_key.oa.o_id = lsm->lsm_object_id;
1657         fm_key.oa.o_seq = lsm->lsm_object_seq;
1658         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1659
1660         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1661         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1662         /* If filesize is 0, then there would be no objects for mapping */
1663         if (fm_key.oa.o_size == 0) {
1664                 fiemap->fm_mapped_extents = 0;
1665                 GOTO(out, rc = 0);
1666         }
1667
1668         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1669
1670         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1671                           fiemap, lsm);
1672         if (rc)
1673                 CERROR("obd_get_info failed: rc = %d\n", rc);
1674
1675 out:
1676         ccc_inode_lsm_put(inode, lsm);
1677         RETURN(rc);
1678 }
1679
1680 int ll_fid2path(struct inode *inode, void *arg)
1681 {
1682         struct obd_export *exp = ll_i2mdexp(inode);
1683         struct getinfo_fid2path *gfout, *gfin;
1684         int outsize, rc;
1685         ENTRY;
1686
1687         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1688             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1689                 RETURN(-EPERM);
1690
1691         /* Need to get the buflen */
1692         OBD_ALLOC_PTR(gfin);
1693         if (gfin == NULL)
1694                 RETURN(-ENOMEM);
1695         if (cfs_copy_from_user(gfin, arg, sizeof(*gfin))) {
1696                 OBD_FREE_PTR(gfin);
1697                 RETURN(-EFAULT);
1698         }
1699
1700         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1701         OBD_ALLOC(gfout, outsize);
1702         if (gfout == NULL) {
1703                 OBD_FREE_PTR(gfin);
1704                 RETURN(-ENOMEM);
1705         }
1706         memcpy(gfout, gfin, sizeof(*gfout));
1707         OBD_FREE_PTR(gfin);
1708
1709         /* Call mdc_iocontrol */
1710         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1711         if (rc)
1712                 GOTO(gf_free, rc);
1713         if (cfs_copy_to_user(arg, gfout, outsize))
1714                 rc = -EFAULT;
1715
1716 gf_free:
1717         OBD_FREE(gfout, outsize);
1718         RETURN(rc);
1719 }
1720
1721 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1722 {
1723         struct ll_user_fiemap *fiemap_s;
1724         size_t num_bytes, ret_bytes;
1725         unsigned int extent_count;
1726         int rc = 0;
1727
1728         /* Get the extent count so we can calculate the size of
1729          * required fiemap buffer */
1730         if (get_user(extent_count,
1731             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1732                 RETURN(-EFAULT);
1733         num_bytes = sizeof(*fiemap_s) + (extent_count *
1734                                          sizeof(struct ll_fiemap_extent));
1735
1736         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1737         if (fiemap_s == NULL)
1738                 RETURN(-ENOMEM);
1739
1740         /* get the fiemap value */
1741         if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
1742                            sizeof(*fiemap_s)))
1743                 GOTO(error, rc = -EFAULT);
1744
1745         /* If fm_extent_count is non-zero, read the first extent since
1746          * it is used to calculate end_offset and device from previous
1747          * fiemap call. */
1748         if (extent_count) {
1749                 if (copy_from_user(&fiemap_s->fm_extents[0],
1750                     (char __user *)arg + sizeof(*fiemap_s),
1751                     sizeof(struct ll_fiemap_extent)))
1752                         GOTO(error, rc = -EFAULT);
1753         }
1754
1755         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1756         if (rc)
1757                 GOTO(error, rc);
1758
1759         ret_bytes = sizeof(struct ll_user_fiemap);
1760
1761         if (extent_count != 0)
1762                 ret_bytes += (fiemap_s->fm_mapped_extents *
1763                                  sizeof(struct ll_fiemap_extent));
1764
1765         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1766                 rc = -EFAULT;
1767
1768 error:
1769         OBD_FREE_LARGE(fiemap_s, num_bytes);
1770         RETURN(rc);
1771 }
1772
1773 /*
1774  * Read the data_version for inode.
1775  *
1776  * This value is computed using stripe object version on OST.
1777  * Version is computed using server side locking.
1778  *
1779  * @param extent_lock  Take extent lock. Not needed if a process is already
1780  *                     holding the OST object group locks.
1781  */
1782 static int ll_data_version(struct inode *inode, __u64 *data_version,
1783                            int extent_lock)
1784 {
1785         struct lov_stripe_md *lsm = NULL;
1786         struct ll_sb_info    *sbi = ll_i2sbi(inode);
1787         struct obdo          *obdo = NULL;
1788         int                   rc;
1789         ENTRY;
1790
1791         /* If no stripe, we consider version is 0. */
1792         lsm = ccc_inode_lsm_get(inode);
1793         if (lsm == NULL) {
1794                 *data_version = 0;
1795                 CDEBUG(D_INODE, "No object for inode\n");
1796                 RETURN(0);
1797         }
1798
1799         OBD_ALLOC_PTR(obdo);
1800         if (obdo == NULL) {
1801                 ccc_inode_lsm_put(inode, lsm);
1802                 RETURN(-ENOMEM);
1803         }
1804
1805         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1806         if (!rc) {
1807                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1808                         rc = -EOPNOTSUPP;
1809                 else
1810                         *data_version = obdo->o_data_version;
1811         }
1812
1813         OBD_FREE_PTR(obdo);
1814         ccc_inode_lsm_put(inode, lsm);
1815
1816         RETURN(rc);
1817 }
1818
1819 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1820 {
1821         struct inode *inode = file->f_dentry->d_inode;
1822         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1823         int flags;
1824
1825         ENTRY;
1826
1827         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1828                inode->i_generation, inode, cmd);
1829         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1830
1831         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1832         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1833                 RETURN(-ENOTTY);
1834
1835         switch(cmd) {
1836         case LL_IOC_GETFLAGS:
1837                 /* Get the current value of the file flags */
1838                 return put_user(fd->fd_flags, (int *)arg);
1839         case LL_IOC_SETFLAGS:
1840         case LL_IOC_CLRFLAGS:
1841                 /* Set or clear specific file flags */
1842                 /* XXX This probably needs checks to ensure the flags are
1843                  *     not abused, and to handle any flag side effects.
1844                  */
1845                 if (get_user(flags, (int *) arg))
1846                         RETURN(-EFAULT);
1847
1848                 if (cmd == LL_IOC_SETFLAGS) {
1849                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1850                             !(file->f_flags & O_DIRECT)) {
1851                                 CERROR("%s: unable to disable locking on "
1852                                        "non-O_DIRECT file\n", current->comm);
1853                                 RETURN(-EINVAL);
1854                         }
1855
1856                         fd->fd_flags |= flags;
1857                 } else {
1858                         fd->fd_flags &= ~flags;
1859                 }
1860                 RETURN(0);
1861         case LL_IOC_LOV_SETSTRIPE:
1862                 RETURN(ll_lov_setstripe(inode, file, arg));
1863         case LL_IOC_LOV_SETEA:
1864                 RETURN(ll_lov_setea(inode, file, arg));
1865         case LL_IOC_LOV_GETSTRIPE:
1866                 RETURN(ll_lov_getstripe(inode, arg));
1867         case LL_IOC_RECREATE_OBJ:
1868                 RETURN(ll_lov_recreate_obj(inode, arg));
1869         case LL_IOC_RECREATE_FID:
1870                 RETURN(ll_lov_recreate_fid(inode, arg));
1871         case FSFILT_IOC_FIEMAP:
1872                 RETURN(ll_ioctl_fiemap(inode, arg));
1873         case FSFILT_IOC_GETFLAGS:
1874         case FSFILT_IOC_SETFLAGS:
1875                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1876         case FSFILT_IOC_GETVERSION_OLD:
1877         case FSFILT_IOC_GETVERSION:
1878                 RETURN(put_user(inode->i_generation, (int *)arg));
1879         case LL_IOC_GROUP_LOCK:
1880                 RETURN(ll_get_grouplock(inode, file, arg));
1881         case LL_IOC_GROUP_UNLOCK:
1882                 RETURN(ll_put_grouplock(inode, file, arg));
1883         case IOC_OBD_STATFS:
1884                 RETURN(ll_obd_statfs(inode, (void *)arg));
1885
1886         /* We need to special case any other ioctls we want to handle,
1887          * to send them to the MDS/OST as appropriate and to properly
1888          * network encode the arg field.
1889         case FSFILT_IOC_SETVERSION_OLD:
1890         case FSFILT_IOC_SETVERSION:
1891         */
1892         case LL_IOC_FLUSHCTX:
1893                 RETURN(ll_flush_ctx(inode));
1894         case LL_IOC_PATH2FID: {
1895                 if (cfs_copy_to_user((void *)arg, ll_inode2fid(inode),
1896                                      sizeof(struct lu_fid)))
1897                         RETURN(-EFAULT);
1898
1899                 RETURN(0);
1900         }
1901         case OBD_IOC_FID2PATH:
1902                 RETURN(ll_fid2path(inode, (void *)arg));
1903         case LL_IOC_DATA_VERSION: {
1904                 struct ioc_data_version idv;
1905                 int rc;
1906
1907                 if (cfs_copy_from_user(&idv, (char *)arg, sizeof(idv)))
1908                         RETURN(-EFAULT);
1909
1910                 rc = ll_data_version(inode, &idv.idv_version,
1911                                      !(idv.idv_flags & LL_DV_NOFLUSH));
1912
1913                 if (rc == 0 &&
1914                     cfs_copy_to_user((char *) arg, &idv, sizeof(idv)))
1915                         RETURN(-EFAULT);
1916
1917                 RETURN(rc);
1918         }
1919
1920         case LL_IOC_GET_MDTIDX: {
1921                 int mdtidx;
1922
1923                 mdtidx = ll_get_mdt_idx(inode);
1924                 if (mdtidx < 0)
1925                         RETURN(mdtidx);
1926
1927                 if (put_user((int)mdtidx, (int*)arg))
1928                         RETURN(-EFAULT);
1929
1930                 RETURN(0);
1931         }
1932         case OBD_IOC_GETDTNAME:
1933         case OBD_IOC_GETMDNAME:
1934                 RETURN(ll_get_obd_name(inode, cmd, arg));
1935         default: {
1936                 int err;
1937
1938                 if (LLIOC_STOP ==
1939                     ll_iocontrol_call(inode, file, cmd, arg, &err))
1940                         RETURN(err);
1941
1942                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1943                                      (void *)arg));
1944         }
1945         }
1946 }
1947
1948 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1949 {
1950         struct inode *inode = file->f_dentry->d_inode;
1951         loff_t retval;
1952         ENTRY;
1953         retval = offset + ((origin == 2) ? i_size_read(inode) :
1954                            (origin == 1) ? file->f_pos : 0);
1955         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%s)\n",
1956                inode->i_ino, inode->i_generation, inode, retval, retval,
1957                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
1958         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
1959
1960         if (origin == 2) { /* SEEK_END */
1961                 int rc;
1962
1963                 rc = ll_glimpse_size(inode);
1964                 if (rc != 0)
1965                         RETURN(rc);
1966
1967                 offset += i_size_read(inode);
1968         } else if (origin == 1) { /* SEEK_CUR */
1969                 offset += file->f_pos;
1970         }
1971
1972         retval = -EINVAL;
1973         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1974                 if (offset != file->f_pos) {
1975                         file->f_pos = offset;
1976                 }
1977                 retval = offset;
1978         }
1979
1980         RETURN(retval);
1981 }
1982
1983 int ll_flush(struct file *file, fl_owner_t id)
1984 {
1985         struct inode *inode = file->f_dentry->d_inode;
1986         struct ll_inode_info *lli = ll_i2info(inode);
1987         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1988         int rc, err;
1989
1990         LASSERT(!S_ISDIR(inode->i_mode));
1991
1992         /* catch async errors that were recorded back when async writeback
1993          * failed for pages in this mapping. */
1994         rc = lli->lli_async_rc;
1995         lli->lli_async_rc = 0;
1996         err = lov_read_and_clear_async_rc(lli->lli_clob);
1997         if (rc == 0)
1998                 rc = err;
1999
2000         /* The application has been told write failure already.
2001          * Do not report failure again. */
2002         if (fd->fd_write_failed)
2003                 return 0;
2004         return rc ? -EIO : 0;
2005 }
2006
2007 /**
2008  * Called to make sure a portion of file has been written out.
2009  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2010  *
2011  * Return how many pages have been written.
2012  */
2013 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2014                        enum cl_fsync_mode mode)
2015 {
2016         struct cl_env_nest nest;
2017         struct lu_env *env;
2018         struct cl_io *io;
2019         struct obd_capa *capa = NULL;
2020         struct cl_fsync_io *fio;
2021         int result;
2022         ENTRY;
2023
2024         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2025             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2026                 RETURN(-EINVAL);
2027
2028         env = cl_env_nested_get(&nest);
2029         if (IS_ERR(env))
2030                 RETURN(PTR_ERR(env));
2031
2032         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2033
2034         io = ccc_env_thread_io(env);
2035         io->ci_obj = cl_i2info(inode)->lli_clob;
2036         io->ci_ignore_layout = 1;
2037
2038         /* initialize parameters for sync */
2039         fio = &io->u.ci_fsync;
2040         fio->fi_capa = capa;
2041         fio->fi_start = start;
2042         fio->fi_end = end;
2043         fio->fi_fid = ll_inode2fid(inode);
2044         fio->fi_mode = mode;
2045         fio->fi_nr_written = 0;
2046
2047         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2048                 result = cl_io_loop(env, io);
2049         else
2050                 result = io->ci_result;
2051         if (result == 0)
2052                 result = fio->fi_nr_written;
2053         cl_io_fini(env, io);
2054         cl_env_nested_put(&nest, env);
2055
2056         capa_put(capa);
2057
2058         RETURN(result);
2059 }
2060
2061 #ifdef HAVE_FILE_FSYNC_4ARGS
2062 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2063 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2064 int ll_fsync(struct file *file, int data)
2065 #else
2066 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2067 #endif
2068 {
2069         struct inode *inode = file->f_dentry->d_inode;
2070         struct ll_inode_info *lli = ll_i2info(inode);
2071         struct ptlrpc_request *req;
2072         struct obd_capa *oc;
2073         int rc, err;
2074         ENTRY;
2075
2076         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2077                inode->i_generation, inode);
2078         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2079
2080 #ifdef HAVE_FILE_FSYNC_4ARGS
2081         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2082         mutex_lock(&inode->i_mutex);
2083 #else
2084         /* fsync's caller has already called _fdata{sync,write}, we want
2085          * that IO to finish before calling the osc and mdc sync methods */
2086         rc = filemap_fdatawait(inode->i_mapping);
2087 #endif
2088
2089         /* catch async errors that were recorded back when async writeback
2090          * failed for pages in this mapping. */
2091         if (!S_ISDIR(inode->i_mode)) {
2092                 err = lli->lli_async_rc;
2093                 lli->lli_async_rc = 0;
2094                 if (rc == 0)
2095                         rc = err;
2096                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2097                 if (rc == 0)
2098                         rc = err;
2099         }
2100
2101         oc = ll_mdscapa_get(inode);
2102         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2103                       &req);
2104         capa_put(oc);
2105         if (!rc)
2106                 rc = err;
2107         if (!err)
2108                 ptlrpc_req_finished(req);
2109
2110         if (data) {
2111                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2112
2113                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2114                                 CL_FSYNC_ALL);
2115                 if (rc == 0 && err < 0)
2116                         rc = err;
2117                 if (rc < 0)
2118                         fd->fd_write_failed = true;
2119                 else
2120                         fd->fd_write_failed = false;
2121         }
2122
2123 #ifdef HAVE_FILE_FSYNC_4ARGS
2124         mutex_unlock(&inode->i_mutex);
2125 #endif
2126         RETURN(rc);
2127 }
2128
2129 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2130 {
2131         struct inode *inode = file->f_dentry->d_inode;
2132         struct ll_sb_info *sbi = ll_i2sbi(inode);
2133         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2134                                            .ei_cb_cp =ldlm_flock_completion_ast,
2135                                            .ei_cbdata = file_lock };
2136         struct md_op_data *op_data;
2137         struct lustre_handle lockh = {0};
2138         ldlm_policy_data_t flock = {{0}};
2139         int flags = 0;
2140         int rc;
2141         ENTRY;
2142
2143         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2144                inode->i_ino, file_lock);
2145
2146         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2147
2148         if (file_lock->fl_flags & FL_FLOCK) {
2149                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2150                 /* flocks are whole-file locks */
2151                 flock.l_flock.end = OFFSET_MAX;
2152                 /* For flocks owner is determined by the local file desctiptor*/
2153                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2154         } else if (file_lock->fl_flags & FL_POSIX) {
2155                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2156                 flock.l_flock.start = file_lock->fl_start;
2157                 flock.l_flock.end = file_lock->fl_end;
2158         } else {
2159                 RETURN(-EINVAL);
2160         }
2161         flock.l_flock.pid = file_lock->fl_pid;
2162
2163         /* Somewhat ugly workaround for svc lockd.
2164          * lockd installs custom fl_lmops->lm_compare_owner that checks
2165          * for the fl_owner to be the same (which it always is on local node
2166          * I guess between lockd processes) and then compares pid.
2167          * As such we assign pid to the owner field to make it all work,
2168          * conflict with normal locks is unlikely since pid space and
2169          * pointer space for current->files are not intersecting */
2170         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2171                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2172
2173         switch (file_lock->fl_type) {
2174         case F_RDLCK:
2175                 einfo.ei_mode = LCK_PR;
2176                 break;
2177         case F_UNLCK:
2178                 /* An unlock request may or may not have any relation to
2179                  * existing locks so we may not be able to pass a lock handle
2180                  * via a normal ldlm_lock_cancel() request. The request may even
2181                  * unlock a byte range in the middle of an existing lock. In
2182                  * order to process an unlock request we need all of the same
2183                  * information that is given with a normal read or write record
2184                  * lock request. To avoid creating another ldlm unlock (cancel)
2185                  * message we'll treat a LCK_NL flock request as an unlock. */
2186                 einfo.ei_mode = LCK_NL;
2187                 break;
2188         case F_WRLCK:
2189                 einfo.ei_mode = LCK_PW;
2190                 break;
2191         default:
2192                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2193                         file_lock->fl_type);
2194                 RETURN (-ENOTSUPP);
2195         }
2196
2197         switch (cmd) {
2198         case F_SETLKW:
2199 #ifdef F_SETLKW64
2200         case F_SETLKW64:
2201 #endif
2202                 flags = 0;
2203                 break;
2204         case F_SETLK:
2205 #ifdef F_SETLK64
2206         case F_SETLK64:
2207 #endif
2208                 flags = LDLM_FL_BLOCK_NOWAIT;
2209                 break;
2210         case F_GETLK:
2211 #ifdef F_GETLK64
2212         case F_GETLK64:
2213 #endif
2214                 flags = LDLM_FL_TEST_LOCK;
2215                 /* Save the old mode so that if the mode in the lock changes we
2216                  * can decrement the appropriate reader or writer refcount. */
2217                 file_lock->fl_type = einfo.ei_mode;
2218                 break;
2219         default:
2220                 CERROR("unknown fcntl lock command: %d\n", cmd);
2221                 RETURN (-EINVAL);
2222         }
2223
2224         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2225                                      LUSTRE_OPC_ANY, NULL);
2226         if (IS_ERR(op_data))
2227                 RETURN(PTR_ERR(op_data));
2228
2229         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2230                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2231                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2232
2233         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2234                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2235
2236         ll_finish_md_op_data(op_data);
2237
2238         if ((file_lock->fl_flags & FL_FLOCK) &&
2239             (rc == 0 || file_lock->fl_type == F_UNLCK))
2240                 flock_lock_file_wait(file, file_lock);
2241         if ((file_lock->fl_flags & FL_POSIX) &&
2242             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2243             !(flags & LDLM_FL_TEST_LOCK))
2244                 posix_lock_file_wait(file, file_lock);
2245
2246         RETURN(rc);
2247 }
2248
2249 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2250 {
2251         ENTRY;
2252
2253         RETURN(-ENOSYS);
2254 }
2255
2256 /**
2257  * test if some locks matching bits and l_req_mode are acquired
2258  * - bits can be in different locks
2259  * - if found clear the common lock bits in *bits
2260  * - the bits not found, are kept in *bits
2261  * \param inode [IN]
2262  * \param bits [IN] searched lock bits [IN]
2263  * \param l_req_mode [IN] searched lock mode
2264  * \retval boolean, true iff all bits are found
2265  */
2266 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2267 {
2268         struct lustre_handle lockh;
2269         ldlm_policy_data_t policy;
2270         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2271                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2272         struct lu_fid *fid;
2273         __u64 flags;
2274         int i;
2275         ENTRY;
2276
2277         if (!inode)
2278                RETURN(0);
2279
2280         fid = &ll_i2info(inode)->lli_fid;
2281         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2282                ldlm_lockname[mode]);
2283
2284         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2285         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2286                 policy.l_inodebits.bits = *bits & (1 << i);
2287                 if (policy.l_inodebits.bits == 0)
2288                         continue;
2289
2290                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2291                                   &policy, mode, &lockh)) {
2292                         struct ldlm_lock *lock;
2293
2294                         lock = ldlm_handle2lock(&lockh);
2295                         if (lock) {
2296                                 *bits &=
2297                                       ~(lock->l_policy_data.l_inodebits.bits);
2298                                 LDLM_LOCK_PUT(lock);
2299                         } else {
2300                                 *bits &= ~policy.l_inodebits.bits;
2301                         }
2302                 }
2303         }
2304         RETURN(*bits == 0);
2305 }
2306
2307 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2308                             struct lustre_handle *lockh, __u64 flags)
2309 {
2310         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2311         struct lu_fid *fid;
2312         ldlm_mode_t rc;
2313         ENTRY;
2314
2315         fid = &ll_i2info(inode)->lli_fid;
2316         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2317
2318         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2319                            fid, LDLM_IBITS, &policy,
2320                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2321         RETURN(rc);
2322 }
2323
2324 static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2325 {
2326         /* Already unlinked. Just update nlink and return success */
2327         if (rc == -ENOENT) {
2328                 clear_nlink(inode);
2329                 /* This path cannot be hit for regular files unless in
2330                  * case of obscure races, so no need to to validate
2331                  * size. */
2332                 if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2333                         return 0;
2334         } else if (rc != 0) {
2335                 CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2336                        ll_get_fsname(inode->i_sb, NULL, 0),
2337                        PFID(ll_inode2fid(inode)), rc);
2338         }
2339
2340         return rc;
2341 }
2342
2343 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2344                              __u64 ibits)
2345 {
2346         struct inode *inode = dentry->d_inode;
2347         struct ptlrpc_request *req = NULL;
2348         struct obd_export *exp;
2349         int rc = 0;
2350         ENTRY;
2351
2352         LASSERT(inode != NULL);
2353
2354         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2355                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2356
2357         exp = ll_i2mdexp(inode);
2358
2359         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2360          *      But under CMD case, it caused some lock issues, should be fixed
2361          *      with new CMD ibits lock. See bug 12718 */
2362         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2363                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2364                 struct md_op_data *op_data;
2365
2366                 if (ibits == MDS_INODELOCK_LOOKUP)
2367                         oit.it_op = IT_LOOKUP;
2368
2369                 /* Call getattr by fid, so do not provide name at all. */
2370                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2371                                              dentry->d_inode, NULL, 0, 0,
2372                                              LUSTRE_OPC_ANY, NULL);
2373                 if (IS_ERR(op_data))
2374                         RETURN(PTR_ERR(op_data));
2375
2376                 oit.it_create_mode |= M_CHECK_STALE;
2377                 rc = md_intent_lock(exp, op_data, NULL, 0,
2378                                     /* we are not interested in name
2379                                        based lookup */
2380                                     &oit, 0, &req,
2381                                     ll_md_blocking_ast, 0);
2382                 ll_finish_md_op_data(op_data);
2383                 oit.it_create_mode &= ~M_CHECK_STALE;
2384                 if (rc < 0) {
2385                         rc = ll_inode_revalidate_fini(inode, rc);
2386                         GOTO (out, rc);
2387                 }
2388
2389                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2390                 if (rc != 0) {
2391                         ll_intent_release(&oit);
2392                         GOTO(out, rc);
2393                 }
2394
2395                 /* Unlinked? Unhash dentry, so it is not picked up later by
2396                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2397                    here to preserve get_cwd functionality on 2.6.
2398                    Bug 10503 */
2399                 if (!dentry->d_inode->i_nlink)
2400                         d_lustre_invalidate(dentry);
2401
2402                 ll_lookup_finish_locks(&oit, dentry);
2403         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2404                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2405                 obd_valid valid = OBD_MD_FLGETATTR;
2406                 struct md_op_data *op_data;
2407                 int ealen = 0;
2408
2409                 if (S_ISREG(inode->i_mode)) {
2410                         rc = ll_get_max_mdsize(sbi, &ealen);
2411                         if (rc)
2412                                 RETURN(rc);
2413                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2414                 }
2415
2416                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2417                                              0, ealen, LUSTRE_OPC_ANY,
2418                                              NULL);
2419                 if (IS_ERR(op_data))
2420                         RETURN(PTR_ERR(op_data));
2421
2422                 op_data->op_valid = valid;
2423                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2424                  * capa for this inode. Because we only keep capas of dirs
2425                  * fresh. */
2426                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2427                 ll_finish_md_op_data(op_data);
2428                 if (rc) {
2429                         rc = ll_inode_revalidate_fini(inode, rc);
2430                         RETURN(rc);
2431                 }
2432
2433                 rc = ll_prep_inode(&inode, req, NULL);
2434         }
2435 out:
2436         ptlrpc_req_finished(req);
2437         return rc;
2438 }
2439
2440 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2441                            __u64 ibits)
2442 {
2443         struct inode *inode = dentry->d_inode;
2444         int rc;
2445         ENTRY;
2446
2447         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2448         if (rc != 0)
2449                 RETURN(rc);
2450
2451         /* if object isn't regular file, don't validate size */
2452         if (!S_ISREG(inode->i_mode)) {
2453                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2454                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2455                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2456         } else {
2457                 rc = ll_glimpse_size(inode);
2458         }
2459         RETURN(rc);
2460 }
2461
2462 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2463                   struct lookup_intent *it, struct kstat *stat)
2464 {
2465         struct inode *inode = de->d_inode;
2466         struct ll_sb_info *sbi = ll_i2sbi(inode);
2467         struct ll_inode_info *lli = ll_i2info(inode);
2468         int res = 0;
2469
2470         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2471                                              MDS_INODELOCK_LOOKUP);
2472         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2473
2474         if (res)
2475                 return res;
2476
2477         stat->dev = inode->i_sb->s_dev;
2478         if (ll_need_32bit_api(sbi))
2479                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2480         else
2481                 stat->ino = inode->i_ino;
2482         stat->mode = inode->i_mode;
2483         stat->nlink = inode->i_nlink;
2484         stat->uid = inode->i_uid;
2485         stat->gid = inode->i_gid;
2486         stat->rdev = inode->i_rdev;
2487         stat->atime = inode->i_atime;
2488         stat->mtime = inode->i_mtime;
2489         stat->ctime = inode->i_ctime;
2490         stat->blksize = 1 << inode->i_blkbits;
2491
2492         stat->size = i_size_read(inode);
2493         stat->blocks = inode->i_blocks;
2494
2495         return 0;
2496 }
2497 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2498 {
2499         struct lookup_intent it = { .it_op = IT_GETATTR };
2500
2501         return ll_getattr_it(mnt, de, &it, stat);
2502 }
2503
2504 #ifdef HAVE_LINUX_FIEMAP_H
2505 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2506                 __u64 start, __u64 len)
2507 {
2508         int rc;
2509         size_t num_bytes;
2510         struct ll_user_fiemap *fiemap;
2511         unsigned int extent_count = fieinfo->fi_extents_max;
2512
2513         num_bytes = sizeof(*fiemap) + (extent_count *
2514                                        sizeof(struct ll_fiemap_extent));
2515         OBD_ALLOC_LARGE(fiemap, num_bytes);
2516
2517         if (fiemap == NULL)
2518                 RETURN(-ENOMEM);
2519
2520         fiemap->fm_flags = fieinfo->fi_flags;
2521         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2522         fiemap->fm_start = start;
2523         fiemap->fm_length = len;
2524         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2525                sizeof(struct ll_fiemap_extent));
2526
2527         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2528
2529         fieinfo->fi_flags = fiemap->fm_flags;
2530         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2531         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2532                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2533
2534         OBD_FREE_LARGE(fiemap, num_bytes);
2535         return rc;
2536 }
2537 #endif
2538
2539 struct posix_acl * ll_get_acl(struct inode *inode, int type)
2540 {
2541         struct ll_inode_info *lli = ll_i2info(inode);
2542         struct posix_acl *acl = NULL;
2543         ENTRY;
2544
2545         spin_lock(&lli->lli_lock);
2546         /* VFS' acl_permission_check->check_acl will release the refcount */
2547         acl = posix_acl_dup(lli->lli_posix_acl);
2548         spin_unlock(&lli->lli_lock);
2549
2550         RETURN(acl);
2551 }
2552
2553 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2554 static int
2555 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2556 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
2557 # else
2558 ll_check_acl(struct inode *inode, int mask)
2559 # endif
2560 {
2561 # ifdef CONFIG_FS_POSIX_ACL
2562         struct posix_acl *acl;
2563         int rc;
2564         ENTRY;
2565
2566 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2567         if (flags & IPERM_FLAG_RCU)
2568                 return -ECHILD;
2569 #  endif
2570         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
2571
2572         if (!acl)
2573                 RETURN(-EAGAIN);
2574
2575         rc = posix_acl_permission(inode, acl, mask);
2576         posix_acl_release(acl);
2577
2578         RETURN(rc);
2579 # else /* !CONFIG_FS_POSIX_ACL */
2580         return -EAGAIN;
2581 # endif /* CONFIG_FS_POSIX_ACL */
2582 }
2583 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2584
2585 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2586 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2587 #else
2588 # ifdef HAVE_INODE_PERMISION_2ARGS
2589 int ll_inode_permission(struct inode *inode, int mask)
2590 # else
2591 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2592 # endif
2593 #endif
2594 {
2595         int rc = 0;
2596         ENTRY;
2597
2598 #ifdef MAY_NOT_BLOCK
2599         if (mask & MAY_NOT_BLOCK)
2600                 return -ECHILD;
2601 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
2602         if (flags & IPERM_FLAG_RCU)
2603                 return -ECHILD;
2604 #endif
2605
2606        /* as root inode are NOT getting validated in lookup operation,
2607         * need to do it before permission check. */
2608
2609         if (inode == inode->i_sb->s_root->d_inode) {
2610                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2611
2612                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2613                                               MDS_INODELOCK_LOOKUP);
2614                 if (rc)
2615                         RETURN(rc);
2616         }
2617
2618         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2619                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2620
2621         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2622                 return lustre_check_remote_perm(inode, mask);
2623
2624         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2625         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
2626
2627         RETURN(rc);
2628 }
2629
2630 #ifdef HAVE_FILE_READV
2631 #define READ_METHOD readv
2632 #define READ_FUNCTION ll_file_readv
2633 #define WRITE_METHOD writev
2634 #define WRITE_FUNCTION ll_file_writev
2635 #else
2636 #define READ_METHOD aio_read
2637 #define READ_FUNCTION ll_file_aio_read
2638 #define WRITE_METHOD aio_write
2639 #define WRITE_FUNCTION ll_file_aio_write
2640 #endif
2641
2642 /* -o localflock - only provides locally consistent flock locks */
2643 struct file_operations ll_file_operations = {
2644         .read           = ll_file_read,
2645         .READ_METHOD    = READ_FUNCTION,
2646         .write          = ll_file_write,
2647         .WRITE_METHOD   = WRITE_FUNCTION,
2648         .unlocked_ioctl = ll_file_ioctl,
2649         .open           = ll_file_open,
2650         .release        = ll_file_release,
2651         .mmap           = ll_file_mmap,
2652         .llseek         = ll_file_seek,
2653 #ifdef HAVE_KERNEL_SENDFILE
2654         .sendfile       = ll_file_sendfile,
2655 #endif
2656 #ifdef HAVE_KERNEL_SPLICE_READ
2657         .splice_read    = ll_file_splice_read,
2658 #endif
2659         .fsync          = ll_fsync,
2660         .flush          = ll_flush
2661 };
2662
2663 struct file_operations ll_file_operations_flock = {
2664         .read           = ll_file_read,
2665         .READ_METHOD    = READ_FUNCTION,
2666         .write          = ll_file_write,
2667         .WRITE_METHOD   = WRITE_FUNCTION,
2668         .unlocked_ioctl = ll_file_ioctl,
2669         .open           = ll_file_open,
2670         .release        = ll_file_release,
2671         .mmap           = ll_file_mmap,
2672         .llseek         = ll_file_seek,
2673 #ifdef HAVE_KERNEL_SENDFILE
2674         .sendfile       = ll_file_sendfile,
2675 #endif
2676 #ifdef HAVE_KERNEL_SPLICE_READ
2677         .splice_read    = ll_file_splice_read,
2678 #endif
2679         .fsync          = ll_fsync,
2680         .flush          = ll_flush,
2681         .flock          = ll_file_flock,
2682         .lock           = ll_file_flock
2683 };
2684
2685 /* These are for -o noflock - to return ENOSYS on flock calls */
2686 struct file_operations ll_file_operations_noflock = {
2687         .read           = ll_file_read,
2688         .READ_METHOD    = READ_FUNCTION,
2689         .write          = ll_file_write,
2690         .WRITE_METHOD   = WRITE_FUNCTION,
2691         .unlocked_ioctl = ll_file_ioctl,
2692         .open           = ll_file_open,
2693         .release        = ll_file_release,
2694         .mmap           = ll_file_mmap,
2695         .llseek         = ll_file_seek,
2696 #ifdef HAVE_KERNEL_SENDFILE
2697         .sendfile       = ll_file_sendfile,
2698 #endif
2699 #ifdef HAVE_KERNEL_SPLICE_READ
2700         .splice_read    = ll_file_splice_read,
2701 #endif
2702         .fsync          = ll_fsync,
2703         .flush          = ll_flush,
2704         .flock          = ll_file_noflock,
2705         .lock           = ll_file_noflock
2706 };
2707
2708 struct inode_operations ll_file_inode_operations = {
2709         .setattr        = ll_setattr,
2710         .getattr        = ll_getattr,
2711         .permission     = ll_inode_permission,
2712         .setxattr       = ll_setxattr,
2713         .getxattr       = ll_getxattr,
2714         .listxattr      = ll_listxattr,
2715         .removexattr    = ll_removexattr,
2716 #ifdef  HAVE_LINUX_FIEMAP_H
2717         .fiemap         = ll_fiemap,
2718 #endif
2719 #ifdef HAVE_IOP_GET_ACL
2720         .get_acl        = ll_get_acl,
2721 #endif
2722 };
2723
2724 /* dynamic ioctl number support routins */
2725 static struct llioc_ctl_data {
2726         struct rw_semaphore     ioc_sem;
2727         cfs_list_t              ioc_head;
2728 } llioc = {
2729         __RWSEM_INITIALIZER(llioc.ioc_sem),
2730         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2731 };
2732
2733
2734 struct llioc_data {
2735         cfs_list_t              iocd_list;
2736         unsigned int            iocd_size;
2737         llioc_callback_t        iocd_cb;
2738         unsigned int            iocd_count;
2739         unsigned int            iocd_cmd[0];
2740 };
2741
2742 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2743 {
2744         unsigned int size;
2745         struct llioc_data *in_data = NULL;
2746         ENTRY;
2747
2748         if (cb == NULL || cmd == NULL ||
2749             count > LLIOC_MAX_CMD || count < 0)
2750                 RETURN(NULL);
2751
2752         size = sizeof(*in_data) + count * sizeof(unsigned int);
2753         OBD_ALLOC(in_data, size);
2754         if (in_data == NULL)
2755                 RETURN(NULL);
2756
2757         memset(in_data, 0, sizeof(*in_data));
2758         in_data->iocd_size = size;
2759         in_data->iocd_cb = cb;
2760         in_data->iocd_count = count;
2761         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2762
2763         down_write(&llioc.ioc_sem);
2764         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2765         up_write(&llioc.ioc_sem);
2766
2767         RETURN(in_data);
2768 }
2769
2770 void ll_iocontrol_unregister(void *magic)
2771 {
2772         struct llioc_data *tmp;
2773
2774         if (magic == NULL)
2775                 return;
2776
2777         down_write(&llioc.ioc_sem);
2778         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2779                 if (tmp == magic) {
2780                         unsigned int size = tmp->iocd_size;
2781
2782                         cfs_list_del(&tmp->iocd_list);
2783                         up_write(&llioc.ioc_sem);
2784
2785                         OBD_FREE(tmp, size);
2786                         return;
2787                 }
2788         }
2789         up_write(&llioc.ioc_sem);
2790
2791         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2792 }
2793
2794 EXPORT_SYMBOL(ll_iocontrol_register);
2795 EXPORT_SYMBOL(ll_iocontrol_unregister);
2796
2797 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2798                         unsigned int cmd, unsigned long arg, int *rcp)
2799 {
2800         enum llioc_iter ret = LLIOC_CONT;
2801         struct llioc_data *data;
2802         int rc = -EINVAL, i;
2803
2804         down_read(&llioc.ioc_sem);
2805         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2806                 for (i = 0; i < data->iocd_count; i++) {
2807                         if (cmd != data->iocd_cmd[i])
2808                                 continue;
2809
2810                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2811                         break;
2812                 }
2813
2814                 if (ret == LLIOC_STOP)
2815                         break;
2816         }
2817         up_read(&llioc.ioc_sem);
2818
2819         if (rcp)
2820                 *rcp = rc;
2821         return ret;
2822 }
2823
2824 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2825 {
2826         struct ll_inode_info *lli = ll_i2info(inode);
2827         struct cl_env_nest nest;
2828         struct lu_env *env;
2829         int result;
2830         ENTRY;
2831
2832         if (lli->lli_clob == NULL)
2833                 RETURN(0);
2834
2835         env = cl_env_nested_get(&nest);
2836         if (IS_ERR(env))
2837                 RETURN(PTR_ERR(env));
2838
2839         result = cl_conf_set(env, lli->lli_clob, conf);
2840         cl_env_nested_put(&nest, env);
2841         RETURN(result);
2842 }
2843
2844 /**
2845  * This function checks if there exists a LAYOUT lock on the client side,
2846  * or enqueues it if it doesn't have one in cache.
2847  *
2848  * This function will not hold layout lock so it may be revoked any time after
2849  * this function returns. Any operations depend on layout should be redone
2850  * in that case.
2851  *
2852  * This function should be called before lov_io_init() to get an uptodate
2853  * layout version, the caller should save the version number and after IO
2854  * is finished, this function should be called again to verify that layout
2855  * is not changed during IO time.
2856  */
2857 int ll_layout_refresh(struct inode *inode, __u32 *gen)
2858 {
2859         struct ll_inode_info  *lli = ll_i2info(inode);
2860         struct ll_sb_info     *sbi = ll_i2sbi(inode);
2861         struct md_op_data     *op_data = NULL;
2862         struct lookup_intent   it = { .it_op = IT_LAYOUT };
2863         struct lustre_handle   lockh = { 0 };
2864         ldlm_mode_t            mode;
2865         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
2866                                            .ei_mode = LCK_CR,
2867                                            .ei_cb_bl = ll_md_blocking_ast,
2868                                            .ei_cb_cp = ldlm_completion_ast,
2869                                            .ei_cbdata = inode };
2870         int rc;
2871         ENTRY;
2872
2873         *gen = 0;
2874         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
2875                 RETURN(0);
2876
2877         /* sanity checks */
2878         LASSERT(fid_is_sane(ll_inode2fid(inode)));
2879         LASSERT(S_ISREG(inode->i_mode));
2880
2881         /* mostly layout lock is caching on the local side, so try to match
2882          * it before grabbing layout lock mutex. */
2883         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
2884                                 LDLM_FL_LVB_READY);
2885         if (mode != 0) { /* hit cached lock */
2886                 /* lsm_layout_gen is started from 0, plus 1 here to distinguish
2887                  * the cases of no layout and first layout. */
2888                 *gen = lli->lli_layout_gen + 1;
2889
2890                 ldlm_lock_decref(&lockh, mode);
2891                 RETURN(0);
2892         }
2893
2894         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
2895                                      0, 0, LUSTRE_OPC_ANY, NULL);
2896         if (IS_ERR(op_data))
2897                 RETURN(PTR_ERR(op_data));
2898
2899         /* take layout lock mutex to enqueue layout lock exclusively. */
2900         mutex_lock(&lli->lli_layout_mutex);
2901
2902         /* try again inside layout mutex */
2903         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
2904                                 LDLM_FL_LVB_READY);
2905         if (mode != 0) { /* hit cached lock */
2906                 *gen = lli->lli_layout_gen + 1;
2907
2908                 ldlm_lock_decref(&lockh, mode);
2909                 mutex_unlock(&lli->lli_layout_mutex);
2910                 ll_finish_md_op_data(op_data);
2911                 RETURN(0);
2912         }
2913
2914         /* have to enqueue one */
2915         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
2916                         NULL, 0, NULL, 0);
2917         if (it.d.lustre.it_data != NULL)
2918                 ptlrpc_req_finished(it.d.lustre.it_data);
2919         it.d.lustre.it_data = NULL;
2920
2921         if (rc == 0) {
2922                 struct ldlm_lock *lock;
2923                 struct cl_object_conf conf;
2924                 struct lustre_md md = { NULL };
2925                 void *lmm;
2926                 int lmmsize;
2927
2928                 LASSERT(lustre_handle_is_used(&lockh));
2929
2930                 /* set lock data in case this is a new lock */
2931                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
2932
2933                 lock = ldlm_handle2lock(&lockh);
2934                 LASSERT(lock != NULL);
2935
2936                 /* for IT_LAYOUT lock, lmm is returned in lock's lvb
2937                  * data via completion callback */
2938                 lmm = lock->l_lvb_data;
2939                 lmmsize = lock->l_lvb_len;
2940                 if (lmm != NULL) {
2941                         rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
2942                                         lmm, lmmsize);
2943                         if (rc >= 0) {
2944                                 if (md.lsm != NULL)
2945                                         *gen = md.lsm->lsm_layout_gen + 1;
2946                                 rc = 0;
2947                         } else {
2948                                 CERROR("file: "DFID" unpackmd error: %d\n",
2949                                         PFID(&lli->lli_fid), rc);
2950                         }
2951                 }
2952                 LDLM_LOCK_PUT(lock);
2953
2954                 /* set layout to file. This may cause lock expiration as we
2955                  * set layout inside layout ibits lock. */
2956                 memset(&conf, 0, sizeof conf);
2957                 conf.coc_inode = inode;
2958                 conf.u.coc_md = &md;
2959                 ll_layout_conf(inode, &conf);
2960                 /* is this racy? */
2961                 lli->lli_has_smd = md.lsm != NULL;
2962                 if (md.lsm != NULL)
2963                         obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
2964         }
2965         ll_intent_drop_lock(&it);
2966
2967         mutex_unlock(&lli->lli_layout_mutex);
2968         ll_finish_md_op_data(op_data);
2969
2970         RETURN(rc);
2971 }