Whamcloud - gitweb
- fixes in split about using correct byte order;
[fs/lustre-release.git] / lustre / llite / llite_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light Super operations
5  *
6  *  Copyright (c) 2002-2005 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/random.h>
29 #include <linux/version.h>
30
31 #include <lustre_lite.h>
32 #include <lustre_ha.h>
33 #include <lustre_dlm.h>
34 #include <lprocfs_status.h>
35 #include <lustre_disk.h>
36 #include <lustre_param.h>
37 #include <lustre_log.h>
38 #include "llite_internal.h"
39
/* Slab cache handle; presumably for per-open-file data (its users are not
 * visible in this part of the file -- confirm against the file code). */
kmem_cache_t *ll_file_data_slab;

/* Global list of llite per-mount structures.  ll_init_sbi() appends
 * sbi->ll_list and ll_free_sbi() removes it, both under ll_sb_lock. */
LIST_HEAD(ll_super_blocks);
spinlock_t ll_sb_lock = SPIN_LOCK_UNLOCKED;

/* Address-space operation tables defined elsewhere in llite. */
extern struct address_space_operations ll_aops;
extern struct address_space_operations ll_dir_aops;

/* Fallback log2(): the index of the first zero bit of ~n, i.e. the lowest
 * set bit of n.  It is used below on the server block size to derive
 * s_blocksize_bits, which is only exact when n is a power of two. */
#ifndef log2
#define log2(n) ffz(~(n))
#endif
51
52
53 static struct ll_sb_info *ll_init_sbi(void)
54 {
55         struct ll_sb_info *sbi = NULL;
56         class_uuid_t uuid;
57         int i;
58         ENTRY;
59
60         OBD_ALLOC(sbi, sizeof(*sbi));
61         if (!sbi)
62                 RETURN(NULL);
63
64         spin_lock_init(&sbi->ll_lock);
65         spin_lock_init(&sbi->ll_lco.lco_lock);
66         INIT_LIST_HEAD(&sbi->ll_pglist);
67         if (num_physpages >> (20 - PAGE_SHIFT) < 512)
68                 sbi->ll_async_page_max = num_physpages / 2;
69         else
70                 sbi->ll_async_page_max = (num_physpages / 4) * 3;
71         sbi->ll_ra_info.ra_max_pages = min(num_physpages / 8,
72                                            SBI_DEFAULT_READAHEAD_MAX);
73         sbi->ll_ra_info.ra_max_read_ahead_whole_pages =
74                                            SBI_DEFAULT_READAHEAD_WHOLE_MAX;
75
76         INIT_LIST_HEAD(&sbi->ll_conn_chain);
77         INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list);
78
79         class_generate_random_uuid(uuid);
80         class_uuid_unparse(uuid, &sbi->ll_sb_uuid);
81         CDEBUG(D_HA, "generated uuid: %s\n", sbi->ll_sb_uuid.uuid);
82
83         spin_lock(&ll_sb_lock);
84         list_add_tail(&sbi->ll_list, &ll_super_blocks);
85         spin_unlock(&ll_sb_lock);
86
87         INIT_LIST_HEAD(&sbi->ll_deathrow);
88         spin_lock_init(&sbi->ll_deathrow_lock);
89         for (i = 0; i < LL_PROCESS_HIST_MAX; i++) { 
90                 spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_r_hist.oh_lock);
91                 spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock);
92         }
93
94         RETURN(sbi);
95 }
96
97 void ll_free_sbi(struct super_block *sb)
98 {
99         struct ll_sb_info *sbi = ll_s2sbi(sb);
100         ENTRY;
101
102         if (sbi != NULL) {
103                 spin_lock(&ll_sb_lock);
104                 list_del(&sbi->ll_list);
105                 spin_unlock(&ll_sb_lock);
106                 OBD_FREE(sbi, sizeof(*sbi));
107         }
108         EXIT;
109 }
110
/* Dentry operations installed on the root dentry by
 * client_common_fill_super().  The only hook, d_compare, is set to
 * ll_dcompare and only when built with LUSTRE_KERNEL_VERSION defined;
 * otherwise the table is empty. */
static struct dentry_operations ll_d_root_ops = {
#ifdef LUSTRE_KERNEL_VERSION
        .d_compare = ll_dcompare,
#endif
};
116
117 /* Initialize the default and maximum LOV EA and cookie sizes.  This allows
118  * us to make MDS RPCs with large enough reply buffers to hold the
119  * maximum-sized (= maximum striped) EA and cookie without having to
120  * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */
121 static int ll_init_ea_size(struct obd_export *md_exp, struct obd_export *dt_exp)
122 {
123         struct lov_stripe_md lsm = { .lsm_magic = LOV_MAGIC };
124         __u32 valsize = sizeof(struct lov_desc);
125         int rc, easize, def_easize, cookiesize;
126         struct lov_desc desc;
127         __u32 stripes;
128         ENTRY;
129
130         rc = obd_get_info(dt_exp, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
131                           &valsize, &desc);
132         if (rc)
133                 RETURN(rc);
134
135         stripes = min(desc.ld_tgt_count, (__u32)LOV_MAX_STRIPE_COUNT);
136         lsm.lsm_stripe_count = stripes;
137         easize = obd_size_diskmd(dt_exp, &lsm);
138
139         lsm.lsm_stripe_count = desc.ld_default_stripe_count;
140         def_easize = obd_size_diskmd(dt_exp, &lsm);
141
142         cookiesize = stripes * sizeof(struct llog_cookie);
143
144         CDEBUG(D_HA, "updating max_mdsize/max_cookiesize: %d/%d\n",
145                easize, cookiesize);
146
147         rc = md_init_ea_size(md_exp, easize, def_easize, cookiesize);
148         RETURN(rc);
149 }
150
151 static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
152                                     int mdt_pag, uid_t nllu, gid_t nllg)
153 {
154         struct inode *root = 0;
155         struct ll_sb_info *sbi = ll_s2sbi(sb);
156         struct obd_device *obd;
157         struct lu_fid rootfid;
158         struct obd_capa *oc = NULL;
159         struct obd_statfs osfs;
160         struct ptlrpc_request *request = NULL;
161         struct lustre_handle dt_conn = {0, };
162         struct lustre_handle md_conn = {0, };
163         struct obd_connect_data *data = NULL;
164         struct lustre_md lmd;
165         obd_valid valid;
166         int size, err;
167         ENTRY;
168
169         obd = class_name2obd(md);
170         if (!obd) {
171                 CERROR("MD %s: not setup or attached\n", md);
172                 RETURN(-EINVAL);
173         }
174
175         OBD_ALLOC_PTR(data);
176         if (data == NULL)
177                 RETURN(-ENOMEM);
178
179         if (proc_lustre_fs_root) {
180                 err = lprocfs_register_mountpoint(proc_lustre_fs_root, sb,
181                                                   dt, md);
182                 if (err < 0)
183                         CERROR("could not register mount in /proc/lustre");
184         }
185
186         /* indicate the features supported by this client */
187         data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH |
188                                   OBD_CONNECT_ACL | OBD_CONNECT_JOIN |
189                                   OBD_CONNECT_ATTRFID | OBD_CONNECT_VERSION |
190                                   OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA;
191         data->ocd_ibits_known = MDS_INODELOCK_FULL;
192         data->ocd_version = LUSTRE_VERSION_CODE;
193
194         if (sb->s_flags & MS_RDONLY)
195                 data->ocd_connect_flags |= OBD_CONNECT_RDONLY;
196         if (sbi->ll_flags & LL_SBI_USER_XATTR)
197                 data->ocd_connect_flags |= OBD_CONNECT_XATTR;
198
199         if (sbi->ll_flags & LL_SBI_FLOCK)
200                 sbi->ll_fop = &ll_file_operations_flock;
201         else
202                 sbi->ll_fop = &ll_file_operations;
203
204         /* real client */
205         data->ocd_connect_flags |= OBD_CONNECT_REAL;
206         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
207                 data->ocd_connect_flags &= ~OBD_CONNECT_LCL_CLIENT;
208                 data->ocd_connect_flags |= OBD_CONNECT_RMT_CLIENT;
209                 data->ocd_nllu = nllu;
210                 data->ocd_nllg = nllg;
211         } else {
212                 data->ocd_connect_flags &= ~OBD_CONNECT_RMT_CLIENT;
213                 data->ocd_connect_flags |= OBD_CONNECT_LCL_CLIENT;
214         }
215
216         if (mdt_pag)
217                 obd_set_info_async(obd->obd_self_export, 3, "pag",
218                                    0, NULL, NULL);
219
220         err = obd_connect(NULL, &md_conn, obd, &sbi->ll_sb_uuid, data);
221         if (err == -EBUSY) {
222                 LCONSOLE_ERROR("An MDT (md %s) is performing recovery, of "
223                                "which this client is not a part.  Please wait "
224                                "for recovery to complete, abort, or "
225                                "time out.\n", md);
226                 GOTO(out, err);
227         } else if (err) {
228                 CERROR("cannot connect to %s: rc = %d\n", md, err);
229                 GOTO(out, err);
230         }
231         sbi->ll_md_exp = class_conn2export(&md_conn);
232
233         err = obd_statfs(obd, &osfs, cfs_time_current_64() - HZ);
234         if (err)
235                 GOTO(out_md, err);
236
237         size = sizeof(*data);
238         err = obd_get_info(sbi->ll_md_exp, strlen(KEY_CONN_DATA), KEY_CONN_DATA,
239                            &size, data);
240         if (err) {
241                 CERROR("Get connect data failed: %d \n", err);
242                 GOTO(out_md, err);
243         }
244
245         LASSERT(osfs.os_bsize);
246         sb->s_blocksize = osfs.os_bsize;
247         sb->s_blocksize_bits = log2(osfs.os_bsize);
248         sb->s_magic = LL_SUPER_MAGIC;
249         sb->s_maxbytes = PAGE_CACHE_MAXBYTES;
250         sbi->ll_namelen = osfs.os_namelen;
251         sbi->ll_max_rw_chunk = LL_DEFAULT_MAX_RW_CHUNK;
252
253         if ((sbi->ll_flags & LL_SBI_USER_XATTR) &&
254             !(data->ocd_connect_flags & OBD_CONNECT_XATTR)) {
255                 LCONSOLE_INFO("Disabling user_xattr feature because "
256                               "it is not supported on the server\n");
257                 sbi->ll_flags &= ~LL_SBI_USER_XATTR;
258         }
259
260         if (data->ocd_connect_flags & OBD_CONNECT_ACL) {
261 #ifdef MS_POSIXACL
262                 sb->s_flags |= MS_POSIXACL;
263 #endif
264                 sbi->ll_flags |= LL_SBI_ACL;
265         } else {
266                 LCONSOLE_INFO("client wants to enable acl, but mdt not!\n");
267 #ifdef MS_POSIXACL
268                 sb->s_flags &= ~MS_POSIXACL;
269 #endif
270                 sbi->ll_flags &= ~LL_SBI_ACL;
271         }
272
273         if (data->ocd_connect_flags & OBD_CONNECT_JOIN)
274                 sbi->ll_flags |= LL_SBI_JOIN;
275
276         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
277                 if (!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT)) {
278                         /* sometimes local client claims to be remote, but mdt
279                          * will disagree when client gss not applied. */
280                         LCONSOLE_INFO("client claims to be remote, but server "
281                                       "rejected, forced to be local.\n");
282                         sbi->ll_flags &= ~LL_SBI_RMT_CLIENT;
283                 }
284         } else {
285                 if (!(data->ocd_connect_flags & OBD_CONNECT_LCL_CLIENT)) {
286                         /* with gss applied, remote client can not claim to be
287                          * local, so mdt maybe force client to be remote. */
288                         LCONSOLE_INFO("client claims to be local, but server "
289                                       "rejected, forced to be remote.\n");
290                         sbi->ll_flags |= LL_SBI_RMT_CLIENT;
291                 }
292         }
293
294         if (data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) {
295                 LCONSOLE_INFO("client enabled MDS capability!\n");
296                 sbi->ll_flags |= LL_SBI_MDS_CAPA;
297         }
298
299         if (data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA) {
300                 LCONSOLE_INFO("client enabled OSS capability!\n");
301                 sbi->ll_flags |= LL_SBI_OSS_CAPA;
302         }
303
304 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0))
305         /* We set sb->s_dev equal on all lustre clients in order to support
306          * NFS export clustering.  NFSD requires that the FSID be the same
307          * on all clients. */
308         /* s_dev is also used in lt_compare() to compare two fs, but that is
309          * only a node-local comparison. */
310         
311         /* XXX: this will not work with LMV */
312         sb->s_dev = get_uuid2int(sbi2mdc(sbi)->cl_target_uuid.uuid,
313                                  strlen(sbi2mdc(sbi)->cl_target_uuid.uuid));
314 #endif
315
316         obd = class_name2obd(dt);
317         if (!obd) {
318                 CERROR("DT %s: not setup or attached\n", dt);
319                 GOTO(out_md, err = -ENODEV);
320         }
321
322         data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION |
323                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE;
324         if (sbi->ll_flags & LL_SBI_OSS_CAPA)
325                 data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA;
326
327         CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d "
328                "ocd_grant: %d\n", data->ocd_connect_flags,
329                data->ocd_version, data->ocd_grant);
330
331         obd->obd_upcall.onu_owner = &sbi->ll_lco;
332         obd->obd_upcall.onu_upcall = ll_ocd_update;
333         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT;
334
335         err = obd_connect(NULL, &dt_conn, obd, &sbi->ll_sb_uuid, data);
336         if (err == -EBUSY) {
337                 LCONSOLE_ERROR("An OST (dt %s) is performing recovery, of which this"
338                                " client is not a part.  Please wait for recovery to "
339                                "complete, abort, or time out.\n", dt);
340                 GOTO(out, err);
341         } else if (err) {
342                 CERROR("cannot connect to %s: rc = %d\n", dt, err);
343                 GOTO(out_md, err);
344         }
345
346         sbi->ll_dt_exp = class_conn2export(&dt_conn);
347
348         spin_lock(&sbi->ll_lco.lco_lock);
349         sbi->ll_lco.lco_flags = data->ocd_connect_flags;
350         spin_unlock(&sbi->ll_lco.lco_lock);
351
352         ll_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
353
354         err = obd_prep_async_page(sbi->ll_dt_exp, NULL, NULL, NULL,
355                                   0, NULL, NULL, NULL);
356         if (err < 0) {
357                 LCONSOLE_ERROR("There are no OST's in this filesystem. "
358                                "There must be at least one active OST for "
359                                "a client to start.\n");
360                 GOTO(out_dt, err);
361         }
362
363         if (!ll_async_page_slab) {
364                 ll_async_page_slab_size =
365                         size_round(sizeof(struct ll_async_page)) + err;
366                 ll_async_page_slab = kmem_cache_create("ll_async_page",
367                                                        ll_async_page_slab_size,
368                                                        0, 0, NULL, NULL);
369                 if (!ll_async_page_slab)
370                         GOTO(out_dt, err = -ENOMEM);
371         }
372
373         err = md_getstatus(sbi->ll_md_exp, &rootfid, &oc);
374         if (err) {
375                 CERROR("cannot mds_connect: rc = %d\n", err);
376                 GOTO(out_dt, err);
377         }
378         CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
379         sbi->ll_root_fid = rootfid;
380
381         sb->s_op = &lustre_super_operations;
382 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
383         sb->s_export_op = &lustre_export_operations;
384 #endif
385
386         /* make root inode
387          * XXX: move this to after cbd setup? */
388         valid = OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS | OBD_MD_FLMDSCAPA;
389         if (sbi->ll_flags & LL_SBI_RMT_CLIENT)
390                 valid |= OBD_MD_FLRMTPERM;
391         else if (sbi->ll_flags & LL_SBI_ACL)
392                 valid |= OBD_MD_FLACL;
393
394         err = md_getattr(sbi->ll_md_exp, &rootfid, oc, valid, 0, &request);
395         if (oc)
396                 free_capa(oc);
397         if (err) {
398                 CERROR("md_getattr failed for root: rc = %d\n", err);
399                 GOTO(out_dt, err);
400         }
401
402         err = md_get_lustre_md(sbi->ll_md_exp, request, 
403                                REPLY_REC_OFF, sbi->ll_dt_exp, sbi->ll_md_exp, 
404                                &lmd);
405         if (err) {
406                 CERROR("failed to understand root inode md: rc = %d\n", err);
407                 ptlrpc_req_finished (request);
408                 GOTO(out_dt, err);
409         }
410
411         LASSERT(fid_is_sane(&sbi->ll_root_fid));
412         root = ll_iget(sb, ll_fid_build_ino(sbi, &sbi->ll_root_fid), &lmd);
413         ptlrpc_req_finished(request);
414
415         if (root == NULL || is_bad_inode(root)) {
416                 md_free_lustre_md(sbi->ll_dt_exp, &lmd);
417                 CERROR("lustre_lite: bad iget4 for root\n");
418                 GOTO(out_root, err = -EBADF);
419         }
420
421         err = ll_close_thread_start(&sbi->ll_lcq);
422         if (err) {
423                 CERROR("cannot start close thread: rc %d\n", err);
424                 GOTO(out_root, err);
425         }
426
427         /* making vm readahead 0 for 2.4.x. In the case of 2.6.x,
428            backing dev info assigned to inode mapping is used for
429            determining maximal readahead. */
430 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0)) && \
431     !defined(KERNEL_HAS_AS_MAX_READAHEAD)
432         /* bug 2805 - set VM readahead to zero */
433         vm_max_readahead = vm_min_readahead = 0;
434 #endif
435
436         sb->s_root = d_alloc_root(root);
437         if (data != NULL)
438                 OBD_FREE(data, sizeof(*data));
439         sb->s_root->d_op = &ll_d_root_ops;
440         RETURN(err);
441
442 out_root:
443         if (root)
444                 iput(root);
445 out_dt:
446         obd_disconnect(sbi->ll_dt_exp);
447         sbi->ll_dt_exp = NULL;
448 out_md:
449         obd_disconnect(sbi->ll_md_exp);
450         sbi->ll_md_exp = NULL;
451 out:
452         if (data != NULL)
453                 OBD_FREE_PTR(data);
454         lprocfs_unregister_mountpoint(sbi);
455         RETURN(err);
456 }
457
458 int ll_get_max_mdsize(struct ll_sb_info *sbi, int *lmmsize)
459 {
460         int size, rc;
461
462         *lmmsize = obd_size_diskmd(sbi->ll_dt_exp, NULL);
463         size = sizeof(int);
464         rc = obd_get_info(sbi->ll_md_exp, strlen("max_easize"), "max_easize",
465                           &size, lmmsize);
466         if (rc)
467                 CERROR("Get max mdsize error rc %d \n", rc);
468
469         RETURN(rc);
470 }
471
472 void ll_dump_inode(struct inode *inode)
473 {
474         struct list_head *tmp;
475         int dentry_count = 0;
476
477         LASSERT(inode != NULL);
478
479         list_for_each(tmp, &inode->i_dentry)
480                 dentry_count++;
481
482         CERROR("inode %p dump: dev=%s ino=%lu mode=%o count=%u, %d dentries\n",
483                inode, ll_i2mdexp(inode)->exp_obd->obd_name, inode->i_ino,
484                inode->i_mode, atomic_read(&inode->i_count), dentry_count);
485 }
486
487 void lustre_dump_dentry(struct dentry *dentry, int recur)
488 {
489         struct list_head *tmp;
490         int subdirs = 0;
491
492         LASSERT(dentry != NULL);
493
494         list_for_each(tmp, &dentry->d_subdirs)
495                 subdirs++;
496
497         CERROR("dentry %p dump: name=%.*s parent=%.*s (%p), inode=%p, count=%u,"
498                " flags=0x%x, fsdata=%p, %d subdirs\n", dentry,
499                dentry->d_name.len, dentry->d_name.name,
500                dentry->d_parent->d_name.len, dentry->d_parent->d_name.name,
501                dentry->d_parent, dentry->d_inode, atomic_read(&dentry->d_count),
502                dentry->d_flags, dentry->d_fsdata, subdirs);
503         if (dentry->d_inode != NULL)
504                 ll_dump_inode(dentry->d_inode);
505
506         if (recur == 0)
507                 return;
508
509         list_for_each(tmp, &dentry->d_subdirs) {
510                 struct dentry *d = list_entry(tmp, struct dentry, d_child);
511                 lustre_dump_dentry(d, recur - 1);
512         }
513 }
514
515 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
516 void lustre_throw_orphan_dentries(struct super_block *sb)
517 {
518         struct dentry *dentry, *next;
519         struct ll_sb_info *sbi = ll_s2sbi(sb);
520
521         /* Do this to get rid of orphaned dentries. That is not really trw. */
522         list_for_each_entry_safe(dentry, next, &sbi->ll_orphan_dentry_list,
523                                  d_hash) {
524                 CWARN("found orphan dentry %.*s (%p->%p) at unmount, dumping "
525                       "before and after shrink_dcache_parent\n",
526                       dentry->d_name.len, dentry->d_name.name, dentry, next);
527                 lustre_dump_dentry(dentry, 1);
528                 shrink_dcache_parent(dentry);
529                 lustre_dump_dentry(dentry, 1);
530         }
531 }
532 #else
533 #define lustre_throw_orphan_dentries(sb)
534 #endif
535
536 static void prune_dir_dentries(struct inode *inode)
537 {
538         struct dentry *dentry, *prev = NULL;
539
540         /* due to lustre specific logic, a directory
541          * can have few dentries - a bug from VFS POV */
542 restart:
543         spin_lock(&dcache_lock);
544         if (!list_empty(&inode->i_dentry)) {
545                 dentry = list_entry(inode->i_dentry.prev,
546                                     struct dentry, d_alias);
547                 /* in order to prevent infinite loops we
548                  * break if previous dentry is busy */
549                 if (dentry != prev) {
550                         prev = dentry;
551                         dget_locked(dentry);
552                         spin_unlock(&dcache_lock);
553
554                         /* try to kill all child dentries */
555                         lock_dentry(dentry);
556                         shrink_dcache_parent(dentry);
557                         unlock_dentry(dentry);
558                         dput(dentry);
559
560                         /* now try to get rid of current dentry */
561                         d_prune_aliases(inode);
562                         goto restart;
563                 }
564         }
565         spin_unlock(&dcache_lock);
566 }
567
568 static void prune_deathrow_one(struct ll_inode_info *lli)
569 {
570         struct inode *inode = ll_info2i(lli);
571
572         /* first, try to drop any dentries - they hold a ref on the inode */
573         if (S_ISDIR(inode->i_mode))
574                 prune_dir_dentries(inode);
575         else
576                 d_prune_aliases(inode);
577
578
579         /* if somebody still uses it, leave it */
580         LASSERT(atomic_read(&inode->i_count) > 0);
581         if (atomic_read(&inode->i_count) > 1)
582                 goto out;
583
584         CDEBUG(D_INODE, "inode %lu/%u(%d) looks a good candidate for prune\n",
585                inode->i_ino,inode->i_generation, atomic_read(&inode->i_count));
586
587         /* seems nobody uses it anymore */
588         inode->i_nlink = 0;
589
590 out:
591         iput(inode);
592         return;
593 }
594
595 static void prune_deathrow(struct ll_sb_info *sbi, int try)
596 {
597         struct ll_inode_info *lli;
598         int empty;
599
600         do {
601                 if (need_resched() && try)
602                         break;
603
604                 if (try) {
605                         if (!spin_trylock(&sbi->ll_deathrow_lock))
606                                 break;
607                 } else {
608                         spin_lock(&sbi->ll_deathrow_lock);
609                 }
610
611                 empty = 1;
612                 lli = NULL;
613                 if (!list_empty(&sbi->ll_deathrow)) {
614                         lli = list_entry(sbi->ll_deathrow.next,
615                                          struct ll_inode_info,
616                                          lli_dead_list);
617                         list_del_init(&lli->lli_dead_list);
618                         if (!list_empty(&sbi->ll_deathrow))
619                                 empty = 0;
620                 }
621                 spin_unlock(&sbi->ll_deathrow_lock);
622
623                 if (lli)
624                         prune_deathrow_one(lli);
625
626         } while (empty == 0);
627 }
628
/*
 * Tear down what client_common_fill_super() set up for @sb: stop the close
 * thread, drain death row, disconnect the data and metadata exports and
 * remove the /proc mountpoint entry.  The per-mount structure itself is not
 * freed here.
 */
void client_common_put_super(struct super_block *sb)
{
        struct ll_sb_info *sbi = ll_s2sbi(sb);
        ENTRY;

        ll_close_thread_shutdown(sbi->ll_lcq);

        /* destroy inodes in deathrow */
        /* try == 0: take the lock unconditionally and drain the whole list */
        prune_deathrow(sbi, 0);

        list_del(&sbi->ll_conn_chain);
        obd_disconnect(sbi->ll_dt_exp);
        sbi->ll_dt_exp = NULL;

        lprocfs_unregister_mountpoint(sbi);

        obd_disconnect(sbi->ll_md_exp);
        sbi->ll_md_exp = NULL;

        /* expands to nothing on kernels >= 2.5.0 (see its definition) */
        lustre_throw_orphan_dentries(sb);
        EXIT;
}
651
652 char *ll_read_opt(const char *opt, char *data)
653 {
654         char *value;
655         char *retval;
656         ENTRY;
657
658         CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
659         if (strncmp(opt, data, strlen(opt)))
660                 RETURN(NULL);
661         if ((value = strchr(data, '=')) == NULL)
662                 RETURN(NULL);
663
664         value++;
665         OBD_ALLOC(retval, strlen(value) + 1);
666         if (!retval) {
667                 CERROR("out of memory!\n");
668                 RETURN(NULL);
669         }
670
671         memcpy(retval, value, strlen(value)+1);
672         CDEBUG(D_SUPER, "Assigned option: %s, value %s\n", opt, retval);
673         RETURN(retval);
674 }
675
/*
 * Return @fl if @data begins with the option name @opt, 0 otherwise.
 * This is a prefix match only: nothing is checked about the character that
 * follows the name in @data.
 */
static inline int ll_set_opt(const char *opt, char *data, int fl)
{
        return strncmp(opt, data, strlen(opt)) == 0 ? fl : 0;
}
683
684 /* non-client-specific mount options are parsed in lmd_parse */
685 static int ll_options(char *options, int *flags)
686 {
687         int tmp;
688         char *s1 = options, *s2;
689         ENTRY;
690
691         if (!options) 
692                 RETURN(0);
693
694         CDEBUG(D_CONFIG, "Parsing opts %s\n", options);
695
696         while (*s1) {
697                 CDEBUG(D_SUPER, "next opt=%s\n", s1);
698                 tmp = ll_set_opt("nolock", s1, LL_SBI_NOLCK);
699                 if (tmp) {
700                         *flags |= tmp;
701                         goto next;
702                 }
703                 tmp = ll_set_opt("flock", s1, LL_SBI_FLOCK);
704                 if (tmp) {
705                         *flags |= tmp;
706                         goto next;
707                 }
708                 tmp = ll_set_opt("noflock", s1, LL_SBI_FLOCK);
709                 if (tmp) {
710                         *flags &= ~tmp;
711                         goto next;
712                 }
713                 tmp = ll_set_opt("user_xattr", s1, LL_SBI_USER_XATTR);
714                 if (tmp) {
715                         *flags |= tmp;
716                         goto next;
717                 }
718                 tmp = ll_set_opt("nouser_xattr", s1, LL_SBI_USER_XATTR);
719                 if (tmp) {
720                         *flags &= ~tmp;
721                         goto next;
722                 }
723                 tmp = ll_set_opt("acl", s1, LL_SBI_ACL);
724                 if (tmp) {
725                         /* Ignore deprecated mount option.  The client will
726                          * always try to mount with ACL support, whether this
727                          * is used depends on whether server supports it. */
728                         goto next;
729                 }
730                 tmp = ll_set_opt("noacl", s1, LL_SBI_ACL);
731                 if (tmp) {
732                         goto next;
733                 }
734                 tmp = ll_set_opt("remote_client", s1, LL_SBI_RMT_CLIENT);
735                 if (tmp) {
736                         *flags |= tmp;
737                         goto next;
738                 }
739
740                 LCONSOLE_ERROR("Unknown option '%s', won't mount.\n", s1);
741                 RETURN(-EINVAL);
742
743 next:
744                 /* Find next opt */
745                 s2 = strchr(s1, ',');
746                 if (s2 == NULL)
747                         break;
748                 s1 = s2 + 1;
749         }
750         RETURN(0);
751 }
752
753 void ll_lli_init(struct ll_inode_info *lli)
754 {
755         sema_init(&lli->lli_open_sem, 1);
756         sema_init(&lli->lli_size_sem, 1);
757         sema_init(&lli->lli_write_sem, 1);
758         lli->lli_flags = 0;
759         lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
760         spin_lock_init(&lli->lli_lock);
761         INIT_LIST_HEAD(&lli->lli_pending_write_llaps);
762         INIT_LIST_HEAD(&lli->lli_close_list);
763         lli->lli_inode_magic = LLI_INODE_MAGIC;
764         sema_init(&lli->lli_och_sem, 1);
765         lli->lli_mds_read_och = lli->lli_mds_write_och = NULL;
766         lli->lli_mds_exec_och = NULL;
767         lli->lli_open_fd_read_count = lli->lli_open_fd_write_count = 0;
768         lli->lli_open_fd_exec_count = 0;
769         INIT_LIST_HEAD(&lli->lli_dead_list);
770         lli->lli_remote_perms = NULL;
771         lli->lli_rmtperm_utime = 0;
772         sema_init(&lli->lli_rmtperm_sem, 1);
773         INIT_LIST_HEAD(&lli->lli_oss_capas);
774 }
775
776 /* COMPAT_146 */
777 #define MDCDEV "mdc_dev"
778 static int old_lustre_process_log(struct super_block *sb, char *newprofile,
779                                   struct config_llog_instance *cfg)
780 {
781         struct lustre_sb_info *lsi = s2lsi(sb);
782         struct obd_device *obd;
783         struct lustre_handle mdc_conn = {0, };
784         struct obd_export *exp;
785         char *ptr, *mdt, *profile;
786         char niduuid[10] = "mdtnid0";
787         class_uuid_t uuid;
788         struct obd_uuid mdc_uuid;
789         struct llog_ctxt *ctxt;
790         struct obd_connect_data ocd = { 0 };
791         lnet_nid_t nid;
792         int i, rc = 0, recov_bk = 1, failnodes = 0;
793         ENTRY;
794
795         class_generate_random_uuid(uuid);
796         class_uuid_unparse(uuid, &mdc_uuid);
797         CDEBUG(D_HA, "generated uuid: %s\n", mdc_uuid.uuid);
798         
799         /* Figure out the old mdt and profile name from new-style profile
800            ("lustre" from "mds/lustre-client") */
801         mdt = newprofile;
802         profile = strchr(mdt, '/');
803         if (profile == NULL) {
804                 CDEBUG(D_CONFIG, "Can't find MDT name in %s\n", newprofile);
805                 GOTO(out, rc = -EINVAL);
806         }
807         *profile = '\0';
808         profile++;
809         ptr = strrchr(profile, '-');
810         if (ptr == NULL) {
811                 CDEBUG(D_CONFIG, "Can't find client name in %s\n", newprofile);
812                 GOTO(out, rc = -EINVAL);
813         }
814         *ptr = '\0';
815
816         LCONSOLE_WARN("This looks like an old mount command; I will try to "
817                       "contact MDT '%s' for profile '%s'\n", mdt, profile);
818
819         /* Use nids from mount line: uml1,1@elan:uml2,2@elan:/lustre */
820         i = 0;
821         ptr = lsi->lsi_lmd->lmd_dev;
822         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
823                 rc = do_lcfg(MDCDEV, nid, LCFG_ADD_UUID, niduuid, 0,0,0);
824                 i++;
825                 /* Stop at the first failover nid */
826                 if (*ptr == ':') 
827                         break;
828         }
829         if (i == 0) {
830                 CERROR("No valid MDT nids found.\n");
831                 GOTO(out, rc = -EINVAL);
832         }
833         failnodes++;
834
835         rc = do_lcfg(MDCDEV, 0, LCFG_ATTACH, LUSTRE_MDC_NAME, mdc_uuid.uuid, 0, 0);
836         if (rc < 0)
837                 GOTO(out_del_uuid, rc);
838
839         rc = do_lcfg(MDCDEV, 0, LCFG_SETUP, mdt, niduuid, 0, 0);
840         if (rc < 0) {
841                 LCONSOLE_ERROR("I couldn't establish a connection with the MDT."
842                                " Check that the MDT host NID is correct and the"
843                                " networks are up.\n");
844                 GOTO(out_detach, rc);
845         }
846
847         obd = class_name2obd(MDCDEV);
848         if (obd == NULL)
849                 GOTO(out_cleanup, rc = -EINVAL);
850
851         /* Add any failover nids */
852         while (*ptr == ':') {
853                 /* New failover node */
854                 sprintf(niduuid, "mdtnid%d", failnodes);
855                 i = 0;
856                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
857                         i++;
858                         rc = do_lcfg(MDCDEV, nid, LCFG_ADD_UUID, niduuid,0,0,0);
859                         if (rc)
860                                 CERROR("Add uuid for %s failed %d\n", 
861                                        libcfs_nid2str(nid), rc);
862                         if (*ptr == ':') 
863                                 break;
864                 }
865                 if (i > 0) {
866                         rc = do_lcfg(MDCDEV, 0, LCFG_ADD_CONN, niduuid, 0, 0,0);
867                         if (rc) 
868                                 CERROR("Add conn for %s failed %d\n", 
869                                        libcfs_nid2str(nid), rc);
870                         failnodes++;
871                 } else {
872                         /* at ":/fsname" */
873                         break;
874                 }
875         }
876
877         /* Try all connections, but only once. */
878         rc = obd_set_info_async(obd->obd_self_export,
879                                 strlen("init_recov_bk"), "init_recov_bk",
880                                 sizeof(recov_bk), &recov_bk, NULL);
881         if (rc)
882                 GOTO(out_cleanup, rc);
883
884         /* If we don't have this then an ACL MDS will refuse the connection */
885         ocd.ocd_connect_flags = OBD_CONNECT_ACL;
886
887         rc = obd_connect(NULL, &mdc_conn, obd, &mdc_uuid, &ocd);
888         if (rc) {
889                 CERROR("cannot connect to %s: rc = %d\n", mdt, rc);
890                 GOTO(out_cleanup, rc);
891         }
892
893         exp = class_conn2export(&mdc_conn);
894
895         ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
896         
897         cfg->cfg_flags |= CFG_F_COMPAT146;
898
899 #if 1
900         rc = class_config_parse_llog(ctxt, profile, cfg);
901 #else
902         /*
903          * For debugging, it's useful to just dump the log
904          */
905         rc = class_config_dump_llog(ctxt, profile, cfg);
906 #endif
907         switch (rc) {
908         case 0: {
909                 /* Set the caller's profile name to the old-style */
910                 memcpy(newprofile, profile, strlen(profile) + 1);
911                 break;
912         }
913         case -EINVAL:
914                 LCONSOLE_ERROR("%s: The configuration '%s' could not be read "
915                                "from the MDT '%s'.  Make sure this client and "
916                                "the MDT are running compatible versions of "
917                                "Lustre.\n",
918                                obd->obd_name, profile, mdt);
919                 /* fall through */
920         default:
921                 LCONSOLE_ERROR("%s: The configuration '%s' could not be read "
922                                "from the MDT '%s'.  This may be the result of "
923                                "communication errors between the client and "
924                                "the MDT, or if the MDT is not running.\n",
925                                obd->obd_name, profile, mdt);
926                 break;
927         }
928
929         /* We don't so much care about errors in cleaning up the config llog
930          * connection, as we have already read the config by this point. */
931         obd_disconnect(exp);
932
933 out_cleanup:
934         do_lcfg(MDCDEV, 0, LCFG_CLEANUP, 0, 0, 0, 0);
935
936 out_detach:
937         do_lcfg(MDCDEV, 0, LCFG_DETACH, 0, 0, 0, 0);
938
939 out_del_uuid:
940         /* class_add_uuid adds a nid even if the same uuid exists; we might
941            delete any copy here.  So they all better match. */
942         for (i = 0; i < failnodes; i++) {
943                 sprintf(niduuid, "mdtnid%d", i);
944                 do_lcfg(MDCDEV, 0, LCFG_DEL_UUID, niduuid, 0, 0, 0);
945         }
946         /* class_import_put will get rid of the additional connections */
947 out:
948         RETURN(rc);
949 }
950 /* end COMPAT_146 */
951
952 int ll_fill_super(struct super_block *sb)
953 {
954         struct lustre_profile *lprof;
955         struct lustre_sb_info *lsi = s2lsi(sb);
956         struct ll_sb_info *sbi;
957         char  *dt = NULL, *md = NULL;
958         char  *profilenm = get_profile_name(sb);
959         struct config_llog_instance cfg;
960         char   ll_instance[sizeof(sb) * 2 + 1];
961         int    err;
962         ENTRY;
963
964         CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb);
965
966         cfs_module_get();
967
968         /* client additional sb info */
969         lsi->lsi_llsbi = sbi = ll_init_sbi();
970         if (!sbi) {
971                 cfs_module_put();
972                 RETURN(-ENOMEM);
973         }
974
975         err = ll_options(lsi->lsi_lmd->lmd_opts, &sbi->ll_flags);
976         if (err) 
977                 GOTO(out_free, err);
978
979         /* Generate a string unique to this super, in case some joker tries
980            to mount the same fs at two mount points.
981            Use the address of the super itself.*/
982         sprintf(ll_instance, "%p", sb);
983         cfg.cfg_instance = ll_instance;
984         cfg.cfg_uuid = lsi->lsi_llsbi->ll_sb_uuid;
985         cfg.cfg_last_idx = 0;
986
987         /* set up client obds */
988         err = lustre_process_log(sb, profilenm, &cfg);
989         /* COMPAT_146 */
990         if (err < 0) {
991                 char *oldname;
992                 int rc, oldnamelen;
993                 oldnamelen = strlen(profilenm) + 1;
994                 /* Temp storage for 1.4.6 profile name */
995                 OBD_ALLOC(oldname, oldnamelen);
996                 if (oldname) {
997                         memcpy(oldname, profilenm, oldnamelen); 
998                         rc = old_lustre_process_log(sb, oldname, &cfg);
999                         if (rc >= 0) {
1000                                 /* That worked - update the profile name 
1001                                    permanently */
1002                                 err = rc;
1003                                 OBD_FREE(lsi->lsi_lmd->lmd_profile, 
1004                                          strlen(lsi->lsi_lmd->lmd_profile) + 1);
1005                                 OBD_ALLOC(lsi->lsi_lmd->lmd_profile, 
1006                                          strlen(oldname) + 1);
1007                                 if (!lsi->lsi_lmd->lmd_profile) {
1008                                         OBD_FREE(oldname, oldnamelen);
1009                                         GOTO(out_free, err = -ENOMEM);
1010                                 }
1011                                 memcpy(lsi->lsi_lmd->lmd_profile, oldname,
1012                                        strlen(oldname) + 1); 
1013                                 profilenm = get_profile_name(sb);
1014                         }
1015                         OBD_FREE(oldname, oldnamelen);
1016                 }
1017         }
1018         /* end COMPAT_146 */
1019         if (err < 0) {
1020                 CERROR("Unable to process log: %d\n", err);
1021                 GOTO(out_free, err);
1022         }
1023
1024         lprof = class_get_profile(profilenm);
1025         if (lprof == NULL) {
1026                 LCONSOLE_ERROR("The client profile '%s' could not be read "
1027                                "from the MGS.  Does that filesystem exist?\n",
1028                                profilenm);
1029                 GOTO(out_free, err = -EINVAL);
1030         }
1031         CDEBUG(D_CONFIG, "Found profile %s: mdc=%s osc=%s\n", profilenm,
1032                lprof->lp_md, lprof->lp_dt);
1033
1034         OBD_ALLOC(dt, strlen(lprof->lp_dt) +
1035                   strlen(ll_instance) + 2);
1036         if (!dt)
1037                 GOTO(out_free, err = -ENOMEM);
1038         sprintf(dt, "%s-%s", lprof->lp_dt, ll_instance);
1039
1040         OBD_ALLOC(md, strlen(lprof->lp_md) +
1041                   strlen(ll_instance) + 2);
1042         if (!md)
1043                 GOTO(out_free, err = -ENOMEM);
1044         sprintf(md, "%s-%s", lprof->lp_md, ll_instance);
1045
1046         /* connections, registrations, sb setup */
1047         err = client_common_fill_super(sb, md, dt,
1048                                        lsi->lsi_lmd->lmd_pag,
1049                                        lsi->lsi_lmd->lmd_nllu,
1050                                        lsi->lsi_lmd->lmd_nllg);
1051
1052 out_free:
1053         if (md)
1054                 OBD_FREE(md, strlen(md) + 1);
1055         if (dt)
1056                 OBD_FREE(dt, strlen(dt) + 1);
1057         if (err) 
1058                 ll_put_super(sb);
1059         RETURN(err);
1060 } /* ll_fill_super */
1061
1062
1063 void ll_put_super(struct super_block *sb)
1064 {
1065         struct config_llog_instance cfg;
1066         char   ll_instance[sizeof(sb) * 2 + 1];
1067         struct obd_device *obd;
1068         struct lustre_sb_info *lsi = s2lsi(sb);
1069         struct ll_sb_info *sbi = ll_s2sbi(sb);
1070         char *profilenm = get_profile_name(sb);
1071         int force = 1, next;
1072         ENTRY;
1073
1074         CDEBUG(D_VFSTRACE, "VFS Op: sb %p - %s\n", sb, profilenm);
1075
1076         sprintf(ll_instance, "%p", sb);
1077         cfg.cfg_instance = ll_instance;
1078         lustre_end_log(sb, NULL, &cfg);
1079         
1080         if (sbi->ll_md_exp) {
1081                 obd = class_exp2obd(sbi->ll_md_exp);
1082                 if (obd) 
1083                         force = obd->obd_no_recov;
1084         }
1085         
1086         /* We need to set force before the lov_disconnect in 
1087            lustre_common_put_super, since l_d cleans up osc's as well. */
1088         if (force) {
1089                 next = 0;
1090                 while ((obd = class_devices_in_group(&sbi->ll_sb_uuid,
1091                                                      &next)) != NULL) {
1092                         obd->obd_force = force;
1093                 }
1094         }                       
1095
1096         if (sbi->ll_lcq) {
1097                 /* Only if client_common_fill_super succeeded */
1098                 client_common_put_super(sb);
1099         }
1100         next = 0;
1101         while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) !=NULL) {
1102                 class_manual_cleanup(obd);
1103         }
1104
1105         if (profilenm)
1106                 class_del_profile(profilenm);
1107
1108         ll_free_sbi(sb);
1109         lsi->lsi_llsbi = NULL;
1110
1111         lustre_common_put_super(sb);
1112
1113         LCONSOLE_WARN("client %s umount complete\n", ll_instance);
1114         
1115         cfs_module_put();
1116
1117         EXIT;
1118 } /* client_put_super */
1119
1120 #ifdef HAVE_REGISTER_CACHE
1121 #include <linux/cache_def.h>
1122 #ifdef HAVE_CACHE_RETURN_INT
1123 static int
1124 #else
1125 static void
1126 #endif
1127 ll_shrink_cache(int priority, unsigned int gfp_mask)
1128 {
1129         struct ll_sb_info *sbi;
1130         int count = 0;
1131
1132         list_for_each_entry(sbi, &ll_super_blocks, ll_list)
1133                 count += llap_shrink_cache(sbi, priority);
1134
1135 #ifdef HAVE_CACHE_RETURN_INT
1136         return count;
1137 #endif
1138 }
1139
1140 struct cache_definition ll_cache_definition = {
1141         .name = "llap_cache",
1142         .shrink = ll_shrink_cache
1143 };
1144 #endif /* HAVE_REGISTER_CACHE */
1145
1146 struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
1147 {
1148         struct inode *inode = NULL;
1149         /* NOTE: we depend on atomic igrab() -bzzz */
1150         lock_res_and_lock(lock);
1151         if (lock->l_ast_data) {
1152                 struct ll_inode_info *lli = ll_i2info(lock->l_ast_data);
1153                 if (lli->lli_inode_magic == LLI_INODE_MAGIC) {
1154                         inode = igrab(lock->l_ast_data);
1155                 } else {
1156                         inode = lock->l_ast_data;
1157                         ldlm_lock_debug(NULL, inode->i_state & I_FREEING ?
1158                                                 D_INFO : D_WARNING,
1159                                         lock, __FILE__, __func__, __LINE__,
1160                                         "l_ast_data %p is bogus: magic %08x",
1161                                         lock->l_ast_data, lli->lli_inode_magic);
1162                         inode = NULL;
1163                 }
1164         }
1165         unlock_res_and_lock(lock);
1166         return inode;
1167 }
1168
1169 static int null_if_equal(struct ldlm_lock *lock, void *data)
1170 {
1171         if (data == lock->l_ast_data) {
1172                 lock->l_ast_data = NULL;
1173
1174                 if (lock->l_req_mode != lock->l_granted_mode)
1175                         LDLM_ERROR(lock,"clearing inode with ungranted lock");
1176         }
1177
1178         return LDLM_ITER_CONTINUE;
1179 }
1180
1181 void ll_clear_inode(struct inode *inode)
1182 {
1183         struct ll_inode_info *lli = ll_i2info(inode);
1184         struct ll_sb_info *sbi = ll_i2sbi(inode);
1185         ENTRY;
1186
1187         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1188                inode->i_generation, inode);
1189
1190         ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK;
1191         md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
1192                          null_if_equal, inode);
1193
1194         LASSERT(!lli->lli_open_fd_write_count);
1195         LASSERT(!lli->lli_open_fd_read_count);
1196         LASSERT(!lli->lli_open_fd_exec_count);
1197
1198         if (lli->lli_mds_write_och)
1199                 ll_md_real_close(inode, FMODE_WRITE);
1200         if (lli->lli_mds_exec_och) {
1201                 if (!FMODE_EXEC)
1202                         CERROR("No FMODE exec, bug exec och is present for "
1203                                "inode %ld\n", inode->i_ino);
1204                 ll_md_real_close(inode, FMODE_EXEC);
1205         }
1206         if (lli->lli_mds_read_och)
1207                 ll_md_real_close(inode, FMODE_READ);
1208
1209         if (lli->lli_smd) {
1210                 obd_change_cbdata(sbi->ll_dt_exp, lli->lli_smd,
1211                                   null_if_equal, inode);
1212
1213                 obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd);
1214                 lli->lli_smd = NULL;
1215         }
1216
1217         if (lli->lli_symlink_name) {
1218                 OBD_FREE(lli->lli_symlink_name,
1219                          strlen(lli->lli_symlink_name) + 1);
1220                 lli->lli_symlink_name = NULL;
1221         }
1222
1223         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
1224                 LASSERT(lli->lli_posix_acl == NULL);
1225                 if (lli->lli_remote_perms) {
1226                         free_rmtperm_hash(lli->lli_remote_perms);
1227                         lli->lli_remote_perms = NULL;
1228                 }
1229         }
1230 #ifdef CONFIG_FS_POSIX_ACL
1231         else if (lli->lli_posix_acl) {
1232                 LASSERT(atomic_read(&lli->lli_posix_acl->a_refcount) == 1);
1233                 LASSERT(lli->lli_remote_perms == NULL);
1234                 posix_acl_release(lli->lli_posix_acl);
1235                 lli->lli_posix_acl = NULL;
1236         }
1237 #endif
1238         lli->lli_inode_magic = LLI_INODE_DEAD;
1239
1240         spin_lock(&sbi->ll_deathrow_lock);
1241         list_del_init(&lli->lli_dead_list);
1242         spin_unlock(&sbi->ll_deathrow_lock);
1243
1244         ll_clear_inode_capas(inode);
1245
1246         EXIT;
1247 }
1248
1249 int ll_md_setattr(struct inode *inode, struct md_op_data *op_data)
1250 {
1251         struct lustre_md md;
1252         struct ll_sb_info *sbi = ll_i2sbi(inode);
1253         struct ptlrpc_request *request = NULL;
1254         int rc;
1255         ENTRY;
1256         
1257         op_data = ll_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0);
1258         rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0, &request);
1259         if (rc) {
1260                 ptlrpc_req_finished(request);
1261                 if (rc == -ENOENT) {
1262                         inode->i_nlink = 0;
1263                         /* Unlinked special device node? Or just a race?
1264                          * Pretend we done everything. */
1265                         if (!S_ISREG(inode->i_mode) &&
1266                             !S_ISDIR(inode->i_mode))
1267                                 rc = inode_setattr(inode, &op_data->attr);
1268                 } else if (rc != -EPERM && rc != -EACCES) {
1269                         CERROR("md_setattr fails: rc = %d\n", rc);
1270                 }
1271                 RETURN(rc);
1272         }
1273
1274         rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF,
1275                               sbi->ll_dt_exp, sbi->ll_md_exp, &md);
1276         if (rc) {
1277                 ptlrpc_req_finished(request);
1278                 RETURN(rc);
1279         }
1280
1281         /* We call inode_setattr to adjust timestamps.
1282          * If there is at least some data in file, we cleared ATTR_SIZE
1283          * above to avoid invoking vmtruncate, otherwise it is important
1284          * to call vmtruncate in inode_setattr to update inode->i_size
1285          * (bug 6196) */
1286         rc = inode_setattr(inode, &op_data->attr);
1287
1288         /* Extract epoch data if obtained. */
1289         memcpy(&op_data->handle, &md.body->handle, sizeof(op_data->handle));
1290         op_data->ioepoch = md.body->ioepoch;
1291         
1292         ll_update_inode(inode, &md);
1293         ptlrpc_req_finished(request);
1294
1295         RETURN(rc);
1296 }
1297
1298 /* Close IO epoch and send Size-on-MDS attribute update. */
1299 static int ll_setattr_done_writing(struct inode *inode,
1300                                    struct md_op_data *op_data)
1301 {
1302         struct ll_inode_info *lli = ll_i2info(inode);
1303         int rc = 0;
1304         ENTRY;
1305         
1306         LASSERT(op_data != NULL);
1307         if (!S_ISREG(inode->i_mode))
1308                 RETURN(0);
1309
1310         CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID" for truncate\n",
1311                op_data->ioepoch, PFID(&lli->lli_fid));
1312
1313         op_data->flags = MF_EPOCH_CLOSE | MF_SOM_CHANGE;
1314         /* XXX: pass och here for the recovery purpose. */
1315         rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, NULL);
1316         if (rc == -EAGAIN) {
1317                 /* MDS has instructed us to obtain Size-on-MDS attribute
1318                  * from OSTs and send setattr to back to MDS. */
1319                 rc = ll_sizeonmds_update(inode, &op_data->handle);
1320         } else if (rc) {
1321                 CERROR("inode %lu mdc truncate failed: rc = %d\n",
1322                        inode->i_ino, rc);
1323         }
1324         RETURN(rc);
1325 }
1326
/* If this inode has objects allocated to it (lsm != NULL), then the OST
 * object(s) determine the file size and mtime.  Otherwise, the MDS will
 * keep these values until such a time that objects are allocated for it.
 * We do the MDS operations first, as it is checking permissions for us.
 * We don't do the MDS RPC if there is nothing that we want to store there,
 * otherwise there is no harm in updating mtime/atime on the MDS if we are
 * going to do an RPC anyways.
 *
 * If we are doing a truncate, we will send the mtime and ctime updates
 * to the OST with the punch RPC, otherwise we do an explicit setattr RPC.
 * I don't believe it is possible to get e.g. ATTR_MTIME_SET and ATTR_SIZE
 * at the same time.
 *
 * Note that attr->ia_valid (and the time fields of attr) are modified in
 * place.  Returns 0 or a negative errno; when the main operation succeeds,
 * the result of closing the IO epoch (if one was opened) is returned.
 */
int ll_setattr_raw(struct inode *inode, struct iattr *attr)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct md_op_data *op_data = NULL;
        /* Snapshot of the caller's mask: attr->ia_valid is altered below,
         * while the decisions further down test this original value. */
        int ia_valid = attr->ia_valid;
        int rc = 0, rc1 = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu valid %x\n", inode->i_ino,
               attr->ia_valid);
        lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_SETATTR);

        if (ia_valid & ATTR_SIZE) {
                if (attr->ia_size > ll_file_maxbytes(inode)) {
                        CDEBUG(D_INODE, "file too large %llu > "LPU64"\n",
                               attr->ia_size, ll_file_maxbytes(inode));
                        RETURN(-EFBIG);
                }

                /* A size change also updates mtime and ctime */
                attr->ia_valid |= ATTR_MTIME | ATTR_CTIME;
        }

        /* POSIX: check before ATTR_*TIME_SET set (from inode_change_ok) */
        if (ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET)) {
                if (current->fsuid != inode->i_uid && !capable(CAP_FOWNER))
                        RETURN(-EPERM);
        }

        /* We mark all of the fields "set" so MDS/OST does not re-set them */
        if (attr->ia_valid & ATTR_CTIME) {
                attr->ia_ctime = CURRENT_TIME;
                attr->ia_valid |= ATTR_CTIME_SET;
        }
        if (!(ia_valid & ATTR_ATIME_SET) && (attr->ia_valid & ATTR_ATIME)) {
                attr->ia_atime = CURRENT_TIME;
                attr->ia_valid |= ATTR_ATIME_SET;
        }
        if (!(ia_valid & ATTR_MTIME_SET) && (attr->ia_valid & ATTR_MTIME)) {
                attr->ia_mtime = CURRENT_TIME;
                attr->ia_valid |= ATTR_MTIME_SET;
        }
        if ((attr->ia_valid & ATTR_CTIME) && !(attr->ia_valid & ATTR_MTIME)) {
                /* To avoid stale mtime on mds, obtain it from ost and send
                   to mds. */
                rc = ll_glimpse_size(inode, 0);
                if (rc) 
                        RETURN(rc);

                attr->ia_valid |= ATTR_MTIME_SET | ATTR_MTIME;
                attr->ia_mtime = inode->i_mtime;
        }

        if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME))
                CDEBUG(D_INODE, "setting mtime %lu, ctime %lu, now = %lu\n",
                       LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
                       CURRENT_SECONDS);

        /* NB: ATTR_SIZE will only be set after this point if the size
         * resides on the MDS, ie, this file has no objects. */
        if (lsm)
                attr->ia_valid &= ~ATTR_SIZE;

        /* If only OST attributes being set on objects, don't do MDS RPC.
         * In that case, we need to check permissions and update the local
         * inode ourselves so we can call obdo_from_inode() always. */
        if (ia_valid & (lsm ? ~(ATTR_FROM_OPEN | ATTR_RAW) : ~0)) {
                OBD_ALLOC_PTR(op_data);
                if (op_data == NULL)
                        RETURN(-ENOMEM);

                memcpy(&op_data->attr, attr, sizeof(*attr));

                /* Open epoch for truncate. */
                if (ia_valid & ATTR_SIZE)
                        op_data->flags = MF_EPOCH_OPEN;
                rc = ll_md_setattr(inode, op_data);
                if (rc)
                        GOTO(out, rc);

                CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID" for truncate\n",
                       op_data->ioepoch, PFID(&lli->lli_fid));

                /* Without objects, or for a non-regular file, there is
                 * nothing further to do on the OSTs. */
                if (!lsm || !S_ISREG(inode->i_mode)) {
                        CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
                        GOTO(out, rc = 0);
                }
        } else {
                /* The OST doesn't check permissions, but the alternative is
                 * a gratuitous RPC to the MDS.  We already rely on the client
                 * to do read/write/truncate permission checks, so is mtime OK?
                 */
                if (ia_valid & (ATTR_MTIME | ATTR_ATIME)) {
                        /* from sys_utime() */
                        if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) {
                                if (current->fsuid != inode->i_uid &&
                                    (rc=ll_permission(inode,MAY_WRITE,NULL))!=0)
                                        RETURN(rc);
                        } else {
                                /* from inode_change_ok() */
                                if (current->fsuid != inode->i_uid &&
                                    !capable(CAP_FOWNER))
                                        RETURN(-EPERM);
                        }
                }

                /* Won't invoke vmtruncate, as we already cleared ATTR_SIZE */
                rc = inode_setattr(inode, attr);
        }

        /* We really need to get our PW lock before we change inode->i_size.
         * If we don't we can race with other i_size updaters on our node, like
         * ll_file_read.  We can also race with i_size propagation to other
         * nodes through dirtying and writeback of final cached pages.  This
         * last one is especially bad for racing o_append users on other
         * nodes. */
        if (ia_valid & ATTR_SIZE) {
                /* Lock the extent from the new size to end of object */
                ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
                                                           OBD_OBJECT_EOF } };
                struct lustre_handle lockh = { 0 };
                int err, ast_flags = 0;
                /* XXX when we fix the AST intents to pass the discard-range
                 * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
                 * XXX here. */
                if (attr->ia_size == 0)
                        ast_flags = LDLM_AST_DISCARD_DATA;

                /* The inode mutex and i_alloc_sem are dropped around taking
                 * the extent lock and re-taken afterwards; the re-take order
                 * depends on the kernel version. */
                UNLOCK_INODE_MUTEX(inode);
                UP_WRITE_I_ALLOC_SEM(inode);
                rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh,
                                    ast_flags);
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                DOWN_WRITE_I_ALLOC_SEM(inode);
                LOCK_INODE_MUTEX(inode);
#else
                LOCK_INODE_MUTEX(inode);
                DOWN_WRITE_I_ALLOC_SEM(inode);
#endif
                if (rc != 0)
                        GOTO(out, rc);

                /* Only ll_inode_size_lock is taken at this level.
                 * lov_stripe_lock() is grabbed by ll_truncate() only over
                 * call to obd_adjust_kms().  If vmtruncate returns 0, then
                 * ll_truncate dropped ll_inode_size_lock() */
                ll_inode_size_lock(inode, 0);
                rc = vmtruncate(inode, attr->ia_size);
                if (rc != 0) {
                        LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
                        ll_inode_size_unlock(inode, 0);
                }

                /* An unlock failure is reported only if the truncate
                 * itself succeeded. */
                err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
                if (err) {
                        CERROR("ll_extent_unlock failed: %d\n", err);
                        if (!rc)
                                rc = err;
                }
        } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
                obd_flag flags;
                struct obd_info oinfo = { { { 0 } } };
                struct obdo *oa = obdo_alloc();

                CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n",
                       inode->i_ino, LTIME_S(attr->ia_mtime));

                if (oa) {
                        oa->o_id = lsm->lsm_object_id;
                        oa->o_gr = lsm->lsm_object_gr;
                        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;

                        flags = OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                OBD_MD_FLFID | OBD_MD_FLGENER | 
                                OBD_MD_FLGROUP;

                        /* Fill the obdo from the (already updated) inode */
                        obdo_from_inode(oa, inode, flags);

                        oinfo.oi_oa = oa;
                        oinfo.oi_md = lsm;
                        oinfo.oi_capa = ll_mdscapa_get(inode);

                        /* XXX: this looks unnecessary now. */
                        rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL);
                        capa_put(oinfo.oi_capa);
                        if (rc)
                                CERROR("obd_setattr_async fails: rc=%d\n", rc);
                        obdo_free(oa);
                } else {
                        rc = -ENOMEM;
                }
        }
        EXIT;
out:
        /* If the MDS RPC left a non-zero IO epoch in op_data, close it;
         * its result (rc1) is reported only when rc itself is 0. */
        if (op_data) {
                if (op_data->ioepoch) {
                        rc1 = ll_setattr_done_writing(inode, op_data);
                }
                ll_finish_md_op_data(op_data);
        }
        return rc ? rc : rc1;
}
1543
1544 int ll_setattr(struct dentry *de, struct iattr *attr)
1545 {
1546         if ((attr->ia_valid & (ATTR_CTIME|ATTR_SIZE|ATTR_MODE)) ==
1547             (ATTR_CTIME|ATTR_SIZE|ATTR_MODE))
1548                 attr->ia_valid |= MDS_OPEN_OWNEROVERRIDE;
1549
1550         return ll_setattr_raw(de->d_inode, attr);
1551 }
1552
1553 int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs,
1554                        __u64 max_age)
1555 {
1556         struct ll_sb_info *sbi = ll_s2sbi(sb);
1557         struct obd_statfs obd_osfs;
1558         int rc;
1559         ENTRY;
1560
1561         rc = obd_statfs(class_exp2obd(sbi->ll_md_exp), osfs, max_age);
1562         if (rc) {
1563                 CERROR("md_statfs fails: rc = %d\n", rc);
1564                 RETURN(rc);
1565         }
1566
1567         osfs->os_type = sb->s_magic;
1568
1569         CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n",
1570                osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files);
1571
1572         rc = obd_statfs_rqset(class_exp2obd(sbi->ll_dt_exp),
1573                               &obd_osfs, max_age);
1574         if (rc) {
1575                 CERROR("obd_statfs fails: rc = %d\n", rc);
1576                 RETURN(rc);
1577         }
1578
1579         CDEBUG(D_SUPER, "OSC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n",
1580                obd_osfs.os_bavail, obd_osfs.os_blocks, obd_osfs.os_ffree,
1581                obd_osfs.os_files);
1582
1583         osfs->os_blocks = obd_osfs.os_blocks;
1584         osfs->os_bfree = obd_osfs.os_bfree;
1585         osfs->os_bavail = obd_osfs.os_bavail;
1586
1587         /* If we don't have as many objects free on the OST as inodes
1588          * on the MDS, we reduce the total number of inodes to
1589          * compensate, so that the "inodes in use" number is correct.
1590          */
1591         if (obd_osfs.os_ffree < osfs->os_ffree) {
1592                 osfs->os_files = (osfs->os_files - osfs->os_ffree) +
1593                         obd_osfs.os_ffree;
1594                 osfs->os_ffree = obd_osfs.os_ffree;
1595         }
1596
1597         RETURN(rc);
1598 }
1599
1600 int ll_statfs(struct super_block *sb, struct kstatfs *sfs)
1601 {
1602         struct obd_statfs osfs;
1603         int rc;
1604
1605         CDEBUG(D_VFSTRACE, "VFS Op:\n");
1606         lprocfs_counter_incr(ll_s2sbi(sb)->ll_stats, LPROC_LL_STAFS);
1607
1608         /* For now we will always get up-to-date statfs values, but in the
1609          * future we may allow some amount of caching on the client (e.g.
1610          * from QOS or lprocfs updates). */
1611         rc = ll_statfs_internal(sb, &osfs, cfs_time_current_64() - 1);
1612         if (rc)
1613                 return rc;
1614
1615         statfs_unpack(sfs, &osfs);
1616
1617         /* We need to downshift for all 32-bit kernels, because we can't
1618          * tell if the kernel is being called via sys_statfs64() or not.
1619          * Stop before overflowing f_bsize - in which case it is better
1620          * to just risk EOVERFLOW if caller is using old sys_statfs(). */
1621         if (sizeof(long) < 8) {
1622                 while (osfs.os_blocks > ~0UL && sfs->f_bsize < 0x40000000) {
1623                         sfs->f_bsize <<= 1;
1624
1625                         osfs.os_blocks >>= 1;
1626                         osfs.os_bfree >>= 1;
1627                         osfs.os_bavail >>= 1;
1628                 }
1629         }
1630
1631         sfs->f_blocks = osfs.os_blocks;
1632         sfs->f_bfree = osfs.os_bfree;
1633         sfs->f_bavail = osfs.os_bavail;
1634
1635         return 0;
1636 }
1637
1638 void ll_inode_size_lock(struct inode *inode, int lock_lsm)
1639 {
1640         struct ll_inode_info *lli;
1641         struct lov_stripe_md *lsm;
1642
1643         lli = ll_i2info(inode);
1644         LASSERT(lli->lli_size_sem_owner != current);
1645         down(&lli->lli_size_sem);
1646         LASSERT(lli->lli_size_sem_owner == NULL);
1647         lli->lli_size_sem_owner = current;
1648         lsm = lli->lli_smd;
1649         LASSERTF(lsm != NULL || lock_lsm == 0, "lsm %p, lock_lsm %d\n",
1650                  lsm, lock_lsm);
1651         if (lock_lsm)
1652                 lov_stripe_lock(lsm);
1653 }
1654
1655 void ll_inode_size_unlock(struct inode *inode, int unlock_lsm)
1656 {
1657         struct ll_inode_info *lli;
1658         struct lov_stripe_md *lsm;
1659
1660         lli = ll_i2info(inode);
1661         lsm = lli->lli_smd;
1662         LASSERTF(lsm != NULL || unlock_lsm == 0, "lsm %p, lock_lsm %d\n",
1663                  lsm, unlock_lsm);
1664         if (unlock_lsm)
1665                 lov_stripe_unlock(lsm);
1666         LASSERT(lli->lli_size_sem_owner == current);
1667         lli->lli_size_sem_owner = NULL;
1668         up(&lli->lli_size_sem);
1669 }
1670
1671 static void ll_replace_lsm(struct inode *inode, struct lov_stripe_md *lsm)
1672 {
1673         struct ll_inode_info *lli = ll_i2info(inode);
1674
1675         dump_lsm(D_INODE, lsm);
1676         dump_lsm(D_INODE, lli->lli_smd);
1677         LASSERTF(lsm->lsm_magic == LOV_MAGIC_JOIN,
1678                  "lsm must be joined lsm %p\n", lsm);
1679         obd_free_memmd(ll_i2dtexp(inode), &lli->lli_smd);
1680         CDEBUG(D_INODE, "replace lsm %p to lli_smd %p for inode %lu%u(%p)\n",
1681                lsm, lli->lli_smd, inode->i_ino, inode->i_generation, inode);
1682         lli->lli_smd = lsm;
1683         lli->lli_maxbytes = lsm->lsm_maxbytes;
1684         if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES)
1685                 lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
1686 }
1687
1688 void ll_update_inode(struct inode *inode, struct lustre_md *md)
1689 {
1690         struct ll_inode_info *lli = ll_i2info(inode);
1691         struct mdt_body *body = md->body;
1692         struct lov_stripe_md *lsm = md->lsm;
1693         struct ll_sb_info *sbi = ll_i2sbi(inode);
1694
1695         LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
1696         if (lsm != NULL) {
1697                 if (lli->lli_smd == NULL) {
1698                         if (lsm->lsm_magic != LOV_MAGIC &&
1699                             lsm->lsm_magic != LOV_MAGIC_JOIN) {
1700                                 dump_lsm(D_ERROR, lsm);
1701                                 LBUG();
1702                         }
1703                         CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
1704                                lsm, inode->i_ino, inode->i_generation, inode);
1705                         /* ll_inode_size_lock() requires it is only called
1706                          * with lli_smd != NULL or lock_lsm == 0 or we can
1707                          * race between lock/unlock.  bug 9547 */
1708                         lli->lli_smd = lsm;
1709                         lli->lli_maxbytes = lsm->lsm_maxbytes;
1710                         if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES)
1711                                 lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
1712                 } else {
1713                         if (lli->lli_smd->lsm_magic == lsm->lsm_magic &&
1714                              lli->lli_smd->lsm_stripe_count ==
1715                                         lsm->lsm_stripe_count) {
1716                                 if (lov_stripe_md_cmp(lli->lli_smd, lsm)) {
1717                                         CERROR("lsm mismatch for inode %ld\n",
1718                                                 inode->i_ino);
1719                                         CERROR("lli_smd:\n");
1720                                         dump_lsm(D_ERROR, lli->lli_smd);
1721                                         CERROR("lsm:\n");
1722                                         dump_lsm(D_ERROR, lsm);
1723                                         LBUG();
1724                                 }
1725                         } else
1726                                 ll_replace_lsm(inode, lsm);
1727                 }
1728                 if (lli->lli_smd != lsm)
1729                         obd_free_memmd(ll_i2dtexp(inode), &lsm);
1730         }
1731
1732         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
1733                 if (body->valid & OBD_MD_FLRMTPERM)
1734                         ll_update_remote_perm(inode, md->remote_perm);
1735         }
1736 #ifdef CONFIG_FS_POSIX_ACL
1737         else if (body->valid & OBD_MD_FLACL) {
1738                 spin_lock(&lli->lli_lock);
1739                 if (lli->lli_posix_acl)
1740                         posix_acl_release(lli->lli_posix_acl);
1741                 lli->lli_posix_acl = md->posix_acl;
1742                 spin_unlock(&lli->lli_lock);
1743         }
1744 #endif
1745         if (body->valid & OBD_MD_FLATIME &&
1746             body->atime > LTIME_S(inode->i_atime))
1747                 LTIME_S(inode->i_atime) = body->atime;
1748         
1749         /* mtime is always updated with ctime, but can be set in past.
1750            As write and utime(2) may happen within 1 second, and utime's
1751            mtime has a priority over write's one, so take mtime from mds 
1752            for the same ctimes. */
1753         if (body->valid & OBD_MD_FLCTIME &&
1754             body->ctime >= LTIME_S(inode->i_ctime)) {
1755                 LTIME_S(inode->i_ctime) = body->ctime;
1756                 if (body->valid & OBD_MD_FLMTIME) {
1757                         CDEBUG(D_INODE, "setting ino %lu mtime "
1758                                "from %lu to "LPU64"\n", inode->i_ino, 
1759                                LTIME_S(inode->i_mtime), body->mtime);
1760                         LTIME_S(inode->i_mtime) = body->mtime;
1761                 }
1762         }
1763         if (body->valid & OBD_MD_FLMODE)
1764                 inode->i_mode = (inode->i_mode & S_IFMT)|(body->mode & ~S_IFMT);
1765         if (body->valid & OBD_MD_FLTYPE)
1766                 inode->i_mode = (inode->i_mode & ~S_IFMT)|(body->mode & S_IFMT);
1767         if (S_ISREG(inode->i_mode))
1768                 inode->i_blksize = min(2UL*PTLRPC_MAX_BRW_SIZE, LL_MAX_BLKSIZE);
1769         else
1770                 inode->i_blksize = inode->i_sb->s_blocksize;
1771         if (body->valid & OBD_MD_FLUID)
1772                 inode->i_uid = body->uid;
1773         if (body->valid & OBD_MD_FLGID)
1774                 inode->i_gid = body->gid;
1775         if (body->valid & OBD_MD_FLFLAGS)
1776                 inode->i_flags = ll_ext_to_inode_flags(body->flags);
1777         if (body->valid & OBD_MD_FLNLINK)
1778                 inode->i_nlink = body->nlink;
1779         if (body->valid & OBD_MD_FLRDEV)
1780 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1781                 inode->i_rdev = body->rdev;
1782 #else
1783                 inode->i_rdev = old_decode_dev(body->rdev);
1784 #endif
1785         if (body->valid & OBD_MD_FLSIZE) {
1786                 inode->i_size = body->size;
1787
1788                 if (body->valid & OBD_MD_FLBLOCKS)
1789                         inode->i_blocks = body->blocks;
1790
1791                 lli->lli_flags |= LLIF_MDS_SIZE_LOCK;
1792         }
1793
1794         if (body->valid & OBD_MD_FLID) {
1795                 /* FID shouldn't be changed! */
1796                 if (fid_is_sane(&lli->lli_fid)) {
1797                         LASSERTF(lu_fid_eq(&lli->lli_fid, &body->fid1),
1798                                  "Trying to change FID "DFID
1799                                  " to the "DFID", inode %lu/%u(%p)\n",
1800                                  PFID(&lli->lli_fid), PFID(&body->fid1),
1801                                  inode->i_ino, inode->i_generation, inode);
1802                 } else 
1803                         lli->lli_fid = body->fid1;
1804         }
1805
1806         LASSERT(fid_seq(&lli->lli_fid) != 0);
1807
1808         if (body->valid & OBD_MD_FLMDSCAPA) {
1809                 LASSERT(md->mds_capa);
1810                 ll_add_capa(inode, md->mds_capa);
1811         }
1812         if (body->valid & OBD_MD_FLOSSCAPA) {
1813                 LASSERT(md->oss_capa);
1814                 ll_add_capa(inode, md->oss_capa);
1815         }
1816 }
1817
1818 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
1819 static struct backing_dev_info ll_backing_dev_info = {
1820         .ra_pages       = 0,    /* No readahead */
1821 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,12))
1822         .capabilities   = 0,    /* Does contribute to dirty memory */
1823 #else
1824         .memory_backed  = 0,    /* Does contribute to dirty memory */
1825 #endif
1826 };
1827 #endif
1828
1829 void ll_read_inode2(struct inode *inode, void *opaque)
1830 {
1831         struct lustre_md *md = opaque;
1832         struct ll_inode_info *lli = ll_i2info(inode);
1833         ENTRY;
1834
1835         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n",
1836                inode->i_ino, inode->i_generation, inode);
1837
1838         ll_lli_init(lli);
1839
1840         LASSERT(!lli->lli_smd);
1841
1842         /* Core attributes from the MDS first.  This is a new inode, and
1843          * the VFS doesn't zero times in the core inode so we have to do
1844          * it ourselves.  They will be overwritten by either MDS or OST
1845          * attributes - we just need to make sure they aren't newer. */
1846         LTIME_S(inode->i_mtime) = 0;
1847         LTIME_S(inode->i_atime) = 0;
1848         LTIME_S(inode->i_ctime) = 0;
1849         inode->i_rdev = 0;
1850         ll_update_inode(inode, md);
1851
1852         /* OIDEBUG(inode); */
1853
1854         if (S_ISREG(inode->i_mode)) {
1855                 struct ll_sb_info *sbi = ll_i2sbi(inode);
1856                 inode->i_op = &ll_file_inode_operations;
1857                 inode->i_fop = sbi->ll_fop;
1858                 inode->i_mapping->a_ops = &ll_aops;
1859                 EXIT;
1860         } else if (S_ISDIR(inode->i_mode)) {
1861                 inode->i_op = &ll_dir_inode_operations;
1862                 inode->i_fop = &ll_dir_operations;
1863                 inode->i_mapping->a_ops = &ll_dir_aops;
1864                 EXIT;
1865         } else if (S_ISLNK(inode->i_mode)) {
1866                 inode->i_op = &ll_fast_symlink_inode_operations;
1867                 EXIT;
1868         } else {
1869                 inode->i_op = &ll_special_inode_operations;
1870
1871 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1872                 init_special_inode(inode, inode->i_mode,
1873                                    kdev_t_to_nr(inode->i_rdev));
1874
1875                 /* initializing backing dev info. */
1876                 inode->i_mapping->backing_dev_info = &ll_backing_dev_info;
1877 #else
1878                 init_special_inode(inode, inode->i_mode, inode->i_rdev);
1879 #endif
1880                 EXIT;
1881         }
1882 }
1883
1884 void ll_delete_inode(struct inode *inode)
1885 {
1886         struct ll_sb_info *sbi = ll_i2sbi(inode);
1887         int rc;
1888         ENTRY;
1889
1890         rc = obd_fid_delete(sbi->ll_md_exp, ll_inode2fid(inode));
1891         if (rc) {
1892                 CERROR("fid_delete() failed, rc %d\n", rc);
1893         }
1894         clear_inode(inode);
1895
1896         EXIT;
1897 }
1898
1899 int ll_iocontrol(struct inode *inode, struct file *file,
1900                  unsigned int cmd, unsigned long arg)
1901 {
1902         struct ll_sb_info *sbi = ll_i2sbi(inode);
1903         struct ptlrpc_request *req = NULL;
1904         int rc, flags = 0;
1905         ENTRY;
1906
1907         switch(cmd) {
1908         case EXT3_IOC_GETFLAGS: {
1909                 struct mdt_body *body;
1910                 struct obd_capa *oc;
1911
1912                 oc = ll_mdscapa_get(inode);
1913                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
1914                                 OBD_MD_FLFLAGS, 0, &req);
1915                 capa_put(oc);
1916                 if (rc) {
1917                         CERROR("failure %d inode %lu\n", rc, inode->i_ino);
1918                         RETURN(-abs(rc));
1919                 }
1920
1921                 body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1922                                       sizeof(*body));
1923
1924                 /*Now the ext3 will be packed directly back to client,
1925                  *no need convert here*/
1926                 flags = body->flags;
1927
1928                 ptlrpc_req_finished (req);
1929
1930                 RETURN(put_user(flags, (int *)arg));
1931         }
1932         case EXT3_IOC_SETFLAGS: {
1933                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1934                 struct obd_info oinfo = { { { 0 } } };
1935                 struct md_op_data *op_data;
1936
1937                 if (get_user(flags, (int *)arg))
1938                         RETURN(-EFAULT);
1939
1940                 oinfo.oi_md = lsm;
1941                 oinfo.oi_oa = obdo_alloc();
1942                 if (!oinfo.oi_oa)
1943                         RETURN(-ENOMEM);
1944
1945                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0);
1946                 if (op_data == NULL)
1947                         RETURN(-ENOMEM);
1948                 
1949                 ((struct ll_iattr *)&op_data->attr)->ia_attr_flags = flags;
1950                 op_data->attr.ia_valid |= ATTR_ATTR_FLAG;
1951                 rc = md_setattr(sbi->ll_md_exp, op_data,
1952                                 NULL, 0, NULL, 0, &req);
1953                 ll_finish_md_op_data(op_data);
1954                 ptlrpc_req_finished(req);
1955                 if (rc || lsm == NULL) {
1956                         obdo_free(oinfo.oi_oa);
1957                         RETURN(rc);
1958                 }
1959
1960                 oinfo.oi_oa->o_id = lsm->lsm_object_id;
1961                 oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
1962                 oinfo.oi_oa->o_flags = flags;
1963                 oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | 
1964                                        OBD_MD_FLGROUP;
1965
1966                 obdo_from_inode(oinfo.oi_oa, inode,
1967                                 OBD_MD_FLFID | OBD_MD_FLGENER);
1968                 rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL);
1969                 obdo_free(oinfo.oi_oa);
1970                 if (rc) {
1971                         if (rc != -EPERM && rc != -EACCES)
1972                                 CERROR("md_setattr_async fails: rc = %d\n", rc);
1973                         RETURN(rc);
1974                 }
1975
1976                 inode->i_flags = ll_ext_to_inode_flags(flags |
1977                                                        MDS_BFLAG_EXT_FLAGS);
1978                 RETURN(0);
1979         }
1980         default:
1981                 RETURN(-ENOSYS);
1982         }
1983
1984         RETURN(0);
1985 }
1986
1987 int ll_flush_ctx(struct inode *inode)
1988 {
1989         struct ll_sb_info  *sbi = ll_i2sbi(inode);
1990
1991         CDEBUG(D_SEC, "flush context for user %d\n", current->uid);
1992
1993         obd_set_info_async(sbi->ll_md_exp,
1994                            sizeof(KEY_FLUSH_CTX) - 1, KEY_FLUSH_CTX,
1995                            0, NULL, NULL);
1996         obd_set_info_async(sbi->ll_dt_exp,
1997                            sizeof(KEY_FLUSH_CTX) - 1, KEY_FLUSH_CTX,
1998                            0, NULL, NULL);
1999         return 0;
2000 }
2001
2002 /* umount -f client means force down, don't save state */
2003 void ll_umount_begin(struct super_block *sb)
2004 {
2005         struct lustre_sb_info *lsi = s2lsi(sb);
2006         struct ll_sb_info *sbi = ll_s2sbi(sb);
2007         struct obd_device *obd;
2008         struct obd_ioctl_data ioc_data = { 0 };
2009         ENTRY;
2010
2011         /* Tell the MGC we got umount -f */
2012         lsi->lsi_flags |= LSI_UMOUNT_FORCE;
2013
2014         CDEBUG(D_VFSTRACE, "VFS Op: superblock %p count %d active %d\n", sb,
2015                sb->s_count, atomic_read(&sb->s_active));
2016
2017         obd = class_exp2obd(sbi->ll_md_exp);
2018         if (obd == NULL) {
2019                 CERROR("Invalid MDC connection handle "LPX64"\n",
2020                        sbi->ll_md_exp->exp_handle.h_cookie);
2021                 EXIT;
2022                 return;
2023         }
2024         obd->obd_no_recov = 1;
2025         obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_md_exp, sizeof ioc_data,
2026                       &ioc_data, NULL);
2027
2028         obd = class_exp2obd(sbi->ll_dt_exp);
2029         if (obd == NULL) {
2030                 CERROR("Invalid LOV connection handle "LPX64"\n",
2031                        sbi->ll_dt_exp->exp_handle.h_cookie);
2032                 EXIT;
2033                 return;
2034         }
2035
2036         obd->obd_no_recov = 1;
2037         obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_dt_exp, sizeof ioc_data,
2038                       &ioc_data, NULL);
2039
2040         /* Really, we'd like to wait until there are no requests outstanding,
2041          * and then continue.  For now, we just invalidate the requests,
2042          * schedule, and hope.
2043          */
2044         schedule();
2045
2046         EXIT;
2047 }
2048
2049 int ll_remount_fs(struct super_block *sb, int *flags, char *data)
2050 {
2051         struct ll_sb_info *sbi = ll_s2sbi(sb);
2052         int err;
2053         __u32 read_only;
2054
2055         if ((*flags & MS_RDONLY) != (sb->s_flags & MS_RDONLY)) {
2056                 read_only = *flags & MS_RDONLY;
2057                 err = obd_set_info_async(sbi->ll_md_exp, strlen("read-only"),
2058                                          "read-only", sizeof(read_only),
2059                                          &read_only, NULL);
2060                 if (err) {
2061                         CERROR("Failed to change the read-only flag during "
2062                                "remount: %d\n", err);
2063                         return err;
2064                 }
2065
2066                 if (read_only)
2067                         sb->s_flags |= MS_RDONLY;
2068                 else
2069                         sb->s_flags &= ~MS_RDONLY;
2070         }
2071         return 0;
2072 }
2073
2074 int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
2075                   int offset, struct super_block *sb)
2076 {
2077         struct ll_sb_info *sbi = NULL;
2078         struct lustre_md md;
2079         int rc = 0;
2080         ENTRY;
2081
2082         LASSERT(*inode || sb);
2083         sbi = sb ? ll_s2sbi(sb) : ll_i2sbi(*inode);
2084         prune_deathrow(sbi, 1);
2085
2086         rc = md_get_lustre_md(sbi->ll_md_exp, req, offset,
2087                               sbi->ll_dt_exp, sbi->ll_md_exp, &md);
2088         if (rc)
2089                 RETURN(rc);
2090
2091         if (*inode) {
2092                 ll_update_inode(*inode, &md);
2093         } else {
2094                 LASSERT(sb != NULL);
2095
2096                 /*
2097                  * At this point server returns to client's same fid as client
2098                  * generated for creating. So using ->fid1 is okay here.
2099                  */
2100                 LASSERT(fid_is_sane(&md.body->fid1));
2101
2102                 *inode = ll_iget(sb, ll_fid_build_ino(sbi, &md.body->fid1), &md);
2103                 if (*inode == NULL || is_bad_inode(*inode)) {
2104                         md_free_lustre_md(sbi->ll_dt_exp, &md);
2105                         rc = -ENOMEM;
2106                         CERROR("new_inode -fatal: rc %d\n", rc);
2107                         GOTO(out, rc);
2108                 }
2109         }
2110
2111         rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp,
2112                          ll_i2info(*inode)->lli_smd);
2113 out:
2114         RETURN(rc);
2115 }
2116
2117 char *llap_origins[] = {
2118         [LLAP_ORIGIN_UNKNOWN] = "--",
2119         [LLAP_ORIGIN_READPAGE] = "rp",
2120         [LLAP_ORIGIN_READAHEAD] = "ra",
2121         [LLAP_ORIGIN_COMMIT_WRITE] = "cw",
2122         [LLAP_ORIGIN_WRITEPAGE] = "wp",
2123 };
2124
2125 struct ll_async_page *llite_pglist_next_llap(struct ll_sb_info *sbi,
2126                                              struct list_head *list)
2127 {
2128         struct ll_async_page *llap;
2129         struct list_head *pos;
2130
2131         list_for_each(pos, list) {
2132                 if (pos == &sbi->ll_pglist)
2133                         return NULL;
2134                 llap = list_entry(pos, struct ll_async_page, llap_pglist_item);
2135                 if (llap->llap_page == NULL)
2136                         continue;
2137                 return llap;
2138         }
2139         LBUG();
2140         return NULL;
2141 }
2142
2143 int ll_obd_statfs(struct inode *inode, void *arg)
2144 {
2145         struct ll_sb_info *sbi = NULL;
2146         struct obd_device *client_obd = NULL, *lov_obd = NULL;
2147         struct lov_obd *lov = NULL;
2148         struct obd_statfs stat_buf = {0};
2149         char *buf = NULL;
2150         struct obd_ioctl_data *data = NULL;
2151         __u32 type, index;
2152         int len, rc;
2153
2154         if (!inode || !(sbi = ll_i2sbi(inode)))
2155                 GOTO(out_statfs, rc = -EINVAL);
2156
2157         rc = obd_ioctl_getdata(&buf, &len, arg);
2158         if (rc)
2159                 GOTO(out_statfs, rc);
2160
2161         data = (void*)buf;
2162         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2 ||
2163             !data->ioc_pbuf1 || !data->ioc_pbuf2)
2164                 GOTO(out_statfs, rc = -EINVAL);
2165
2166         memcpy(&type, data->ioc_inlbuf1, sizeof(__u32));
2167         memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
2168
2169         if (type == LL_STATFS_MDC) {
2170                 if (index > 0)
2171                         GOTO(out_statfs, rc = -ENODEV);
2172                 client_obd = class_exp2obd(sbi->ll_md_exp);
2173         } else if (type == LL_STATFS_LOV) {
2174                 lov_obd = class_exp2obd(sbi->ll_dt_exp);
2175                 lov = &lov_obd->u.lov;
2176
2177                 if ((index >= lov->desc.ld_tgt_count) ||
2178                     !lov->lov_tgts[index])
2179                         GOTO(out_statfs, rc = -ENODEV);
2180
2181                 client_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
2182                 if (!lov->lov_tgts[index]->ltd_active)
2183                         GOTO(out_uuid, rc = -ENODATA);
2184         }
2185
2186         if (!client_obd)
2187                 GOTO(out_statfs, rc = -EINVAL);
2188
2189         rc = obd_statfs(client_obd, &stat_buf, cfs_time_current_64() - 1);
2190         if (rc)
2191                 GOTO(out_statfs, rc);
2192
2193         if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
2194                 GOTO(out_statfs, rc = -EFAULT);
2195
2196 out_uuid:
2197         if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(client_obd),
2198                          data->ioc_plen2))
2199                 rc = -EFAULT;
2200
2201 out_statfs:
2202         if (buf)
2203                 obd_ioctl_freedata(buf, len);
2204         return rc;
2205 }
2206
2207 int ll_process_config(struct lustre_cfg *lcfg)
2208 {
2209         char *ptr;
2210         void *sb;
2211         struct lprocfs_static_vars lvars;
2212         unsigned long x; 
2213         int rc = 0;
2214
2215         lprocfs_init_vars(llite, &lvars);
2216
2217         /* The instance name contains the sb: lustre-client-aacfe000 */
2218         ptr = strrchr(lustre_cfg_string(lcfg, 0), '-');
2219         if (!ptr || !*(++ptr)) 
2220                 return -EINVAL;
2221         if (sscanf(ptr, "%lx", &x) != 1)
2222                 return -EINVAL;
2223         sb = (void *)x;
2224         /* This better be a real Lustre superblock! */
2225         LASSERT(s2lsi((struct super_block *)sb)->lsi_lmd->lmd_magic == LMD_MAGIC);
2226
2227         /* Note we have not called client_common_fill_super yet, so 
2228            proc fns must be able to handle that! */
2229         rc = class_process_proc_param(PARAM_LLITE, lvars.obd_vars,
2230                                       lcfg, sb);
2231         return(rc);
2232 }
2233
2234 /* this function prepares md_op_data hint for passing ot down to MD stack. */
2235 struct md_op_data *
2236 ll_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
2237                    struct inode *i2, const char *name, int namelen, int mode)
2238 {
2239         LASSERT(i1 != NULL);
2240
2241         if (op_data == NULL)
2242                 OBD_ALLOC_PTR(op_data);
2243         
2244         if (op_data == NULL)
2245                 return NULL;
2246
2247         ll_i2gids(op_data->suppgids, i1, i2);
2248         op_data->fid1 = *ll_inode2fid(i1);
2249         op_data->mod_capa1 = ll_mdscapa_get(i1);
2250
2251         /* @i2 may be NULL. In this case caller itself has to initialize ->fid2
2252          * if needed. */
2253         if (i2) {
2254                 op_data->fid2 = *ll_inode2fid(i2);
2255                 op_data->mod_capa2 = ll_mdscapa_get(i2);
2256         }
2257
2258         op_data->name = name;
2259         op_data->namelen = namelen;
2260         op_data->mode = mode;
2261         op_data->mod_time = CURRENT_SECONDS;
2262         op_data->fsuid = current->fsuid;
2263         op_data->fsgid = current->fsgid;
2264         op_data->cap = current->cap_effective;
2265
2266         return op_data;
2267 }
2268
2269 void ll_finish_md_op_data(struct md_op_data *op_data)
2270 {
2271         capa_put(op_data->mod_capa1);
2272         capa_put(op_data->mod_capa2);
2273         OBD_FREE_PTR(op_data);
2274 }
2275
2276 int ll_ioctl_getfacl(struct inode *inode, struct rmtacl_ioctl_data *ioc)
2277 {
2278         struct ll_sb_info *sbi = ll_i2sbi(inode);
2279         struct ptlrpc_request *req = NULL;
2280         struct mdt_body *body;
2281         char *cmd, *buf;
2282         struct obd_capa *oc;
2283         int rc, buflen;
2284         ENTRY;
2285
2286         if (!(sbi->ll_flags & LL_SBI_RMT_CLIENT))
2287                 RETURN(-EBADE);
2288
2289         LASSERT(ioc->cmd && ioc->cmd_len && ioc->res && ioc->res_len);
2290
2291         OBD_ALLOC(cmd, ioc->cmd_len);
2292         if (!cmd)
2293                 RETURN(-ENOMEM);
2294         if (copy_from_user(cmd, ioc->cmd, ioc->cmd_len))
2295                 GOTO(out, rc = -EFAULT);
2296
2297         oc = ll_mdscapa_get(inode);
2298         rc = md_getxattr(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2299                          OBD_MD_FLXATTR, XATTR_NAME_LUSTRE_ACL, cmd,
2300                          ioc->cmd_len, ioc->res_len, 0, &req);
2301         capa_put(oc);
2302         if (rc < 0) {
2303                 CERROR("mdc_getxattr %s [%s] failed: %d\n",
2304                        XATTR_NAME_LUSTRE_ACL, cmd, rc);
2305                 GOTO(out, rc);
2306         }
2307
2308         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
2309         LASSERT(body);
2310
2311         buflen = lustre_msg_buflen(req->rq_repmsg, REPLY_REC_OFF);
2312         LASSERT(buflen <= ioc->res_len);
2313         buf = lustre_msg_string(req->rq_repmsg, REPLY_REC_OFF + 1, ioc->res_len);
2314         LASSERT(buf);
2315         if (copy_to_user(ioc->res, buf, buflen))
2316                 GOTO(out, rc = -EFAULT);
2317         EXIT;
2318 out:
2319         if (req)
2320                 ptlrpc_req_finished(req);
2321         OBD_FREE(cmd, ioc->cmd_len);
2322         return rc;
2323 }
2324
2325 int ll_ioctl_setfacl(struct inode *inode, struct rmtacl_ioctl_data *ioc)
2326 {
2327         struct ll_sb_info *sbi = ll_i2sbi(inode);
2328         struct ptlrpc_request *req = NULL;
2329         char *cmd, *buf;
2330         struct obd_capa *oc;
2331         int buflen, rc;
2332         ENTRY;
2333
2334         if (!(sbi->ll_flags & LL_SBI_RMT_CLIENT))
2335                 RETURN(-EBADE);
2336
2337         if (!(sbi->ll_flags & LL_SBI_ACL)) 
2338                 RETURN(-EOPNOTSUPP);
2339
2340         LASSERT(ioc->cmd && ioc->cmd_len && ioc->res && ioc->res_len);
2341
2342         OBD_ALLOC(cmd, ioc->cmd_len);
2343         if (!cmd)
2344                 RETURN(-ENOMEM);
2345         if (copy_from_user(cmd, ioc->cmd, ioc->cmd_len))
2346                 GOTO(out, rc = -EFAULT);
2347
2348         oc = ll_mdscapa_get(inode);
2349         rc = md_setxattr(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2350                          OBD_MD_FLXATTR, XATTR_NAME_LUSTRE_ACL, cmd,
2351                          ioc->cmd_len, ioc->res_len, 0, &req);
2352         capa_put(oc);
2353         if (rc) {
2354                 CERROR("mdc_setxattr %s [%s] failed: %d\n",
2355                        XATTR_NAME_LUSTRE_ACL, cmd, rc);
2356                 GOTO(out, rc);
2357         }
2358
2359         buflen = lustre_msg_buflen(req->rq_repmsg, REPLY_REC_OFF);
2360         LASSERT(buflen <= ioc->res_len);
2361         buf = lustre_msg_string(req->rq_repmsg, REPLY_REC_OFF, ioc->res_len);
2362         LASSERT(buf);
2363         if (copy_to_user(ioc->res, buf, buflen))
2364                 GOTO(out, rc = -EFAULT);
2365         EXIT;
2366 out:
2367         if (req)
2368                 ptlrpc_req_finished(req);
2369         OBD_FREE(cmd, ioc->cmd_len);
2370         return rc;
2371 }